Queue and job processing in nestjs

Reading Time: 5 minutes

 63 total views,  1 views today

Introduction

The responsibility of backend application is to handle client requests and return responses back to the client. However, backend performance can degrade when the request is resource intensive; UI thread waits endlessly and user interface is blocked for further user actions. To avoid the unresponsiveness, we can place jobs in queue and delegate the processor to manage job processing.

In this blog post, I describe how to register bull queues in a Nestjs project and add jobs to queue to run in the backend. When processor consumes the job from the queue and finishes job processing, the queue listener logs message to terminal.

let's go

Create a new Nestjs project

nest generate nestjs-health-terminus

Install dependencies

Install bull queue package for nestjs and other dependencies for configuration and validation.

npm i @nestjs/bull bull
npm i @nestjs/config class-validator class-transformer
npm i --save-dev @types/bull

Add Docker Compose to install redis

version: '3.1'

services:
  redis:
    container_name: redis
    image: redis:6-alpine
    restart: always
    ports:
      - '6379:6379'
    volumes: 
      - cache:/data
    networks:
      - terminus

volumes:
  cache:

networks:
  terminus:
    driver: bridge

When we launch docker compose, it will install and run Redis 6 on port number 6379.

Import Bull module used by queues

//.env
REDIS_PORT=6379
REDIS_HOST=localhost
@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
    }),
    BullModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        redis: {
          host: configService.get<string>('REDIS_HOST'),
          port: configService.get<number>('REDIS_PORT'),
        },
      }),
    }),
  ]
})
export class AppModule {}

Register bull queues to process jobs

To enable configService to find the values of REDIS_HOST and REDIS_PORT, the project needs to provide a .env with the environment variables.

// .env

REDIS_HOST=localhost
REDIS_PORT=6379

In tutorial, it is tempting to hardcode the values but professional developers should not do it in production application.

First, we create a queue module to demonstrate some examples of queue and process jobs.

import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { FibonacciService, PrimeService } from './services';
import { QueueController } from './controllers';
import { PrimeProcessor, FibonacciProcessor } from './processors';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'fibonacci',
    }),
    BullModule.registerQueue({
      name: 'prime',
    }),
  ],
  providers: [FibonacciService, FibonacciProcessor, PrimeProcessor, PrimeService],
  controllers: [QueueController],
})
export class QueueModule {}

The module is consisted of the following

  • Two bull queues, fibonacci and prime, respectively
  • Queue controller that adds jobs to fibonacci and prime queues
  • FibonacciService that calculates the first Nth fibonacci numbers
  • FibonacciProcessor that processes jobs in fibonacci queue
  • PrimeService that solves two problems of prime factors
  • PrimeProcessor that processes “prime-factors” and “distinct-prime-factors” jobs

Add jobs to queue and wait for job processing

I inject queues to add job in the controller in my example but queuing can also occur in services. If the background job depends on the previous results in the service, then we will inject the queue into the service and append jobs to it inside the method.

@Controller('queue')
export class QueueController {
  constructor(
    @InjectQueue('fibonacci') private fibonacciQueue: Queue<{ order: number }>,
  ) {}

  @Post('fib')
  async getFibonacci(@Query('order', ParseIntPipe) order: number): Promise<void>   {
    console.log(`${new Date()} - Job submitted to queue`, order);
    await this.fibonacciQueue.add({ order }, { delay: 1000 });
  }
}

Fibonacci queue has a single job that is to calculate the first Nth fibonacci numbers a

await this.fibonacciQueue.add({ order }, { delay: 1000 });

adds job with data { order } and delays by one second ({ delay: 1000 }).

Add named jobs to queue for job processing

@Controller('queue')
export class QueueController {
  constructor(@InjectQueue('prime') private primeQueue: Queue) {}


  @Post('prime-factors')
  async getPrimeFactors(@Query('input', ParseIntPipe) input: number): Promise<void> {
    console.log(`${new Date()} - Prime factors job submitted to prime queue`, input);
    await this.primeQueue.add('prime-factors', { input }, { delay: 1000 });
  }

  @Post('distinct-prime-factors')
  async getDistinctPrimeFactors(@Body() arrayDto: ArrayProductsDto): Promise<void> {
    console.log(`${new Date()} - Distinct prime factor job submitted to prime queue`, arrayDto.products);
    await this.primeQueue.add('distinct-prime-factors', {
       products: arrayDto.products,
    });
  }
}

On the other hand, prime queue manages two types of jobs. Therefore, we specify the job name when appending job to the queue.

