Introduction
The responsibility of backend application is to handle client requests and return responses back to the client. However, backend performance can degrade when the request is resource intensive; UI thread waits endlessly and user interface is blocked for further user actions. To avoid the unresponsiveness, we can place jobs in queue and delegate the processor to manage job processing.
In this blog post, I describe how to register bull queues in a Nestjs project and add jobs to queue to run in the backend. When processor consumes the job from the queue and finishes job processing, the queue listener logs message to terminal.
Create a new Nestjs project
nest generate nestjs-health-terminus
Install dependencies
Install bull queue package for nestjs and other dependencies for configuration and validation.
npm i @nestjs/bull bull
npm i @nestjs/config class-validator class-transformer
npm i --save-dev @types/bull
Add Docker Compose to install redis
version: '3.1'
services:
redis:
container_name: redis
image: redis:6-alpine
restart: always
ports:
- '6379:6379'
volumes:
- cache:/data
networks:
- terminus
volumes:
cache:
networks:
terminus:
driver: bridge
When we launch docker compose, it will install and run Redis 6 on port number 6379.
Import Bull module used by queues
//.env
REDIS_PORT=6379
REDIS_HOST=localhost
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
}),
BullModule.forRootAsync({
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
redis: {
host: configService.get<string>('REDIS_HOST'),
port: configService.get<number>('REDIS_PORT'),
},
}),
}),
]
})
export class AppModule {}
Register bull queues to process jobs
To enable configService to find the values of REDIS_HOST and REDIS_PORT, the project needs to provide a .env with the environment variables.
// .env
REDIS_HOST=localhost
REDIS_PORT=6379
In tutorial, it is tempting to hardcode the values but professional developers should not do it in production application.
First, we create a queue module to demonstrate some examples of queue and process jobs.
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { FibonacciService, PrimeService } from './services';
import { QueueController } from './controllers';
import { PrimeProcessor, FibonacciProcessor } from './processors';
@Module({
imports: [
BullModule.registerQueue({
name: 'fibonacci',
}),
BullModule.registerQueue({
name: 'prime',
}),
],
providers: [FibonacciService, FibonacciProcessor, PrimeProcessor, PrimeService],
controllers: [QueueController],
})
export class QueueModule {}
The module is consisted of the following
- Two bull queues, fibonacci and prime, respectively
- Queue controller that adds jobs to fibonacci and prime queues
- FibonacciService that calculates the first Nth fibonacci numbers
- FibonacciProcessor that processes jobs in fibonacci queue
- PrimeService that solves two problems of prime factors
- PrimeProcessor that processes “prime-factors” and “distinct-prime-factors” jobs
Add jobs to queue and wait for job processing
I inject queues to add job in the controller in my example but queuing can also occur in services. If the background job depends on the previous results in the service, then we will inject the queue into the service and append jobs to it inside the method.
@Controller('queue')
export class QueueController {
constructor(
@InjectQueue('fibonacci') private fibonacciQueue: Queue<{ order: number }>,
) {}
@Post('fib')
async getFibonacci(@Query('order', ParseIntPipe) order: number): Promise<void> {
console.log(`${new Date()} - Job submitted to queue`, order);
await this.fibonacciQueue.add({ order }, { delay: 1000 });
}
}
Fibonacci queue has a single job that is to calculate the first Nth fibonacci numbers a
await this.fibonacciQueue.add({ order }, { delay: 1000 });
adds job with data { order }
and delays by one second ({ delay: 1000 }
).
Add named jobs to queue for job processing
@Controller('queue')
export class QueueController {
constructor(@InjectQueue('prime') private primeQueue: Queue) {}
@Post('prime-factors')
async getPrimeFactors(@Query('input', ParseIntPipe) input: number): Promise<void> {
console.log(`${new Date()} - Prime factors job submitted to prime queue`, input);
await this.primeQueue.add('prime-factors', { input }, { delay: 1000 });
}
@Post('distinct-prime-factors')
async getDistinctPrimeFactors(@Body() arrayDto: ArrayProductsDto): Promise<void> {
console.log(`${new Date()} - Distinct prime factor job submitted to prime queue`, arrayDto.products);
await this.primeQueue.add('distinct-prime-factors', {
products: arrayDto.products,
});
}
}
On the other hand, prime queue manages two types of jobs. Therefore, we specify the job name when appending job to the queue.
await this.primeQueue.add('prime-factors', { input }, { delay: 1000 });
The code snippet appends ‘prime-factors’ job to queue to find all prime factors of an integer
await this.primeQueue.add('distinct-prime-factors', {
products: arrayDto.products,
});
The other code snippet appends ‘distinct-prime-factors’ job to queue to find distinct prime factors of an integer.
Next, we define job processors to process the jobs such that they don’t stay in idle status in the queues.
Define job processors for job processing
It is really easy to create job processors in nestJs. Job processor is a class annotated by @Processor()
decorator and queue name. Each method has a @process()
decorator and an optional job name to consume queue job.
@Processor('fibonacci')
export class FibonacciProcessor {
constructor(private fibonacciService: FibonacciService) {}
@Process()
calculateNFibonacciNumbers({ data }: Job<{ order: number }>): void {
const fibSequences = this.fibonacciService.fibonacci(data.order);
console.log(`${new Date()} Calculating ${data.order + 1} fibonacci numbers...`);
for (let i = 0; i < data.order; i++) {
console.log(`${new Date()} - Fib(${i}) = ${fibSequences[i]}`);
}
}
}
@Processor('fibonacci')
listens to fibonacci queue registered in queue module. The method, calcualteNFibonacciNumbers
, is responsible for all jobs of fibonacci queue because @Process() decorator does not specify a job name. In my opinion, the method will violate single responsibility principle if it processes all types of jobs of the queue.
Define specialized job method for job processing
If we want a process method to take care of a single type of job, @Process()
decorator accepts name parameter that represents job name.
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { PrimeService } from '../services';
@Processor('prime')
export class PrimeProcessor {
constructor(private primeService: PrimeService) {}
@Process('prime-factors')
findPrimeFactors({ data }: Job<{ input: number }>): void {
const primeFactors = this.primeService.getPrimeFactors(data.input);
console.log(`${new Date()} - All prime factors`, primeFactors);
}
@Process('distinct-prime-factors')
findDistinctPrimeFactors({ data }: Job<{ products: number[] }>): void {
const { primeFactors, count } = this.primeService.getDistinctPrimeFactors(data.products);
console.log(`${new Date()} - Distinct prime factors`, primeFactors, 'count:', count);
}
}
findPrimeFactors
receives 'prime-factors'
job and determines all prime factors of an integer. Similarly, findDistinctPrimeFactors
receives 'distinct-prime-factors'
job and determines distinct prime factors and the number of them. It is a clean approach than if-then-else to execute different methods of primeService to return the expected results.
Listen to queue events during the lifecycle of job processing
Queue events of Bull offer pre-process and post-process hooks for developers to perform custom actions such as logging and caching. When the events are local, they must implement within the processor such as PrimeProcessor.
The code here implements OnQueueActive
and OnQueueCompleted
to log start and end time of the job.
@Processor('prime')
export class PrimeProcessor {
constructor(private primeService: PrimeService) {}
...
@OnQueueActive()
onJobActive(job: Job) {
console.log(
`onJobActive - id: ${job.id}, name: ${job.name}, data: `,
job.data,
` starts at ${new Date(job.timestamp)}`,
);
}
@OnQueueCompleted()
onJobSuccess(job: Job, result: any) {
console.log(
`onJobSuccess - id: ${job.id}, name: ${job.name}, data: `,
job.data,
` completes at ${new Date(job.finishedOn)}`,
'result',
result,
);
}
Execute cURL request
curl --location --request POST 'http://localhost:3000/queue/prime-factors?input=88'
Console log output:
onJobActive - id: 1, name: prime-factors, data: { input: 88 } starts at Sat Sep 17 2022 11:20:54 GMT+0800 (Hong Kong Standard Time)
onJobSuccess - id: 1, name: prime-factors, data: { input: 88 } completes at Sat Sep 17 2022 11:20:55 GMT+0800 (Hong Kong Standard Time) result undefined
Final Thoughts
In this post, we show a comprehensive bull queue example in Nestjs. When an application has a task that is expected to run a long time, we have the option to place it in the queue to process somewhere. Then, we do not block the main event loop and ensure UI is responsive for further user actions.
This is the end of the blog post and I hope you like the content and continue to follow my learning experience in NestJS and other technologies.