Check the health of a NestJS bull queue with Terminus

Introduction

Enterprise applications commonly connect to external resources to process requests and return responses. Therefore, an application should know that the components it depends on are up and running at all times. Otherwise, it behaves erroneously when serving client requests. To stay aware of the availability of connected resources, we can perform health checks on them and inspect the results.

In this blog post, I describe how to health check a bull queue in NestJS with Terminus. First, it requires a custom health indicator to verify that the queue is connected to redis and that the redis status is ready. Next, I add a /health endpoint that calls the health indicator and returns the status.

Let's go.

Clone or fork the Nestjs project

You can find the sample code of the NestJS bull queue in this repo: https://github.com/railsstudent/nestjs-health-terminus

nest new nestjs-health-terminus

Install dependencies

Install the NestJS Terminus library in the project:

npm install --save @nestjs/terminus

Create a health module for health checks

nest g mo health

Import HealthModule in AppModule

@Module({
  imports: [
    // ... other modules ...
    HealthModule,
  ],
})
export class AppModule {}

Add a custom health indicator to check the health of the bull queue

@nestjs/terminus does not provide a built-in health indicator for bull queues; therefore, I create a custom one in the health module.

First, use the Nest CLI to generate a health indicator:

nest g s health/health/bullQueue --flat

My convention is to rename the filename suffix from .service.ts to .health.ts.

src/health
├── controllers
│   ├── health.controller.ts
│   └── index.ts
├── health
│   ├── bull-queue.health.ts
│   └── index.ts
├── health.module.ts
└── index.ts

Then, open bull-queue.health.ts and make BullQueueHealthIndicator extend HealthIndicator.

import { getQueueToken } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { HealthIndicator, HealthIndicatorResult, HealthCheckError } from '@nestjs/terminus';
import { Queue } from 'bull';

@Injectable()
export class BullQueueHealthIndicator extends HealthIndicator {
  constructor(private moduleRef: ModuleRef) {
    super();
  }
}

Next, import TerminusModule into HealthModule and verify that the providers array contains BullQueueHealthIndicator.

import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthController } from './controllers';
import { BullQueueHealthIndicator } from './health';

@Module({
  imports: [TerminusModule],
  controllers: [HealthController],
  providers: [BullQueueHealthIndicator],
})
export class HealthModule {}

Add health check logic to the custom health indicator

In BullQueueHealthIndicator, I implement the isHealthy() method to check the health of the fibonacci and prime queues. When a queue is connected to redis and the redis status is ready, the queue is considered up. Otherwise, the queue is down and the indicator returns error messages.

async isHealthy(queues: string[]): Promise<HealthIndicatorResult> {
  // Check every queue, then keep only the error messages of the failed ones.
  const promiseResults = await this.checkQueuesReady(queues);
  const errorResults = this.filterErrors(promiseResults);

  if (errorResults.length) {
    // At least one queue is down: report the collected error messages.
    throw new HealthCheckError(
      'Bull queue failed',
      this.getStatus('bull', false, { errors: errorResults }),
    );
  }

  return this.getStatus('bull', true);
}

checkQueuesReady accepts a list of queue names and examines their readiness.

private async checkQueuesReady(queues: string[]) {
  const promises = queues.map(async (name) => {
    // Look up the queue instance by its injection token.
    const queueToken = this.moduleRef.get(getQueueToken(name), { strict: false });
    if ((queueToken as Queue).isReady) {
      // Wait until the queue is ready, then inspect its redis clients.
      const queue = await (queueToken as Queue).isReady();
      const isEveryClientReady = queue.clients.every((client) => client.status === 'ready');
      if (!isEveryClientReady) {
        throw new Error(`${name} - some redis clients are not ready`);
      }
      return true;
    }
    throw new Error(`${name} is not a bull queue`);
  });

  // allSettled waits for every check, whether it succeeds or fails.
  return Promise.allSettled(promises);
}

const queueToken = this.moduleRef.get(getQueueToken(name), { strict: false });

The code uses the queue token to obtain the queue instance. When the queue is ready, I read queue.clients, which is an array of redis clients. When every redis client has the ready status, the queue is ready to accept jobs. On the other hand, when the redis store is not available, the method throws an error.
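The readiness rule can be tried in isolation without redis. The following is a minimal sketch, assuming hypothetical FakeRedisClient and FakeQueue shapes that only imitate the clients array and status field used in the check; they are not the real bull typings.

```typescript
// Minimal stand-ins for the parts of a bull queue used by the check.
// These shapes are assumptions for illustration, not the real bull typings.
interface FakeRedisClient {
  status: string;
}

interface FakeQueue {
  clients: FakeRedisClient[];
}

// Returns true only when every redis client reports the 'ready' status.
function areAllClientsReady(queue: FakeQueue): boolean {
  return queue.clients.every((client) => client.status === 'ready');
}

const healthyQueue: FakeQueue = {
  clients: [{ status: 'ready' }, { status: 'ready' }],
};
const degradedQueue: FakeQueue = {
  clients: [{ status: 'ready' }, { status: 'reconnecting' }],
};

console.log(areAllClientsReady(healthyQueue)); // true
console.log(areAllClientsReady(degradedQueue)); // false
```

A single client that is not ready is enough to mark the whole queue as not ready, which matches the every() call in checkQueuesReady.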

private filterErrors(promiseResults: PromiseSettledResult<boolean>[]): string[] {
  const errorResults: string[] = [];
  for (const promiseResult of promiseResults) {
    if (promiseResult.status === 'rejected') {
      if (promiseResult.reason instanceof Error) {
        errorResults.push(promiseResult.reason.message);
      } else {
        errorResults.push(promiseResult.reason);
      }
    }
  }
  return errorResults;
}

The result of checkQueuesReady is a collection of fulfilled and rejected promise results. filterErrors iterates over the promise results, finds those that are rejected and stores their error messages.
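To see how Promise.allSettled and the filtering step fit together, here is a small standalone sketch. The helper name collectErrorMessages and the demo function are hypothetical; the filtering follows the same idea as filterErrors but is written as a free function so it runs without NestJS.

```typescript
// Collects the messages of rejected results; same idea as filterErrors.
function collectErrorMessages(results: PromiseSettledResult<boolean>[]): string[] {
  return results
    .filter((r): r is PromiseRejectedResult => r.status === 'rejected')
    .map((r) => (r.reason instanceof Error ? r.reason.message : String(r.reason)));
}

// Simulates one healthy queue and one queue whose check fails.
async function demo(): Promise<string[]> {
  const checks: Promise<boolean>[] = [
    Promise.resolve(true),
    Promise.reject(new Error('prime - some redis clients are not ready')),
  ];
  // allSettled never rejects; each entry is either fulfilled or rejected.
  const results = await Promise.allSettled(checks);
  return collectErrorMessages(results);
}

demo().then((messages) => console.log(messages));
```

Because allSettled waits for every promise, a failure in one queue does not hide the result of the others.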

When the health check finds errors, isHealthy throws a HealthCheckError.

throw new HealthCheckError('Bull queue failed', this.getStatus('bull', false, 
    { errors: errorResults }));

When the health check finds no errors, isHealthy returns

this.getStatus('bull', true);
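For orientation, the indicator result is an object keyed by the indicator name, with a status of up or down plus any extra data. The sketch below uses a hypothetical buildStatus helper to illustrate that shape; it is an assumption modelled on the Terminus documentation, not the library's own getStatus.

```typescript
// Stand-in for the result shape; an assumption modelled on the Terminus docs.
type Status = 'up' | 'down';
type IndicatorResult = { [key: string]: { status: Status; [extra: string]: unknown } };

// Hypothetical helper that shows how the result is keyed by indicator name.
function buildStatus(
  key: string,
  isHealthy: boolean,
  data: Record<string, unknown> = {},
): IndicatorResult {
  const status: Status = isHealthy ? 'up' : 'down';
  return { [key]: { ...data, status } };
}

const healthy = buildStatus('bull', true);
const unhealthy = buildStatus('bull', false, {
  errors: ['prime - some redis clients are not ready'],
});

console.log(JSON.stringify(healthy)); // {"bull":{"status":"up"}}
console.log(unhealthy['bull']?.status); // down
```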

Define health controller

nest g co health/controllers/health --flat

import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { BullQueueHealthIndicator } from '../health/bull-queue.health';

@Controller('health')
export class HealthController {
  constructor(private health: HealthCheckService, private bull: BullQueueHealthIndicator) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([() => this.bull.isHealthy(['fibonacci', 'prime'])]);
  }
}

First, the GET endpoint is annotated with the HealthCheck decorator. Second, HealthCheckService.check expects an array of functions that invoke health indicators. Therefore, I pass a function that calls BullQueueHealthIndicator.isHealthy for the check method to execute.

this.health.check([() => this.bull.isHealthy(['fibonacci', 'prime'])])

checks the health of fibonacci and prime queues and maps the health status to the object with key “bull”.
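The way the indicator result ends up under the "bull" key of the response can be pictured with a simplified model. The sketch below is an assumption based on the response format described in the Terminus documentation (status, info, error, details); runChecks, makeHealthCheckError and hasCauses are hypothetical names, and this is not the library's actual implementation.

```typescript
type IndicatorResult = { [key: string]: { status: 'up' | 'down'; [extra: string]: unknown } };
type Indicator = () => Promise<IndicatorResult>;

interface HealthSummary {
  status: 'ok' | 'error';
  info: IndicatorResult;
  error: IndicatorResult;
  details: IndicatorResult;
}

// Hypothetical stand-in for HealthCheckError: an Error carrying the failed result.
function makeHealthCheckError(message: string, causes: IndicatorResult): Error & { causes: IndicatorResult } {
  return Object.assign(new Error(message), { causes });
}

function hasCauses(e: unknown): e is { causes: IndicatorResult } {
  return typeof e === 'object' && e !== null && 'causes' in e;
}

// Runs every indicator and merges the outcomes into one summary object.
async function runChecks(indicators: Indicator[]): Promise<HealthSummary> {
  const settled = await Promise.allSettled(indicators.map((fn) => fn()));
  let info: IndicatorResult = {};
  let error: IndicatorResult = {};
  for (const result of settled) {
    if (result.status === 'fulfilled') {
      info = { ...info, ...result.value };
    } else if (hasCauses(result.reason)) {
      error = { ...error, ...result.reason.causes };
    } else {
      throw result.reason; // unexpected failures are not health results
    }
  }
  return {
    status: Object.keys(error).length === 0 ? 'ok' : 'error',
    info,
    error,
    details: { ...info, ...error },
  };
}

const queueUp: Indicator = async () => ({ bull: { status: 'up' } });
const queueDown: Indicator = async () => {
  throw makeHealthCheckError('Bull queue failed', {
    bull: { status: 'down', errors: ['prime - some redis clients are not ready'] },
  });
};

runChecks([queueUp]).then((summary) => console.log(summary.status)); // ok
runChecks([queueDown]).then((summary) => console.log(summary.status)); // error
```

In this model a healthy indicator lands in info, a failed one lands in error, and details holds both. Terminus additionally responds with HTTP 503 when a check fails.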

Invoke health check endpoint

Run docker-compose to start redis. Since both the fibonacci and prime queues can connect to redis, the health check status should be “up”.

http://localhost:3000/health

Stop docker-compose and call the same endpoint to see a different response.

Both queues are down because redis is no longer running.

Suppose I also check ‘dummy’, which is not a bull queue. In that case, isHealthy outputs a bad injection token error message.

return this.health.check([() => this.bull.isHealthy(['fibonacci', 'prime', 'dummy'])]);

Final Thoughts

In this post, I show how to use the terminus package to write a custom health indicator that checks the health of a bull queue. When the terminus package does not provide a predefined indicator and there is no open source library, we can write our own indicators to perform the appropriate checks.

This is the end of the blog post. I hope you like the content and continue to follow my learning experience in NestJS and other technologies.

Resources:

  1. Repo: https://github.com/railsstudent/nestjs-health-terminus
  2. Queue: https://docs.nestjs.com/techniques/queues
  3. Bull: https://github.com/OptimalBits/bull
  4. Terminus: https://docs.nestjs.com/recipes/terminus