await this.primeQueue.add('prime-factors', { input }, { delay: 1000 });

The code snippet appends ‘prime-factors’ job to queue to find all prime factors of an integer

await this.primeQueue.add('distinct-prime-factors', {
   products: arrayDto.products,
});

The other code snippet appends ‘distinct-prime-factors’ job to queue to find distinct prime factors of an integer.

Next, we define job processors to process the jobs such that they don’t stay in idle status in the queues.

Define job processors for job processing

It is really easy to create job processors in nestJs. Job processor is a class annotated by @Processor() decorator and queue name. Each method has a @process() decorator and an optional job name to consume queue job.

@Processor('fibonacci')
export class FibonacciProcessor {
  constructor(private fibonacciService: FibonacciService) {}

  @Process()
  calculateNFibonacciNumbers({ data }: Job<{ order: number }>): void {
    const fibSequences = this.fibonacciService.fibonacci(data.order);
    console.log(`${new Date()} Calculating ${data.order + 1} fibonacci numbers...`);
    for (let i = 0; i < data.order; i++) {
      console.log(`${new Date()} - Fib(${i}) = ${fibSequences[i]}`);
    }
  }
}

@Processor('fibonacci') listens to fibonacci queue registered in queue module. The method, calcualteNFibonacciNumbers, is responsible for all jobs of fibonacci queue because @Process() decorator does not specify a job name. In my opinion, the method will violate single responsibility principle if it processes all types of jobs of the queue.

Define specialized job method for job processing

If we want a process method to take care of a single type of job, @Process() decorator accepts name parameter that represents job name.

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { PrimeService } from '../services';

@Processor('prime')
export class PrimeProcessor {
  constructor(private primeService: PrimeService) {}

  @Process('prime-factors')
  findPrimeFactors({ data }: Job<{ input: number }>): void {
    const primeFactors = this.primeService.getPrimeFactors(data.input);
    console.log(`${new Date()} - All prime factors`, primeFactors);
  }

  @Process('distinct-prime-factors')
  findDistinctPrimeFactors({ data }: Job<{ products: number[] }>): void {
    const { primeFactors, count } = this.primeService.getDistinctPrimeFactors(data.products);
    console.log(`${new Date()} - Distinct prime factors`, primeFactors, 'count:', count);
  }
}

findPrimeFactors receives 'prime-factors' job and determines all prime factors of an integer. Similarly, findDistinctPrimeFactors receives 'distinct-prime-factors' job and determines distinct prime factors and the number of them. It is a clean approach than if-then-else to execute different methods of primeService to return the expected results.

Listen to queue events during the lifecycle of job processing

Queue events of Bull offer pre-process and post-process hooks for developers to perform custom actions such as logging and caching. When the events are local, they must implement within the processor such as PrimeProcessor.

The code here implements OnQueueActive and OnQueueCompleted to log start and end time of the job.

@Processor('prime')
export class PrimeProcessor {
  constructor(private primeService: PrimeService) {}

 ...

 @OnQueueActive()
 onJobActive(job: Job) {
    console.log(
      `onJobActive - id: ${job.id}, name: ${job.name}, data: `,
      job.data,
      ` starts at ${new Date(job.timestamp)}`,
    );
 }

 @OnQueueCompleted()
 onJobSuccess(job: Job, result: any) {
    console.log(
      `onJobSuccess - id: ${job.id}, name: ${job.name}, data: `,
      job.data,
      ` completes at ${new Date(job.finishedOn)}`,
      'result',
      result,
    );
 }

Execute cURL request

curl --location --request POST 'http://localhost:3000/queue/prime-factors?input=88'

Console log output:

onJobActive - id: 1, name: prime-factors, data:  { input: 88 }  starts at Sat Sep 17 2022 11:20:54 GMT+0800 (Hong Kong Standard Time)
onJobSuccess - id: 1, name: prime-factors, data:  { input: 88 }  completes at Sat Sep 17 2022 11:20:55 GMT+0800 (Hong Kong Standard Time) result undefined

Final Thoughts

In this post, we show a comprehensive bull queue example in Nestjs. When an application has a task that is expected to run a long time, we have the option to place it in the queue to process somewhere. Then, we do not block the main event loop and ensure UI is responsive for further user actions.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in NestJS and other technologies.

Resources:

  1. Queue: https://docs.nestjs.com/techniques/queues
  2. Bull: https://github.com/OptimalBits/bull
  3. Repo: https://github.com/railsstudent/nestjs-health-terminus