Check health of nestjs bull queue with terminus

Introduction

Enterprise applications commonly connect to external resources to process requests and return responses. An application should therefore know that the components it depends on are up and running at all times; otherwise, it behaves erroneously when serving client requests. To stay aware of the availability of connected resources, we can perform health checks on them and inspect the results.

In this blog post, I describe how to health check a bull queue in NestJS with terminus. First, it requires a custom health indicator to verify that the queue is connected to redis and that the redis status is ready. Next, I add a /health endpoint that calls the health indicator and returns the status.

Let's go

Clone or fork the Nestjs project

You can find the sample code of nestjs bull queue in this repo: https://github.com/railsstudent/nestjs-health-terminus

git clone https://github.com/railsstudent/nestjs-health-terminus.git

Install dependencies

Install nestjs terminus library to the project

npm install --save @nestjs/terminus

Create health module for health check

nest g mo health

Import HealthModule in AppModule

@Module({
  imports: [
    // ... other modules ...
    HealthModule,
  ]
})
export class AppModule {}

Add a custom health indicator to health check bull queue

@nestjs/terminus does not provide a built-in health indicator for bull queues; therefore, I create a custom one in the health module.

First, use the Nest CLI to generate a health indicator

nest g s health/health/bullQueue --flat

My convention is to rename the filename suffix from .service.ts to .health.ts.

src/health
├── controllers
│   ├── health.controller.ts
│   └── index.ts
├── health
│   ├── bull-queue.health.ts
│   └── index.ts
├── health.module.ts
└── index.ts

Then, open bull-queue.health.ts and make BullQueueHealthIndicator extend HealthIndicator.

import { getQueueToken } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { HealthIndicator, HealthIndicatorResult, HealthCheckError } from '@nestjs/terminus';
import { Queue } from 'bull';

@Injectable()
export class BullQueueHealthIndicator extends HealthIndicator {
  constructor(private moduleRef: ModuleRef) {
    super();
  }
}

Next, import TerminusModule into HealthModule and verify that the providers array has the entry of BullQueueHealthIndicator.

import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthController } from './controllers';
import { BullQueueHealthIndicator } from './health';

@Module({
  imports: [TerminusModule],
  controllers: [HealthController],
  providers: [BullQueueHealthIndicator],
})
export class HealthModule {}

Add health check logic in custom health indicator

In BullQueueHealthIndicator, I implement the isHealthy() method to check the health of the fibonacci and prime queues. A queue is considered up when it is connected to redis and the status of every redis client is ready. Otherwise, the queue is down and the indicator returns error messages.

async isHealthy(queues: string[]): Promise<HealthIndicatorResult> {
    const promiseResults = await this.checkQueuesReady(queues);
    const errorResults = this.filterErrors(promiseResults);

    if (errorResults.length) {
      throw new HealthCheckError('Bull queue failed', this.getStatus('bull', 
         false, { errors: errorResults }));
    }

    return this.getStatus('bull', true);
 }

checkQueuesReady accepts a list of queue names and examines their readiness.

private async checkQueuesReady(queues: string[]) {
    const promises = queues.map(async (name) => {
      const queueToken = this.moduleRef.get(getQueueToken(name), 
          { strict: false });
      if ((queueToken as Queue).isReady) {
        const queue = await (queueToken as Queue).isReady();
        const isEveryClientReady = queue.clients.every((client) => client.status === 'ready');
        if (!isEveryClientReady) {
          throw new Error(`${name} - some redis clients are not ready`);
        }
        return true;
      }
      throw new Error(`${name} is not a bull queue`);
    });

    return Promise.allSettled(promises);
}

const queueToken = this.moduleRef.get(getQueueToken(name), { strict: false });

The code uses the queue token to obtain the queue instance from the module reference. Once the queue is ready, queue.clients provides an array of redis clients. When every redis client has the ready status, the queue is ready to accept jobs. On the other hand, when the redis store is not available, the method throws an error.
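
The readiness rule can be looked at in isolation. The following is a minimal sketch rather than the indicator itself: RedisClientLike and describeQueueReadiness are hypothetical names, and the only assumption is that each client exposes a status string, as the clients in queue.clients do.

```typescript
// Hypothetical stand-in for a redis client: only the `status` field matters here.
interface RedisClientLike {
  status: string;
}

// Returns null when every client is ready, otherwise an error message
// in the same style as the indicator's "some redis clients are not ready".
function describeQueueReadiness(queueName: string, clients: RedisClientLike[]): string | null {
  const isEveryClientReady = clients.every((client) => client.status === 'ready');
  return isEveryClientReady ? null : `${queueName} - some redis clients are not ready`;
}

const healthy = describeQueueReadiness('fibonacci', [{ status: 'ready' }, { status: 'ready' }]);
const unhealthy = describeQueueReadiness('prime', [{ status: 'ready' }, { status: 'reconnecting' }]);

console.log(healthy);   // null
console.log(unhealthy); // prime - some redis clients are not ready
```

Note that every() returns true for an empty array, so a queue with no clients would count as ready in this sketch; add an explicit length check if that case matters.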

private filterErrors(promiseResults: PromiseSettledResult<boolean>[]): string[] {
    const errorResults: string[] = [];
    for (const promiseResult of promiseResults) {
      if (promiseResult.status === 'rejected') {
        if (promiseResult.reason instanceof Error) {
          errorResults.push(promiseResult.reason.message);
        } else {
          errorResults.push(promiseResult.reason);
        }
      }
    }
    return errorResults;
 }

The result of checkQueuesReady is a collection of resolved and rejected promises. filterErrors iterates the promise results, finds those that are rejected and stores the error messages.
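
A small, self-contained run shows how Promise.allSettled keeps every outcome instead of failing on the first rejection. The checkQueue helper and its error message are made up for illustration and do not touch a real queue; non-Error reasons are converted with String() so that the result is always an array of strings.

```typescript
// Hypothetical check: resolves for the two known queue names, rejects otherwise.
async function checkQueue(queueName: string): Promise<boolean> {
  if (queueName === 'fibonacci' || queueName === 'prime') {
    return true;
  }
  throw new Error(`${queueName} is not a bull queue`);
}

// Runs every check to completion, then keeps only the messages of the rejected ones.
async function collectErrors(queueNames: string[]): Promise<string[]> {
  const results = await Promise.allSettled(queueNames.map((queueName) => checkQueue(queueName)));
  const errors: string[] = [];
  for (const result of results) {
    if (result.status === 'rejected') {
      const reason: unknown = result.reason;
      errors.push(reason instanceof Error ? reason.message : String(reason));
    }
  }
  return errors;
}

const errorsPromise = collectErrors(['fibonacci', 'prime', 'dummy']);
errorsPromise.then((errors) => console.log(errors)); // [ 'dummy is not a bull queue' ]
```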

When the health check returns error messages, isHealthy throws a HealthCheckError.

throw new HealthCheckError('Bull queue failed', this.getStatus('bull', false, 
    { errors: errorResults }));

When the health check finds no errors, isHealthy returns

this.getStatus('bull', true);
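
For orientation, the object returned by getStatus is keyed by the indicator name and carries a status of up or down plus any extra data. The function below is a plain TypeScript imitation of that shape, written for illustration only; it is not the Terminus implementation, and buildStatus is a hypothetical name.

```typescript
type IndicatorStatus = 'up' | 'down';

interface IndicatorEntry {
  status: IndicatorStatus;
  [extra: string]: unknown;
}

type IndicatorResult = Record<string, IndicatorEntry>;

// Imitates the result shape: { [key]: { status, ...data } }.
function buildStatus(key: string, isHealthy: boolean, data: Record<string, unknown> = {}): IndicatorResult {
  const status: IndicatorStatus = isHealthy ? 'up' : 'down';
  const entry: IndicatorEntry = { status, ...data };
  return { [key]: entry };
}

const up = buildStatus('bull', true);
const down = buildStatus('bull', false, { errors: ['prime - some redis clients are not ready'] });

console.log(JSON.stringify(up));   // {"bull":{"status":"up"}}
console.log(JSON.stringify(down)); // {"bull":{"status":"down","errors":["prime - some redis clients are not ready"]}}
```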

Define health controller

nest g co health/controllers/health --flat

import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { BullQueueHealthIndicator } from '../health/bull-queue.health';

@Controller('health')
export class HealthController {
  constructor(private health: HealthCheckService, private bull: BullQueueHealthIndicator) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([() => this.bull.isHealthy(['fibonacci', 'prime'])]);
  }
}

First, the GET endpoint is annotated with the HealthCheck decorator. Second, HealthCheckService expects a list of health indicator functions to execute. Therefore, I pass the check method a function that calls BullQueueHealthIndicator.

this.health.check([() => this.bull.isHealthy(['fibonacci', 'prime'])])

checks the health of fibonacci and prime queues and maps the health status to the object with key “bull”.

Invoke health check endpoint

Run docker-compose to start redis. Since both the fibonacci and prime queues can connect to redis, the health check status should be “up”.

http://localhost:3000/health

Stop docker-compose and call the same endpoint to see a different response.

Both queues are down because redis is no longer running.

Suppose I also check ‘dummy’, which is not a bull queue; isHealthy then reports a bad injection token error message.

return this.health.check([() => this.bull.isHealthy(['fibonacci', 'prime', 'dummy'])]);

Final Thoughts

In this post, I show how to use the terminus package to write a custom health indicator that checks the health of bull queues. When terminus does not provide a predefined indicator and no open source library exists, we can write our own indicator to perform the appropriate checks.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in NestJS and other technologies.

Resources:

  1. Repo: https://github.com/railsstudent/nestjs-health-terminus
  2. Queue: https://docs.nestjs.com/techniques/queues
  3. Bull: https://github.com/OptimalBits/bull
  4. Terminus: https://docs.nestjs.com/recipes/terminus

Queue and job processing in nestjs

Introduction

The responsibility of a backend application is to handle client requests and return responses to the client. However, backend performance can degrade when a request is resource intensive: the client waits for the response and the user interface is blocked from further user actions. To avoid this unresponsiveness, we can place jobs in a queue and delegate the processing to a job processor.

In this blog post, I describe how to register bull queues in a NestJS project and add jobs to a queue to run in the background. When the processor consumes a job from the queue and finishes processing it, the queue listener logs a message to the terminal.

Let's go

Create a new Nestjs project

nest new nestjs-health-terminus

Install dependencies

Install bull queue package for nestjs and other dependencies for configuration and validation.

npm i @nestjs/bull bull
npm i @nestjs/config class-validator class-transformer
npm i --save-dev @types/bull

Add Docker Compose to install redis

version: '3.1'

services:
  redis:
    container_name: redis
    image: redis:6-alpine
    restart: always
    ports:
      - '6379:6379'
    volumes: 
      - cache:/data
    networks:
      - terminus

volumes:
  cache:

networks:
  terminus:
    driver: bridge

When we launch docker compose, it pulls the redis:6-alpine image and runs Redis 6 on port 6379.

Import Bull module used by queues

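import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({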
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
    }),
    BullModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        redis: {
          host: configService.get<string>('REDIS_HOST'),
          port: configService.get<number>('REDIS_PORT'),
        },
      }),
    }),
  ]
})
export class AppModule {}
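
One detail worth noting: values loaded from the environment are strings, and the type argument in configService.get<number>('REDIS_PORT') is only a type hint, so it does not convert the value. A minimal sketch of explicit parsing follows; parseRedisConfig and RedisConfig are hypothetical names, and the plain object passed in stands in for the environment.

```typescript
interface RedisConfig {
  host: string;
  port: number;
}

// Reads REDIS_HOST and REDIS_PORT from a string map and fails fast on bad input.
function parseRedisConfig(env: Record<string, string | undefined>): RedisConfig {
  const host = env['REDIS_HOST'];
  const rawPort = env['REDIS_PORT'];
  if (!host) {
    throw new Error('REDIS_HOST is required');
  }
  const port = Number(rawPort);
  if (!rawPort || !Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`REDIS_PORT must be an integer between 1 and 65535, got "${rawPort}"`);
  }
  return { host, port };
}

const redisConfig = parseRedisConfig({ REDIS_HOST: 'localhost', REDIS_PORT: '6379' });
console.log(redisConfig); // { host: 'localhost', port: 6379 }
```

Failing at startup with a clear message is usually easier to diagnose than a connection error that surfaces later in the queue.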

To enable configService to find the values of REDIS_HOST and REDIS_PORT, the project needs to provide a .env file with the environment variables.

// .env

REDIS_HOST=localhost
REDIS_PORT=6379

In a tutorial, it is tempting to hardcode the values, but developers should not do so in a production application.

Register bull queues to process jobs

First, we create a queue module to demonstrate some examples of queues and job processing.

import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { FibonacciService, PrimeService } from './services';
import { QueueController } from './controllers';
import { PrimeProcessor, FibonacciProcessor } from './processors';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'fibonacci',
    }),
    BullModule.registerQueue({
      name: 'prime',
    }),
  ],
  providers: [FibonacciService, FibonacciProcessor, PrimeProcessor, PrimeService],
  controllers: [QueueController],
})
export class QueueModule {}

The module consists of the following:

  • Two bull queues, fibonacci and prime, respectively
  • Queue controller that adds jobs to fibonacci and prime queues
  • FibonacciService that calculates the first N Fibonacci numbers
  • FibonacciProcessor that processes jobs in fibonacci queue
  • PrimeService that solves two problems of prime factors
  • PrimeProcessor that processes “prime-factors” and “distinct-prime-factors” jobs

Add jobs to queue and wait for job processing

In my example, I inject the queues into the controller to add jobs, but queuing can also occur in services. If a background job depends on earlier results computed in a service, we can inject the queue into the service and add jobs to it inside the method.

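import { InjectQueue } from '@nestjs/bull';
import { Controller, ParseIntPipe, Post, Query } from '@nestjs/common';
import { Queue } from 'bull';

@Controller('queue')
export class QueueController {
  constructor(
    @InjectQueue('fibonacci') private fibonacciQueue: Queue<{ order: number }>,
  ) {}

  @Post('fib')
  async getFibonacci(@Query('order', ParseIntPipe) order: number): Promise<void> {
    // Log the submission, then enqueue the job with a one-second delay
    console.log(`${new Date()} - Job submitted to queue`, order);
    await this.fibonacciQueue.add({ order }, { delay: 1000 });
  }
}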

The fibonacci queue has a single type of job: calculating the first N Fibonacci numbers.

await this.fibonacciQueue.add({ order }, { delay: 1000 });

adds a job with data { order } and delays it by one second ({ delay: 1000 }).
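
FibonacciService is not shown in this post. Here is a minimal sketch of what its fibonacci method might look like, assuming it returns the first N numbers starting from Fib(0) = 0; the actual service in the repo may differ.

```typescript
// Returns the first `count` Fibonacci numbers, starting with Fib(0) = 0 and Fib(1) = 1.
function fibonacci(count: number): number[] {
  const sequence: number[] = [];
  let current = 0;
  let next = 1;
  for (let i = 0; i < count; i++) {
    sequence.push(current);
    const following = current + next;
    current = next;
    next = following;
  }
  return sequence;
}

console.log(fibonacci(8)); // [ 0, 1, 1, 2, 3, 5, 8, 13 ]
```

With plain numbers, results stay exact only up to Fib(78); beyond that the values exceed Number.MAX_SAFE_INTEGER and bigint would be the safer choice.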

Add named jobs to queue for job processing

@Controller('queue')
export class QueueController {
  constructor(@InjectQueue('prime') private primeQueue: Queue) {}


  @Post('prime-factors')
  async getPrimeFactors(@Query('input', ParseIntPipe) input: number): Promise<void> {
    console.log(`${new Date()} - Prime factors job submitted to prime queue`, input);
    await this.primeQueue.add('prime-factors', { input }, { delay: 1000 });
  }

  @Post('distinct-prime-factors')
  async getDistinctPrimeFactors(@Body() arrayDto: ArrayProductsDto): Promise<void> {
    console.log(`${new Date()} - Distinct prime factor job submitted to prime queue`, arrayDto.products);
    await this.primeQueue.add('distinct-prime-factors', {
       products: arrayDto.products,
    });
  }
}

The prime queue, on the other hand, manages two types of jobs. Therefore, we specify the job name when adding a job to the queue.

await this.primeQueue.add('prime-factors', { input }, { delay: 1000 });

The code snippet adds a ‘prime-factors’ job to the queue to find all prime factors of an integer.

await this.primeQueue.add('distinct-prime-factors', {
   products: arrayDto.products,
});

The other code snippet adds a ‘distinct-prime-factors’ job to the queue to find the distinct prime factors for an array of integers.
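
PrimeService is not shown in this post either. The sketch below uses trial division and is only an assumption about its behavior: getPrimeFactors returns every prime factor with repetition, and getDistinctPrimeFactors is assumed to return the distinct primes that divide at least one number in products, together with their count, in the { primeFactors, count } shape that the processor later destructures.

```typescript
// All prime factors of n, with repetition, by trial division. Returns [] for n < 2.
function getPrimeFactors(n: number): number[] {
  const factors: number[] = [];
  let remaining = n;
  for (let divisor = 2; divisor * divisor <= remaining; divisor++) {
    while (remaining % divisor === 0) {
      factors.push(divisor);
      remaining = remaining / divisor;
    }
  }
  if (remaining > 1) {
    factors.push(remaining); // whatever is left is itself prime
  }
  return factors;
}

// Distinct primes dividing at least one of the given numbers, in ascending order.
function getDistinctPrimeFactors(products: number[]): { primeFactors: number[]; count: number } {
  const seen: Record<number, boolean> = {};
  for (const product of products) {
    for (const factor of getPrimeFactors(product)) {
      seen[factor] = true;
    }
  }
  const primeFactors = Object.keys(seen).map(Number).sort((a, b) => a - b);
  return { primeFactors, count: primeFactors.length };
}

console.log(getPrimeFactors(88)); // [ 2, 2, 2, 11 ]
console.log(getDistinctPrimeFactors([2, 4, 3, 7, 10, 6])); // { primeFactors: [ 2, 3, 5, 7 ], count: 4 }
```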

Next, we define job processors to process the jobs so that they do not sit waiting in the queues.

Define job processors for job processing

It is easy to create job processors in NestJS. A job processor is a class annotated with the @Processor() decorator and a queue name. Each handler method has a @Process() decorator and an optional job name to consume jobs from the queue.

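import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { FibonacciService } from '../services';

@Processor('fibonacci')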
export class FibonacciProcessor {
  constructor(private fibonacciService: FibonacciService) {}

  @Process()
  calculateNFibonacciNumbers({ data }: Job<{ order: number }>): void {
    const fibSequences = this.fibonacciService.fibonacci(data.order);
    console.log(`${new Date()} Calculating ${data.order + 1} fibonacci numbers...`);
    for (let i = 0; i < data.order; i++) {
      console.log(`${new Date()} - Fib(${i}) = ${fibSequences[i]}`);
    }
  }
}

@Processor('fibonacci') listens to the fibonacci queue registered in the queue module. The method, calculateNFibonacciNumbers, is responsible for all jobs of the fibonacci queue because the @Process() decorator does not specify a job name. In my opinion, a single method that processes every type of job in a queue would violate the single responsibility principle.

Define specialized job method for job processing

If we want a process method to take care of a single type of job, the @Process() decorator accepts a name parameter that represents the job name.

import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { PrimeService } from '../services';

@Processor('prime')
export class PrimeProcessor {
  constructor(private primeService: PrimeService) {}

  @Process('prime-factors')
  findPrimeFactors({ data }: Job<{ input: number }>): void {
    const primeFactors = this.primeService.getPrimeFactors(data.input);
    console.log(`${new Date()} - All prime factors`, primeFactors);
  }

  @Process('distinct-prime-factors')
  findDistinctPrimeFactors({ data }: Job<{ products: number[] }>): void {
    const { primeFactors, count } = this.primeService.getDistinctPrimeFactors(data.products);
    console.log(`${new Date()} - Distinct prime factors`, primeFactors, 'count:', count);
  }
}

findPrimeFactors receives the 'prime-factors' job and determines all prime factors of an integer. Similarly, findDistinctPrimeFactors receives the 'distinct-prime-factors' job and determines the distinct prime factors and their count. It is a cleaner approach than using if-then-else to call different methods of primeService.
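
To make the comparison with if-then-else concrete, here is a plain TypeScript sketch of name-based dispatch. It imitates the idea behind named handlers with a lookup table; it is not how Bull or NestJS implement it, and JobLike, handlers, dispatch and the two job names are hypothetical.

```typescript
// Hypothetical, simplified job: a name plus a numeric payload.
interface JobLike {
  name: string;
  data: number[];
}

type Handler = (data: number[]) => string;

// One handler per job name, instead of a growing if/else chain.
const handlers: Record<string, Handler> = {
  sum: (data) => `sum = ${data.reduce((total, value) => total + value, 0)}`,
  max: (data) => `max = ${Math.max(...data)}`,
};

function dispatch(job: JobLike): string {
  const handler = handlers[job.name];
  if (!handler) {
    throw new Error(`No handler registered for job "${job.name}"`);
  }
  return handler(job.data);
}

console.log(dispatch({ name: 'sum', data: [1, 2, 3] })); // sum = 6
console.log(dispatch({ name: 'max', data: [1, 2, 3] })); // max = 3
```

Adding a new job type means adding one entry to the table, which mirrors adding one decorated method to the processor class.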

Listen to queue events during the lifecycle of job processing

Bull queue events offer pre-process and post-process hooks for developers to perform custom actions such as logging and caching. Local event listeners must be declared within a processor class such as PrimeProcessor.

The code here implements OnQueueActive and OnQueueCompleted to log start and end time of the job.

@Processor('prime')
export class PrimeProcessor {
  constructor(private primeService: PrimeService) {}

 ...

 @OnQueueActive()
 onJobActive(job: Job) {
    console.log(
      `onJobActive - id: ${job.id}, name: ${job.name}, data: `,
      job.data,
      ` starts at ${new Date(job.timestamp)}`,
    );
 }

 @OnQueueCompleted()
 onJobSuccess(job: Job, result: any) {
    console.log(
      `onJobSuccess - id: ${job.id}, name: ${job.name}, data: `,
      job.data,
      ` completes at ${new Date(job.finishedOn)}`,
      'result',
      result,
    );
 }
}

Execute cURL request

curl --location --request POST 'http://localhost:3000/queue/prime-factors?input=88'

Console log output:

onJobActive - id: 1, name: prime-factors, data:  { input: 88 }  starts at Sat Sep 17 2022 11:20:54 GMT+0800 (Hong Kong Standard Time)
onJobSuccess - id: 1, name: prime-factors, data:  { input: 88 }  completes at Sat Sep 17 2022 11:20:55 GMT+0800 (Hong Kong Standard Time) result undefined
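
The result is undefined in the log because findPrimeFactors returns void; the completed listener receives whatever the process method returns. The sketch below imitates that flow in plain TypeScript to show the effect of returning a value. runJob, SimpleJob and the hook signatures are hypothetical stand-ins, not Bull's API.

```typescript
interface SimpleJob<T> {
  id: number;
  name: string;
  data: T;
}

// Calls the active hook, then the handler, then the completed hook with the handler's return value.
function runJob<T, R>(
  job: SimpleJob<T>,
  handler: (job: SimpleJob<T>) => R,
  onActive: (job: SimpleJob<T>) => void,
  onCompleted: (job: SimpleJob<T>, result: R) => void,
): void {
  onActive(job);
  const result = handler(job);
  onCompleted(job, result);
}

const eventLog: string[] = [];
const primeJob: SimpleJob<{ input: number }> = { id: 1, name: 'prime-factors', data: { input: 88 } };

const recordActive = (): void => { eventLog.push('active'); };
const recordResult = (_job: SimpleJob<{ input: number }>, result: unknown): void => {
  eventLog.push(`result: ${JSON.stringify(result)}`);
};

// A handler that returns nothing: the completed hook sees undefined.
runJob(primeJob, () => undefined, recordActive, recordResult);

// A handler that returns a value: the completed hook receives it.
runJob(primeJob, () => [2, 2, 2, 11], recordActive, recordResult);

console.log(eventLog); // [ 'active', 'result: undefined', 'active', 'result: [2,2,2,11]' ]
```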

Final Thoughts

In this post, we show a comprehensive bull queue example in NestJS. When an application has a task that is expected to run for a long time, we have the option to place it in a queue and process it in the background. Then, we do not block the main event loop, and the UI stays responsive for further user actions.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in NestJS and other technologies.

Resources:

  1. Queue: https://docs.nestjs.com/techniques/queues
  2. Bull: https://github.com/OptimalBits/bull
  3. Repo: https://github.com/railsstudent/nestjs-health-terminus