Nestjs Queue
บทความนี้จะพาไปเรียนรู้วิธีการใช้งาน Queue ใน NestJS โดยใช้ Bull และ Redis เพื่อจัดการงานแบบ asynchronous อย่างมีประสิทธิภาพ ตั้งแต่การติดตั้ง Redis, การตั้งค่าโปรเจค NestJS, การติดตั้งแพ็กเกจที่จำเป็น ไปจนถึงการสร้างและใช้งาน queue พร้อมตัวอย่างโค้ดจริง เหมาะสำหรับผู้ที่ต้องการเพิ่มความสามารถในการประมวลผลงานแบบ background หรือ job processing ให้กับแอปพลิเคชันของตนเอง
ติดตั้ง redis
docker run --name redis -d -p 6379:6379 redis
ติดตั้ง nestjs-queue
nest new nestjs-queue
ติดตั้ง package
npm i @nestjs/bull bull ioredis
npm i -D @types/bull
npm i @nestjs/config
# ติดตั้ง ui เพื่อ monitor queue
npm i @bull-board/api @bull-board/express
สร้างโครงสร้างโปรเจค
src/
app.module.ts
main.ts
queues/
email/
email.module.ts
email.processor.ts
email.producer.ts
email.controller.ts
ตั้งค่า Bull + คิว email
src/app.module.ts
โค้ดตัวอย่างนี้เป็นการตั้งค่า Bull queue ใน NestJS เพื่อใช้งานกับ Redis และแยกโมดูลสำหรับ email queue อย่างเป็นระบบ โดยมีรายละเอียดดังนี้
นำเข้าโมดูลที่จำเป็น
@nestjs/common
สำหรับประกาศโมดูลหลัก@nestjs/config
สำหรับจัดการ environment variables และ config@nestjs/bull
สำหรับเชื่อมต่อและจัดการ Bull queue./queues/email/email.module
โมดูลย่อยสำหรับ email queue
AppModule
- ใช้
ConfigModule.forRoot({ isGlobal: true })
เพื่อให้ config ใช้งานได้ทั่วแอป - ใช้
BullModule.forRootAsync
เพื่อกำหนดค่าเชื่อมต่อ Redis แบบ dynamic จาก environment variables- กำหนด host, port, password ของ Redis โดยดึงค่าจาก
ConfigService
- สามารถกำหนด
prefix
เพื่อป้องกันคิวชนกันข้ามโปรเจกต์ (คอมเมนต์ไว้) - ตั้งค่า
defaultJobOptions
removeOnComplete: 1000
ลบ job ที่สำเร็จ เก็บประวัติไว้ 1000 รายการremoveOnFail: false
ไม่ลบ job ที่ล้มเหลว
- กำหนด host, port, password ของ Redis โดยดึงค่าจาก
- นำเข้า
EmailModule
เพื่อแยก logic ของ email queue ออกเป็นสัดส่วน
- ใช้
สรุปข้อดี
- แยก concerns ชัดเจน ดูแลและขยายระบบได้ง่าย
- ปรับ config ได้จาก environment variables รองรับหลาย environment
- ตั้งค่า Bull queue อย่างปลอดภัยและมีประสิทธิภาพ
ตัวอย่างโครงสร้างการตั้งค่า Bull + Email Queue ใน NestJS
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { BullModule } from "@nestjs/bull";
import { EmailModule } from "./queues/email/email.module";
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
BullModule.forRootAsync({
useFactory: (config: ConfigService) => ({
redis: {
host: config.get("REDIS_HOST", "localhost"),
port: parseInt(config.get("REDIS_PORT", "6379"), 10),
password: config.get("REDIS_PASSWORD") || undefined,
},
// prefix: 'myapp', // ป้องกันคิวชนกันข้ามโปรเจ็กต์
defaultJobOptions: {
removeOnComplete: 1000, // ลบ job ที่สำเร็จ เก็บประวัติไว้ 1000 รายการ
removeOnFail: false,
},
}),
inject: [ConfigService],
}),
EmailModule,
],
})
export class AppModule {}
src/queues/email/email.module.ts
import { Module } from "@nestjs/common";
import { BullModule } from "@nestjs/bull";
import { EmailProcessor } from "./email.processor";
import { EmailProducer } from "./email.producer";
import { EmailController } from "./email.controller";
@Module({
imports: [
BullModule.registerQueue({
name: "email",
}),
],
providers: [EmailProcessor, EmailProducer],
controllers: [EmailController],
exports: [EmailProducer],
})
export class EmailModule {}
src/queues/email/email.producer.ts
// อธิบายโค้ด email.producer.ts
(EmailProducer) แบบ bullet point
หน้าที่หลัก:
- เป็น Service สำหรับ enqueue งาน (job) ลงใน Bull queue ชื่อ
email
- แยก logic การสร้าง job แต่ละประเภท เช่น welcome email, newsletter, cron job
- เป็น Service สำหรับ enqueue งาน (job) ลงใน Bull queue ชื่อ
โครงสร้างและการใช้งาน:
- ใช้
@Injectable()
เพื่อให้ NestJS จัดการ dependency injection - ใช้
@InjectQueue("email")
เพื่อดึง queue ที่ชื่อว่าemail
มาใช้งาน - มีการสร้าง logger (
Logger
) สำหรับ debug/logging
- ใช้
เมธอดหลัก:
enqueueWelcomeEmail(userId: string)
- สร้าง job ชื่อ
welcome
พร้อมข้อมูลuserId
- กำหนด retry 5 ครั้ง, backoff แบบ exponential, priority สูง, timeout 60 วินาที
- ใช้สำหรับส่งอีเมลต้อนรับผู้ใช้ใหม่
- สร้าง job ชื่อ
enqueueBulkNewsletter(batchId: string)
- สร้าง job ชื่อ
newsletter
พร้อมข้อมูลbatchId
- retry 3 ครั้ง, backoff แบบ fixed, กำหนด jobId เพื่อป้องกัน enqueue ซ้ำ
- ใช้สำหรับส่ง newsletter แบบ batch
- สร้าง job ชื่อ
ensureReportCron()
- สร้าง job แบบ repeat (cron) ทุก 5 นาที
- jobId คงที่เพื่อไม่สร้างซ้ำ
- ใช้สำหรับงาน background ที่ต้องรันซ้ำ ๆ เช่น daily report
ข้อดีของการแยก Producer:
- ทำให้ controller/service อื่น ๆ เรียกใช้งาน queue ได้ง่ายและปลอดภัย
- สามารถกำหนด options ของแต่ละ job ได้ละเอียด (retry, priority, backoff ฯลฯ)
- รองรับการขยายประเภท job ในอนาคต
import { Injectable, Logger } from "@nestjs/common";
import { InjectQueue } from "@nestjs/bull";
import { Queue } from "bull";
@Injectable()
export class EmailProducer {
private readonly logger = new Logger(EmailProducer.name);
constructor(@InjectQueue("email") private readonly emailQueue: Queue) {}
async enqueueWelcomeEmail(userId: string) {
this.logger.log(`Enqueue welcome email for user ${userId}`);
return this.emailQueue.add(
"welcome",
{ userId },
{
attempts: 5,
backoff: { type: "exponential", delay: 5000 },
priority: 2, // ยิ่งตัวเลขน้อยยิ่งสำคัญ (Bull v3: 1 สูงสุด)
timeout: 60_000,
}
);
}
async enqueueBulkNewsletter(batchId: string) {
return this.emailQueue.add(
"newsletter",
{ batchId },
{
attempts: 3,
backoff: { type: "fixed", delay: 3000 },
jobId: `newsletter:${batchId}`, // กัน enqueue ซ้ำ
}
);
}
// ตัวอย่าง job แบบซ้ำ ๆ (cron ทุก 5 นาที)
async ensureReportCron() {
return this.emailQueue.add(
"daily-report",
{},
{
repeat: { cron: "*/5 * * * *" },
jobId: "daily-report-cron",
}
);
}
}
src/queues/email/email.processor.ts
// อธิบายโค้ด email.processor.ts
(EmailProcessor) พร้อม bullet point
หน้าที่หลัก:
- เป็น Processor สำหรับประมวลผลงาน (job) ที่ถูก enqueue ใน Bull queue ชื่อ
email
- แยก handler สำหรับแต่ละประเภท job เช่น welcome email, newsletter, และ cron job
- เป็น Processor สำหรับประมวลผลงาน (job) ที่ถูก enqueue ใน Bull queue ชื่อ
โครงสร้างและการใช้งาน:
- ใช้
@Processor("email")
เพื่อระบุว่า class นี้จะประมวลผล queue ชื่อemail
- ใช้
@Process
เพื่อกำหนด handler ของแต่ละ job type- สามารถกำหนด
concurrency
เพื่อควบคูณจำนวน job ที่ประมวลผลพร้อมกัน
- สามารถกำหนด
- ใช้
Logger
สำหรับ logging/debugging
- ใช้
เมธอดหลัก:
handleWelcome
- ประมวลผล job ชื่อ
welcome
- รับข้อมูล
{ userId }
- mock การส่งอีเมล (สามารถเปลี่ยนเป็นบริการจริงได้)
- รองรับการประมวลผลพร้อมกันสูงสุด 5 job
- ประมวลผล job ชื่อ
handleNewsletter
- ประมวลผล job ชื่อ
newsletter
- รับข้อมูล
{ batchId }
- mock การส่ง newsletter
- ประมวลผล job ชื่อ
handleDailyReport
- ประมวลผล job ชื่อ
daily-report
- ใช้สำหรับงาน background ที่ต้องรันซ้ำ ๆ (เช่น รายงานประจำวัน)
- ประมวลผล job ชื่อ
Event Hooks:
@OnQueueActive
— เรียกเมื่อ job เริ่มประมวลผล@OnQueueCompleted
— เรียกเมื่อ job สำเร็จ@OnQueueFailed
— เรียกเมื่อ job ล้มเหลว- ช่วยในการ debug และ monitor สถานะของ job
ข้อดีของการแยก Processor:
- ทำให้ logic การประมวลผลแต่ละประเภท job แยกกันชัดเจน
- รองรับการขยายประเภท job ในอนาคตได้ง่าย
- สามารถเพิ่ม event hooks เพื่อ monitor หรือแจ้งเตือนเมื่อเกิด error ได้สะดวก
import {
Processor,
Process,
OnQueueActive,
OnQueueCompleted,
OnQueueFailed,
} from "@nestjs/bull";
import { Job } from "bull";
import { Logger } from "@nestjs/common";
@Processor("email")
export class EmailProcessor {
private readonly logger = new Logger(EmailProcessor.name);
// ประมวลผล welcome email (ทำพร้อมกันสูงสุด 5 job)
@Process({ name: "welcome", concurrency: 5 })
async handleWelcome(job: Job<{ userId: string }>) {
this.logger.log(
`[welcome] user=${job.data.userId} attempt=${job.attemptsMade + 1}`
);
// mock ส่งอีเมล (แทนที่ด้วยบริการจริง SendGrid/SES/etc.)
await new Promise((r) => setTimeout(r, 1500));
// ถ้าอยาก fail ลองโยน error:
// if (Math.random() < 0.1) throw new Error('random fail');
return { ok: true };
}
@Process("newsletter")
async handleNewsletter(job: Job<{ batchId: string }>) {
this.logger.log(`[newsletter] batch=${job.data.batchId}`);
await new Promise((r) => setTimeout(r, 2000));
return { sent: 1234 };
}
@Process("daily-report")
async handleDailyReport(job: Job) {
this.logger.log(`[daily-report] generating ...`);
await new Promise((r) => setTimeout(r, 1000));
return { reportId: Date.now() };
}
// Events ช่วยดีบัก/มอนิเตอร์
@OnQueueActive()
onActive(job: Job) {
this.logger.debug(`Job ${job.id} is active [${job.name}]`);
}
@OnQueueCompleted()
onCompleted(job: Job, result: any) {
this.logger.debug(`Job ${job.id} completed → ${JSON.stringify(result)}`);
}
@OnQueueFailed()
onFailed(job: Job, err: Error) {
this.logger.error(`Job ${job.id} failed: ${err.message}`, err.stack);
}
}
src/queues/email/email.controller.ts
// อธิบายโค้ด email.controller.ts
(EmailController) พร้อม bullet point
หน้าที่หลัก:
- เป็น Controller สำหรับจัดการการเรียกใช้งาน queue จากภายนอก
- มี endpoint สำหรับส่งงานไปยัง queue ต่าง ๆ
โครงสร้างและการใช้งาน:
- ใช้
@Controller("email")
เพื่อระบุว่า class นี้จะเป็น controller สำหรับ email queue - ใช้
@Post
เพื่อกำหนด endpoint สำหรับส่งงานไปยัง queue - มี constructor สำหรับ inject
EmailProducer
เพื่อใช้งาน
- ใช้
เมธอดหลัก:
welcome
- ส่งงาน welcome email ไปยัง queue
- รับ
userId
จาก parameter ของ endpoint - เรียกใช้
enqueueWelcomeEmail
จากEmailProducer
newsletter
- ส่งงาน newsletter ไปยัง queue
- รับ
batchId
จาก parameter ของ endpoint - เรียกใช้
enqueueBulkNewsletter
จากEmailProducer
ensureReport
- ส่งงาน cron job ไปยัง queue
- เรียกใช้
ensureReportCron
จากEmailProducer
ข้อดีของการแยก Controller:
- ทำให้ logic การเรียกใช้งาน queue แยกกันชัดเจน
- รองรับการขยายประเภท job ในอนาคตได้ง่าย
- สามารถเพิ่มการตรวจสอบสิทธิ์ หรือการจัดการ error ได้ง่าย
import { Controller, Param, Post } from "@nestjs/common";
import { EmailProducer } from "./email.producer";
@Controller("email")
export class EmailController {
constructor(private readonly producer: EmailProducer) {}
// POST /email/welcome/123
@Post("welcome/:userId")
async welcome(@Param("userId") userId: string) {
const job = await this.producer.enqueueWelcomeEmail(userId);
return { enqueued: true, jobId: job.id };
}
// POST /email/newsletter/batch-001
@Post("newsletter/:batchId")
async newsletter(@Param("batchId") batchId: string) {
const job = await this.producer.enqueueBulkNewsletter(batchId);
return { enqueued: true, jobId: job.id };
}
// POST /email/report/ensure
@Post("report/ensure")
async ensureReport() {
await this.producer.ensureReportCron();
return { ok: true };
}
}
src/main.ts
// อธิบายโค้ดใน src/main.ts
และสรุปเป็น bullet ที่อ่านง่าย
อธิบายโค้ด src/main.ts
:
นำเข้าโมดูลที่จำเป็น
NestFactory
สำหรับสร้าง NestJS application instanceAppModule
คือ root module ของแอปcreateBullBoard
,ExpressAdapter
,BullAdapter
สำหรับตั้งค่า bull-board UIgetQueueToken
สำหรับดึง queue instance จาก NestJS DI
ฟังก์ชัน
bootstrap
- สร้างแอป NestJS ด้วย
AppModule
- สร้าง bull-board dashboard ด้วย
ExpressAdapter
และตั้ง path เป็น/queues
- ดึง queue ที่ชื่อ
"email"
จาก DI container - สร้าง bull-board dashboard โดยผูก queue ที่ต้องการ monitor
- นำ bull-board router ไปผูกกับ Express instance ที่ path
/queues
- สั่งให้แอปรันที่ port 3000
- สร้างแอป NestJS ด้วย
สรุปขั้นตอนการทำงาน:
- 🚀 สร้าง NestJS app ด้วย
AppModule
- 📨 ดึง queue ที่ต้องการ monitor (เช่น
"email"
) - 📊 ตั้งค่า bull-board dashboard สำหรับดูสถานะ queue
- 🔗 ผูก bull-board dashboard เข้ากับ Express ที่ path
/queues
- 🟢 รันแอปที่
http://localhost:3000
- 👀 สามารถเปิดหน้า monitor queue ได้ที่
http://localhost:3000/queues
ข้อดีของการตั้งค่า bull-board:
- ดูสถานะ job ใน queue ได้แบบ real-time
- สามารถ retry, remove, หรือดูรายละเอียด job ได้จาก UI
- เหมาะสำหรับ dev และ ops ในการ debug หรือ monitor ระบบ queue
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";
import { createBullBoard } from "@bull-board/api";
import { ExpressAdapter } from "@bull-board/express";
import { BullAdapter } from "@bull-board/api/bullAdapter";
import { getQueueToken } from "@nestjs/bull";
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// bull-board
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/queues");
const emailQueue = app.get(getQueueToken("email"));
createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
});
const express = app.getHttpAdapter().getInstance();
express.use("/queues", serverAdapter.getRouter());
await app.listen(3000);
}
bootstrap();
เปิดดูหน้า monitor queue
http://localhost:3000/queues
ทดสอบการทำงาน
curl -X POST http://localhost:3000/email/welcome/123
curl -X POST http://localhost:3000/email/newsletter/batch-001
curl -X POST http://localhost:3000/email/report/ensure
สรุป
สรุปเนื้อหาสำคัญจากบทความนี้
- อธิบายการใช้งาน Bull Queue ร่วมกับ NestJS เพื่อจัดการงานแบบ asynchronous และ background jobs ได้อย่างมีประสิทธิภาพ
- แนะนำการติดตั้ง bull-board สำหรับ monitoring และจัดการ queue ผ่านหน้าเว็บ UI
- สามารถนำไปประยุกต์ใช้กับงานที่ต้องการความน่าเชื่อถือและการประมวลผลเบื้องหลัง เช่น การส่งอีเมล หรือประมวลผลข้อมูลขนาดใหญ่
- ควรให้ความสำคัญกับการจัดการ error และการ scaling queue worker เพื่อรองรับการใช้งานใน production อย่างเหมาะสม
ตัวอย่างโค้ดและโปรเจกต์ตัวอย่าง
สามารถดูตัวอย่างโค้ดและโปรเจกต์เต็มได้ที่ GitHub: https://github.com/earth774/nestjs-queue-email