Queuing jobs in NestJS using @nestjs/bullmq package

Introduction

Bull and BullMQ are both queue libraries that persist jobs in Redis. Bull is in maintenance mode, so its maintainers only fix bugs. BullMQ is the new version of the library, rewritten in TypeScript. The two libraries are similar, except that BullMQ introduces a flow producer that can split a resource-intensive job into child jobs; the parent job processes the results of the child jobs once they have all completed. As of version 10, a NestJS application that queues jobs has the option to upgrade to BullMQ through the @nestjs/bullmq package.

NestJS provides the @nestjs/bullmq package (https://github.com/nestjs/bull/tree/master/packages/bullmq), but at the time of writing the official NestJS documentation does not include an example that uses @nestjs/bullmq and bullmq.

Install dependencies

$ npm i --save @nestjs/bullmq bullmq

Register Bull Module

First, I used BullModule.forRoot to register BullMQ and connect it to a Redis instance. Second, I added the resulting module to the imports array of AppModule. The Redis host and port are the default values, localhost and 6379 respectively.

// queue.config.ts

import { BullModule } from '@nestjs/bullmq';

export const queueConfig = BullModule.forRoot({
  connection: {
    host: 'localhost',
    port: 6379,
  },
  defaultJobOptions: {
    removeOnComplete: 1000,
    removeOnFail: 5000,
    attempts: 3,
  },
});

// app.module.ts
import { Module } from '@nestjs/common';
import { queueConfig } from './config/queue.config';

@Module({
  imports: [queueConfig],
})
export class AppModule {}
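
The connection above is hard-coded, which suits a local demo. As a minimal sketch, the same connection object could be derived from environment variables. The helper name buildRedisConnection and the variable names REDIS_HOST and REDIS_PORT are assumptions for illustration and are not part of the demo.

```typescript
// Hypothetical helper; REDIS_HOST and REDIS_PORT are assumed variable names.
export interface RedisConnection {
  host: string;
  port: number;
}

export function buildRedisConnection(
  env: Record<string, string | undefined>,
): RedisConnection {
  const port = Number.parseInt(env.REDIS_PORT ?? '', 10);
  return {
    host: env.REDIS_HOST || 'localhost',
    // Fall back to the Redis default port when the value is missing or invalid.
    port: Number.isInteger(port) && port > 0 ? port : 6379,
  };
}
```

The result could then be passed as the connection option of BullModule.forRoot, for example connection: buildRedisConnection(process.env).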

Register BullMQ queues and Flow Producer with module

In this demo, I registered four BullMQ queues and one flow producer in a custom queue module. BullModule.registerQueue({ name }) registers a regular queue called name, whereas BullModule.registerFlowProducer({ name: flow }) registers a flow producer called flow.

// queue-board.interface.ts

export interface QueueBoardModuleOptions {
  queues: string[];
  flows?: string[];
}

// queue-board.module-definition.ts

import { ConfigurableModuleBuilder } from '@nestjs/common';
import { QueueBoardModuleOptions } from './queue-board.interface';

export const { ConfigurableModuleClass, MODULE_OPTIONS_TOKEN, OPTIONS_TYPE } =
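  new ConfigurableModuleBuilder<QueueBoardModuleOptions>().build();

// queue-board.module.ts

import { BullModule } from '@nestjs/bullmq';
import { DynamicModule, Module } from '@nestjs/common';
import { ConfigurableModuleClass, OPTIONS_TYPE } from './queue-board.module-definition';

@Module({})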
export class QueueModule extends ConfigurableModuleClass {
  static register(options: typeof OPTIONS_TYPE): DynamicModule {
    const bullModules = options.queues.map((name) => BullModule.registerQueue({ name }));

    const flowProducers = (options.flows || []).map((flow) =>
      BullModule.registerFlowProducer({
        name: flow,
      }),
    );

    return {
      ...super.register(options),
      imports: [...bullModules, ...flowProducers],
      exports: [...bullModules, ...flowProducers],
    };
  }
}

Register QueueModule in MathModule

QueueModule registers four queues (MATH_BINARY, MATH_UNARY, MATH_ARRAY_CHILD and MATH_ARRAY_MERGE) and one flow producer (MATH_ARRAY_PRODUCER) with the BullModule of the @nestjs/bullmq package.

// math.module.ts

@Module({
  imports: [
    QueueModule.register({
      queues: [MATH_BINARY, MATH_UNARY, MATH_ARRAY_CHILD, MATH_ARRAY_MERGE],
      flows: [MATH_ARRAY_PRODUCER],
    }),
  ],
  providers: [
    MathBinaryOperationProcessor,
    MathUnaryOperationPocessor,
    MathArrayChildProcessor,
    MathArrayMergeProcessor,
    ArrayFlowService,
  ],
  controllers: [MathController, MathArrayController],
})
export class MathModule {}
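
The queue and producer name constants are used throughout the post but never listed. A possible shape for constants/math.constant.ts is sketched below. Three values match the queue names that appear in the log output later in this post; the values of MATH_UNARY and MATH_ARRAY_PRODUCER, and the helper assertUniqueNames, are assumptions for illustration.

```typescript
// constants/math.constant.ts (sketch)

// Values that appear in the log output of this post:
export const MATH_BINARY = 'math-binary';
export const MATH_ARRAY_CHILD = 'math-array-child';
export const MATH_ARRAY_MERGE = 'math-array-merge';

// Assumed values; the post does not show them:
export const MATH_UNARY = 'math-unary';
export const MATH_ARRAY_PRODUCER = 'math-array-producer';

// Hypothetical guard: registering the same name twice would point
// two registrations at the same queue, so reject duplicates early.
export function assertUniqueNames(names: string[]): void {
  const seen = new Set<string>();
  for (const name of names) {
    if (seen.has(name)) {
      throw new Error(`Duplicate queue name: ${name}`);
    }
    seen.add(name);
  }
}
```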

Import MathModule into AppModule

// app.module.ts
import { Module } from '@nestjs/common';
import { queueConfig } from './config/queue.config';
import { MathModule } from './math/math.module';

@Module({
  imports: [queueConfig, MathModule],
})
export class AppModule {}

Queuing jobs in NestJS and BullMQ

The code for queuing jobs in NestJS is the same regardless of whether @nestjs/bull or @nestjs/bullmq is used. Use the InjectQueue decorator to inject a queue and add a job with data. The new job has a job id that the controller method can return to the client. The client can use that job id to check the job's status and obtain its return value.

// inject-queue.decorator.ts
import { InjectQueue } from '@nestjs/bullmq';
import { MATH_BINARY, MATH_UNARY } from '../constants/math.constant';

export const InjectMathBinaryQueue = () => InjectQueue(MATH_BINARY);
export const InjectMathUnaryQueue = () => InjectQueue(MATH_UNARY);

// math.controller.ts

import { Body, Controller, Post } from '@nestjs/common';
import { Queue } from 'bullmq';

@Controller('math')
export class MathController {
  constructor(@InjectMathBinaryQueue() private mathBinaryQueue: Queue) {}

  @Post('sum')
  async sum(@Body() dto: BinaryOperationDto): Promise<string> {
    const job = await this.mathBinaryQueue.add(MATH_BINARY_OPS.SUM, dto);
    return job.id || '';
  }
}
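
The controller above only returns the job id. To illustrate how a client-facing status check might work, here is a minimal sketch. It is written against two small structural interfaces, QueueLike and JobLike, which are assumptions that mirror the getJob, getState and returnvalue members of the BullMQ Queue and Job classes, so that it runs without Redis. The helper name getJobStatus is also hypothetical.

```typescript
// Minimal structural types so the sketch runs without Redis.
// In the application these roles would be played by Queue and Job from 'bullmq'.
export interface JobLike<R> {
  getState(): Promise<string>;
  returnvalue: R | null;
}

export interface QueueLike<R> {
  getJob(id: string): Promise<JobLike<R> | undefined>;
}

export interface JobStatus<R> {
  id: string;
  state: string;
  result: R | null;
}

// Hypothetical helper: look up a job by id and report its state.
// The result is only exposed once the job has completed.
export async function getJobStatus<R>(
  queue: QueueLike<R>,
  id: string,
): Promise<JobStatus<R> | undefined> {
  const job = await queue.getJob(id);
  if (!job) {
    return undefined; // unknown id, or the job was already removed
  }
  const state = await job.getState();
  return { id, state, result: state === 'completed' ? job.returnvalue : null };
}
```

Because defaultJobOptions removes old completed and failed jobs, a lookup like this can legitimately find nothing for an old job id.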

Create BullMQ processor

@nestjs/bullmq does not have the @Process() decorator of @nestjs/bull; therefore, a processor class handles named jobs differently. In a BullMQ processor, the process method uses a switch statement to compare the job name and invoke the corresponding job function.

// worker-host.processor.ts

import { OnWorkerEvent, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';

export abstract class WorkerHostProcessor extends WorkerHost {
  protected readonly logger = new Logger(WorkerHostProcessor.name);

  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    const { id, name, queueName, finishedOn, returnvalue } = job;
    const completionTime = finishedOn ? new Date(finishedOn).toISOString() : '';
    this.logger.log(
      `Job id: ${id}, name: ${name} completed in queue ${queueName} on ${completionTime}. Result: ${returnvalue}`,
    );
  }

  @OnWorkerEvent('progress')
  onProgress(job: Job) {
    const { id, name, progress } = job;
    this.logger.log(`Job id: ${id}, name: ${name} completes ${progress}%`);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job) {
    const { id, name, queueName, failedReason } = job;
    this.logger.error(`Job id: ${id}, name: ${name} failed in queue ${queueName}. Failed reason: ${failedReason}`);
  }

  @OnWorkerEvent('active')
  onActive(job: Job) {
    // processedOn is when a worker picked the job up; timestamp is when the job was created.
    const { id, name, queueName, processedOn } = job;
    const startTime = processedOn ? new Date(processedOn).toISOString() : '';
    this.logger.log(`Job id: ${id}, name: ${name} starts in queue ${queueName} on ${startTime}.`);
  }
}

WorkerHostProcessor extends WorkerHost and registers @OnWorkerEvent listeners for the completed, progress, failed and active events to log the completion, progress, failure and activation of a job. Other processors can extend WorkerHostProcessor to avoid repeating the @OnWorkerEvent boilerplate.

// binary-operation.dto.ts

import { IsNumber } from 'class-validator';

export class BinaryOperationDto {
  @IsNumber()
  num: number;

  @IsNumber()
  num2: number;
}

// math-binary-ops.enum.ts

export enum MATH_BINARY_OPS {
  SUM = 'SUM',
  SUBTRACT = 'SUBTRACT',
  MULTIPLY = 'MULTIPLY',
  DIVISION = 'DIVISION',
}

// math-binary-operation.processor.ts

import { Processor } from '@nestjs/bullmq';
import { BadRequestException, Injectable } from '@nestjs/common';
import { Job } from 'bullmq';
import { MATH_BINARY } from '../constants/math.constant';
import { WorkerHostProcessor } from './worker-host.processor';

@Processor(MATH_BINARY)
@Injectable()
export class MathBinaryOperationProcessor extends WorkerHostProcessor {
  process(job: Job<BinaryOperationDto, number, string>): Promise<number> {
    const { num, num2 } = job.data;
    switch (job.name) {
      case MATH_BINARY_OPS.SUM:
        return Promise.resolve(num + num2);
    }
    throw new BadRequestException(`Unknown job name: ${job.name}`);
  }
}

@Processor(MATH_BINARY) ensures that the processor takes jobs from the MATH_BINARY queue. In the process method, the switch block matches the job name against MATH_BINARY_OPS.SUM and returns the sum of the two input numbers.

Then, register MathBinaryOperationProcessor in the providers array of MathModule.

// math.module.ts

@Module({
  ...
  providers: [
    MathBinaryOperationProcessor,
  ],
  controllers: [MathController],
})
export class MathModule {}
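
The switch statement in the processor grows with every named job. As an alternative sketch, not the approach used in the demo, the dispatch could be expressed as a lookup table from job name to handler. The names BinaryOperands, handlers and runBinaryJob are hypothetical; the job names are the members of MATH_BINARY_OPS.

```typescript
export interface BinaryOperands {
  num: number;
  num2: number;
}

type BinaryHandler = (data: BinaryOperands) => number;

// A Map avoids accidental hits on inherited object keys such as 'toString'.
const handlers = new Map<string, BinaryHandler>([
  ['SUM', ({ num, num2 }) => num + num2],
  ['SUBTRACT', ({ num, num2 }) => num - num2],
  ['MULTIPLY', ({ num, num2 }) => num * num2],
  [
    'DIVISION',
    ({ num, num2 }) => {
      if (num2 === 0) {
        throw new Error('Division by zero');
      }
      return num / num2;
    },
  ],
]);

// Hypothetical dispatcher: a process method could delegate to it
// with job.name and job.data.
export function runBinaryJob(name: string, data: BinaryOperands): number {
  const handler = handlers.get(name);
  if (!handler) {
    throw new Error(`Unknown job name: ${name}`);
  }
  return handler(data);
}
```

With this shape, supporting a new operation means adding one map entry instead of another case branch.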

Test BullMQ

I made an HTTP request to sum two numbers, and the response was the job id.

curl --location 'http://localhost:3000/math/sum' \
--header 'Content-Type: application/json' \
--data '{
    "num": 4,
    "num2": 3
}'
5

The terminal log shows that the job completed and returned the answer 7.

[Nest] 33415  - 12/17/2023, 10:19:15 PM     LOG [WorkerHostProcessor] Job id: 5, name: SUM starts in queue math-binary on 2023-12-17T14:19:15.168Z.
[Nest] 33415  - 12/17/2023, 10:19:15 PM     LOG [WorkerHostProcessor] Job id: 5, name: SUM completed in queue math-binary on 2023-12-17T14:19:15.736Z. Result: 7

Flow Producer

This is a new feature of BullMQ. The idea of a flow producer is to split a heavy job into one or more smaller jobs. When all the smaller jobs finish, their results are available to the parent job, which combines them into the final result.

Register a flow producer

BullModule.registerFlowProducer({ name: flow, })

Inject flow producer

In the createFlow method, I split a number array into sub-arrays and created a child job for each sub-array. When these child jobs finish, the parent job in the parent queue, MATH_ARRAY_MERGE, runs to produce the final result.

// array-flow.service.ts

const PARTITION_SIZE = 4;

@Injectable()
export class ArrayFlowService {
  constructor(@InjectMathArrayProducer() private mathFlowProducer: FlowProducer) {}

  async createFlow(dto: ArrayOperationDto, jobName: MATH_ARRAY_OPS): Promise<string> {
    const flow = await this.mathFlowProducer.add({
      name: jobName,
      queueName: MATH_ARRAY_MERGE,
      children: this.createChildrenJobs(dto, jobName),
    });
    return flow.job.id || '';
  }

  private createChildrenJobs(dto: ArrayOperationDto, jobName: MATH_ARRAY_OPS) {
    const numPartitions = Math.ceil(dto.data.length / PARTITION_SIZE);
    let startIdx = 0;

    const children: FlowChildJob[] = [];
    for (let i = 0; i < numPartitions - 1; i++) {
      children.push({
        name: jobName,
        data: {
          data: dto.data.slice(startIdx, startIdx + PARTITION_SIZE),
          percentage: (100 / numPartitions) * (i + 1),
        },
        queueName: MATH_ARRAY_CHILD,
      });
      startIdx = startIdx + PARTITION_SIZE;
    }

    children.push({
      name: jobName,
      data: { data: dto.data.slice(startIdx), percentage: 100 },
      queueName: MATH_ARRAY_CHILD,
    });

    return children;
  }
}
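
The @InjectMathArrayProducer() decorator used in the constructor is not shown in this post. Assuming it follows the same pattern as InjectMathBinaryQueue, it is presumably a thin wrapper around the InjectFlowProducer decorator exported by @nestjs/bullmq. The value of MATH_ARRAY_PRODUCER below is an assumption.

```typescript
import { InjectFlowProducer } from '@nestjs/bullmq';

// Assumed value; in the demo this constant would live in math.constant.ts.
const MATH_ARRAY_PRODUCER = 'math-array-producer';

// Sketch: inject the flow producer that was registered under MATH_ARRAY_PRODUCER.
export const InjectMathArrayProducer = () => InjectFlowProducer(MATH_ARRAY_PRODUCER);
```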

Child processor

The child processor takes jobs from the MATH_ARRAY_CHILD queue. When the job name is MATH_ARRAY_OPS.MIN, it returns the minimum element of the sub-array. When the job name is MATH_ARRAY_OPS.MAX, it returns the maximum element of the sub-array. A MATH_ARRAY_OPS.FILTER_ODD job keeps the odd numbers and a MATH_ARRAY_OPS.FILTER_EVEN job keeps the even numbers.

// math-array-child.processor.ts

@Processor(MATH_ARRAY_CHILD)
@Injectable()
export class MathArrayChildProcessor extends WorkerHostProcessor {
  async process(job: Job<ComparisonJobProgress, number | number[], string>): Promise<number | number[]> {
    switch (job.name) {
      case MATH_ARRAY_OPS.MIN:
        return Math.min(...job.data.data);
      case MATH_ARRAY_OPS.MAX:
        return Math.max(...job.data.data);
      case MATH_ARRAY_OPS.FILTER_ODD:
        // n % 2 is -1 for negative odd numbers, so compare against 0 instead of 1
        return job.data.data.filter((n) => n % 2 !== 0);
      case MATH_ARRAY_OPS.FILTER_EVEN:
        return job.data.data.filter((n) => n % 2 === 0);
    }

    throw new BadRequestException(`Unknown job name ${job.name} found in queue ${job.queueName}`);
  }
}
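
One detail of the parity filters is worth a small example. In JavaScript the remainder operator keeps the sign of the dividend, so -3 % 2 evaluates to -1, and a check that compares the remainder against 1 misses negative odd numbers. The sketch below assumes integer input; the helper names isOdd and isEven are illustrative.

```typescript
// Comparing against 0 works for negative integers as well.
export const isOdd = (n: number): boolean => n % 2 !== 0;
export const isEven = (n: number): boolean => n % 2 === 0;

const sample = [1, 2, 5, -3];

export const odds = sample.filter(isOdd); // [1, 5, -3]
export const evens = sample.filter(isEven); // [2]

// For comparison, the remainder of a negative odd number is -1, not 1.
export const remainderOfMinusThree = -3 % 2; // -1
```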

Producer processor

When all child jobs complete successfully, the parent job in the MATH_ARRAY_MERGE queue receives their return values through Object.values(await job.getChildrenValues()). The parent job then combines the child values to yield the final result.

// math-array-merge.processor.ts

@Processor(MATH_ARRAY_MERGE)
@Injectable()
export class MathArrayMergeProcessor extends WorkerHostProcessor {
  async process(job: Job<ArrayOperationDto, number | number[], string>): Promise<number | number[]> {
    const results = Object.values(await job.getChildrenValues());
    switch (job.name) {
      case MATH_ARRAY_OPS.MIN:
        return Math.min(...results);
      case MATH_ARRAY_OPS.MAX:
        return Math.max(...results);
      case MATH_ARRAY_OPS.FILTER_ODD:
      case MATH_ARRAY_OPS.FILTER_EVEN:
        return (results as number[][]).flat();
    }

    throw new BadRequestException(`Unknown job name ${job.name}`);
  }
}
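
getChildrenValues resolves to an object whose keys identify the child jobs and whose values are their return values, which is why the processor calls Object.values first. The merge step can be sketched in isolation with a plain object standing in for that result. The function name mergeChildrenValues and the keys used in the usage example are made up for illustration.

```typescript
type ChildValue = number | number[];

// Hypothetical stand-alone version of the merge logic.
// childrenValues plays the role of the object returned by job.getChildrenValues().
export function mergeChildrenValues(
  jobName: string,
  childrenValues: Record<string, ChildValue>,
): ChildValue {
  const results = Object.values(childrenValues);
  switch (jobName) {
    case 'MIN':
      return Math.min(...(results as number[]));
    case 'MAX':
      return Math.max(...(results as number[]));
    case 'FILTER_ODD':
    case 'FILTER_EVEN':
      // Each child returned an array, so flatten one level.
      return (results as number[][]).flat();
  }
  throw new Error(`Unknown job name ${jobName}`);
}

// Usage with illustrative keys:
export const merged = mergeChildrenValues('MAX', {
  'child-1': 5,
  'child-2': 700,
  'child-3': 1099,
  'child-4': -2000,
});
```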

Controller

When a user makes a request to find the minimum or maximum number in an array, the service adds a new flow through the flow producer.

// math-array.controller.ts

@Controller('math-array')
export class MathArrayController {
  constructor(private arrayFlowService: ArrayFlowService) {}

  @Post('min')
  async findMin(@Body() dto: ArrayOperationDto): Promise<string> {
    return this.arrayFlowService.createFlow(dto, MATH_ARRAY_OPS.MIN);
  }

  @Post('max')
  async findMax(@Body() dto: ArrayOperationDto): Promise<string> {
    return this.arrayFlowService.createFlow(dto, MATH_ARRAY_OPS.MAX);
  }
}

Test Flow Producer

I made an HTTP request to find the maximum element of a number array.

curl --location 'http://localhost:3000/math-array/max' \
--header 'Content-Type: application/json' \
--data '{
    "data": [1,2,5,-3, 90, 77, -900, 700, 300, 999, -1000, 1099, -2000]
}'

The response is a UUID job id.

fecd7af4-31b0-4add-a716-4dd595794332

The number array is split into 4 partitions, and the flow producer creates one child job for each partition.

Child job 1 processes [1, 2, 5, -3]
Child job 2 processes [90, 77, -900, 700]
Child job 3 processes [300, 999, -1000, 1099]
Child job 4 processes [-2000]

The child jobs return 5, 700, 1099 and -2000 to the parent job respectively. The parent job invokes Math.max on [5, 700, 1099, -2000] and the final value is 1099.

[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: a1625cf4-15c8-4a9a-bc6f-a89a49f085a2, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.606Z. Result: 5
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: ed5c95aa-1854-4aa0-92cd-13b64ddf3dd1, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.613Z. Result: 700
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: f52ed2b2-ee11-46a5-963a-d15023564ed8, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.626Z. Result: 1099
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: 92aa6a17-c6f4-4a07-87d1-7b19054fbc51, name: MAX completed in queue math-array-child on 2023-12-17T15:45:39.647Z. Result: -2000
[Nest] 36584  - 12/17/2023, 11:45:39 PM     LOG [WorkerHostProcessor] Job id: fecd7af4-31b0-4add-a716-4dd595794332, name: MAX completed in queue math-array-merge on 2023-12-17T15:45:39.872Z. Result: 1099

The log in the terminal shows the same steps leading to the final value, 1099.
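
The same arithmetic can be reproduced without Redis. The sketch below uses a simplified partition helper, which is not the createChildrenJobs method from the service, to split the request array into chunks of 4, take the maximum of each chunk, and then take the maximum of those results.

```typescript
const PARTITION_SIZE = 4;

// Simplified helper: split an array into consecutive chunks of at most `size` items.
export function partition<T>(data: T[], size: number): T[][] {
  if (size < 1) {
    throw new Error('size must be at least 1');
  }
  const chunks: T[][] = [];
  for (let start = 0; start < data.length; start += size) {
    chunks.push(data.slice(start, start + size));
  }
  return chunks;
}

const input = [1, 2, 5, -3, 90, 77, -900, 700, 300, 999, -1000, 1099, -2000];

// What the four child jobs compute:
export const chunks = partition(input, PARTITION_SIZE);
export const childResults = chunks.map((chunk) => Math.max(...chunk)); // [5, 700, 1099, -2000]

// What the parent job computes:
export const finalResult = Math.max(...childResults); // 1099
```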

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in NestJS and other technologies.

Resources:

  1. GitHub Repo: https://github.com/railsstudent/nestjs-bullmq-demo
  2. BullMQ: https://docs.bullmq.io/
  3. Named processor in BullMQ: https://docs.bullmq.io/patterns/named-processor