Dynamic Task Scheduler In NestJS

Reading Time: 7 minutes

Loading

Scenario: Our ESG (Environmental, Social, and Governance) platform requires to run schedule jobs to call endpoints to generate reports for internal teams and send company emails to our customers. Development team was tasked with developing a job scheduler that executes cron jobs at a specified period of time. node-cron is a popular open source library for setting up node-like cron jobs in a typescript program, however, Nest provides Schedule module that does basically the same thing with decorators and flexible API. Since our backend is powered by Nest, we prefer to use existing features of NestJS instead of bringing in a new library and investing time to study a new API.

Problem: We followed Nest documentation and created all cron jobs declaratively. These jobs run on a daily or weekly basis that are inconvenient for QA team to test because they cannot wait for such a long time to verify test results. The approach is to configure jobs to run in different intervals in different environments. For example, a job is configured to run once a day in production environment and run every minute in QA environment to facilitate testing cycle.

To cater to the new QA request, developers converted declarative cron jobs to dynamic counterparts through dynamic schedule module API. However, the codes were no longer DRY and I refactored the codebase to keep future development efforts to the minimum.

Create a NestJS application

nest new nest-dynamic-scheduler

Install configuration, http and schedule dependencies

npm i --save @nestjs/config joi
npm i --save @nestjs/axios
npm install --save @nestjs/schedule
npm install --save-dev @types/cron

Store environment variables in environment file

Store environment variables in .env and we will use configuration service to retrieve the base url in the codes.

Import Configuration Module

Import configuration module and validate the schema of .env file.

Validation part completes. We can move on to add dummy endpoints in AppController and a Task module that calls them in cron jobs.

Create dummy endpoints in App Controller

We define a printMessage function in AppService that will be called by the AppController

AppController has three new endpoints: the first one handles POST method, the second one handles PATCH and the final one handles PUT. It is to demonstrate that Nest cron jobs work with common http request methods.

Create new Task module

nest g mo Task

Next, I generate task service and import HttpModule and ScheduleModule.forRoot() to TaskModule. The following is the declaration of the TaskModule:

import { HttpModule, Module } from '@nestjs/common'
import { ScheduleModule } from '@nestjs/schedule'
import { TaskService } from './task.service'

@Module({
  imports: [HttpModule, ScheduleModule.forRoot()],
  providers: [TaskService],
  controllers: [],
})
export class TaskModule {}

First Attempt: Create declarative cron jobs in TaskService

// task.service.ts

private readonly baseUrl: string

constructor(
    private httpService: HttpService,
    private configService: ConfigService,
  ) {
    this.baseUrl = this.configService.get<string>('BASE_URL', '')
}

@Cron('*/15 * * * * *')
async handlePostJob(): Promise<void> {
    try {
      await this.httpService
        .post(`${this.baseUrl}/post-job`, {
          name: 'connie',
          msg: 'schedule post job every 15 second',
          timestamp: Date.now(),
        })
        .toPromise()
   } catch (err) {
      console.error(err)
   }
}

@Cron('*/20 * * * * *')
async handlePatchJob(): Promise<void> {
  try {
    await this.httpService
       .patch(`${this.baseUrl}/patch-job`, {
          name: 'connie',
          msg: 'schedule patch job every 20 second',
          timestamp: Date.now(),
       })
       .toPromise()
  } catch (err) {
     console.error(err)
  }
}

@Cron('*/30 * * * * *')
async handlePutJob(): Promise<void> {
  try {
    await this.httpService
      .put(`${this.baseUrl}/put-job`, {
         name: 'connie',
         msg: 'schedule put job every 30 second',
         timestamp: Date.now(),
      })
      .toPromise()
  } catch (err) {
    console.error(err)
  }
}

@Cron() decorator tells NestJS when the job starts and terminates and its frequency. The decorator supports all standard cron patterns; therefore, @Cron(‘*/15 * * * * *’) makes the Http POST request every 15 seconds. Similarly, PATCH and PUT requests are triggered every 20 and 30 seconds respectively.

Second Attempt: Convert declarative cron jobs to dynamic cron jobs

If QA team did not make the request, the scheduler was completed and we could go home and rest. The request seemed trivial; we could store the cron patterns as environment variables and applied them in @Cron(), right? Wrong answer, my friends.

// task.service.ts

const defaultInterval = '* * * * * *'    
this.postJobInterval = this.configService.get<string>('POST_JOB_INTERVAL', defaultInterval)

@Cron(this.postJobInterval)
async handlePostJob(): Promise<void> {
    ...
}

I couldn’t use this.postJobInterval inside @Cron() because “this” variable can be null. Fortunately, dynamic scheduler module API exists and we can add cron jobs programmatically with configurable cron patterns.

We define new job intervals in .env (POST_JOB_INTERVAL, PATCH_JOB_INTERVAL, PUT_JOB_INTERVAL)

.env

POST_JOB_INTERVAL='*/15 * * * * *'
PATCH_JOB_INTERVAL='*/20 * * * * *'
PUT_JOB_INTERVAL='*/30 * * * * *'

We comment out the old codes and define addCronJobs function to create dynamic cron jobs and register them in scheduler registry. Finally, we start the jobs such that they are triggered every 15, 20 and 30 seconds. Remember to inject SchedulerRegistry in the constructor.

// task.service.ts

const defaultInterval = '* * * * * *'    

private readonly postJobInterval: string
private readonly patchJobInterval: string
private readonly putJobInterval: string
private readonly baseUrl: string

constructor(
    private httpService: HttpService,
    private schedulerRegistry: SchedulerRegistry,
    private configService: ConfigService,
  ) {
    this.postJobInterval = this.configService.get<string>('POST_JOB_INTERVAL', defaultInterval)
    this.patchJobInterval = this.configService.get<string>('PATCH_JOB_INTERVAL', defaultInterval)
    this.putJobInterval = this.configService.get<string>('PUT_JOB_INTERVAL', defaultInterval)
    this.baseUrl = this.configService.get<string>('BASE_URL', '')
}

addCronJobs(): void {
    const postJob = new CronJob(this.postJobInterval, async () => {
      try {
        await this.httpService
          .post(`${this.baseUrl}/post-job`, {
            name: 'connie',
            msg: 'schedule post job every 15 second',
            timestamp: Date.now(),
          })
          .toPromise()
      } catch (err) {
        console.error(err)
      }
    })

    const patchJob = new CronJob(this.patchJobInterval, async () => {
      try {
        await this.httpService
          .patch(`${this.baseUrl}/patch-job`, {
            name: 'connie',
            msg: 'schedule patch job every 20 second',
            timestamp: Date.now(),
          })
          .toPromise()
      } catch (err) {
        console.error(err)
      }
    })

    const putJob = new CronJob(this.putJobInterval, async () => {
      try {
        await this.httpService
          .put(`${this.baseUrl}/put-job`, {
            name: 'connie',
            msg: 'schedule put job every 30 second',
            timestamp: Date.now(),
          })
          .toPromise()
      } catch (err) {
        console.error(err)
      }
    })

    this.schedulerRegistry.addCronJob('post-job', postJob)
    this.schedulerRegistry.addCronJob('patch-job', patchJob)
    this.schedulerRegistry.addCronJob('put-job', putJob)
    postJob.start()
    patchJob.start()
    putJob.start()
  }

Modify TaskModule to implement OnModuleInit interface and start cron jobs in onModuleInit function.

// task.module.ts

export class TaskModule implements OnModuleInit {  
  constructor(private taskService: TaskService) {}
  async onModuleInit() {    
    await taskService.addCronJobs()    
  }
}

Start the application and observe the messages in the terminal

npm run start:dev

The current implementation meets the requirements but it is not DRY. Why do I say that? It is because I see a common pattern in the three dynamic jobs.

  • Create a callback function that
    • makes a new HTTP request in a try-catch block
    • converts the observable to promise with toPromise()
    • await the result of the HTTP request
  • Create a new cron job with cron pattern and the callback function
  • Add the new cron job to scheduler registry
  • Start the cron job

I can generalize this pattern such that developer can write minimum codes to add new cron job in the future.

Third Attempt: Generalize the dynamic task scheduler

Define a new interface, JobConfiguration, that stores the metadata of the job

// job-configuration.interface.ts
import { Method } from 'axios'

export interface JobConfiguration {
  url: string
  interval: string
  method: Method
  dataFn: () => any
  name: string
}

method is the supported method of Axios and it can be POST, PATCH, PUT or DELETE. dataFn function is used to generate new request data in the callback.

Define metadata of cron jobs in jobConfigurations array.

this.jobConfigurations = [
      {
        url: `${this.baseUrl}/post-job`,
        interval: this.postJobInterval,
        method: 'POST',
        dataFn: () => ({
          name: 'connie',
          msg: 'schedule dynamic post job every 15 second',
          timestamp: Date.now(),
        }),
        name: 'post-job2',
      },
      {
        url: `${this.baseUrl}/patch-job`,
        interval: this.patchJobInterval,
        method: 'PATCH',
        dataFn: () => ({
          name: 'mary',
          msg: 'schedule dynamic patch job every 20 second',
          timestamp: Date.now(),
        }),
        name: 'patch-job2',
      },
      {
        url: `${this.baseUrl}/put-job`,
        interval: this.putJobInterval,
        method: 'PUT',
        dataFn: () => ({
          name: 'job',
          msg: 'schedule dynamic put job every 30 second',
          timestamp: Date.now(),
        }),
        name: 'put-job2',
      },
]

Define a high-order function (HOF), callbackGenerator, that returns a callback function. The callback function is designed to construct Http request to perform a task.

private callbackGenerator(configuration: JobConfiguration): () => Promise<void> {
    const { url, method, dataFn } = configuration
    return async () => {
      try {
        await this.httpService
          .request({
            url,
            method,
            data: dataFn(),
          })
          .toPromise()
      } catch (err) {
        console.error(err)
      }
    }
 }

Lastly, define addConfigurableCronJobs function to iterate jobConfigurations array and insert the cron jobs to scheduler registry

addConfigurableCronJobs(): void {
    for (const configuration of this.jobConfigurations) {
      const { interval, name } = configuration
      const callback = this.callbackGenerator(configuration)
      const cronjob = new CronJob(interval, callback)
      this.schedulerRegistry.addCronJob(name, cronjob)
      cronjob.start()
    }
}

In task.module.ts,  update onModuleInit function to call addConfigurableCronJobs instead and implement onModuleDestroy to stop cron jobs when application shutdown. 

// task.module.ts
export class TaskModule implements OnModuleInit {  
  constructor(private taskService: TaskService) {}
  async onModuleInit() {    
     // await taskService.addCronJobs()    
     await this.taskService.addConfigurableCronJobs()  
  }
  
  async onModuleDestroy() {    
     await this.taskService.deleteConfigurableCronJobs()  
  }
}
// task.sevice.ts
export class TaskService {  
  ...
  
  deleteConfigurableCronJobs(): void {    
     for (const configuration of this.jobConfigurations) {      
        const { name } = configuration      
        this.schedulerRegistry.deleteCronJob(name)    
     }  
   }
}

deleteConfigurableCronJobs function iterates jobConfigurations array and deletes the jobs one by one.

In main.ts,  add app.enableShutdownHooks() to listen to application shutdown event that ultimately calls onModuleDestroy to stop the cron jobs.

// main.ts
async function bootstrap() {
  ...
  
  // Starts listening for shutdown hooks  
  app.enableShutdownHooks()
  const configService = app.get(ConfigService)
  const port = configService.get<number>('PORT', 0)
  await app.listen(port)
}

Start the application and observe the messages in the terminal

npm run start:dev

Similar to task scheduler v2, cron jobs are triggered every 15, 20 and 35 seconds. We have completed the refactoring and scheduler codes are DRY.

Add New Cron Job

It is made easy after the refactoring. Developers simply add new job configuration in the jobConfigurations array and addConfigurableCronJobs takes care of callback creation, job registration and job launch automatically. Less code and less development time on the part of developers.

Final thoughts

This post shows how our company implements dynamic task scheduler using HttpModule and ScheduleModule of NestJS. We did not settle for the non-DRY version after fulfilling QA request. We identified common pattern and generalized the architecture with high-order function and metadata array. The outcome is elimination of duplicated codes and easy code changes to support new cron job.

I hope you like this article and share it to people who are interested in NestJS!

Resources:

  1. Repo: https://github.com/railsstudent/nest-dynamic-scheduler
  2. Nest Task Scheduling: https://docs.nestjs.com/techniq\ues/task-scheduling
  3. Nest Http Module: https://docs.nestjs.com/techniques/http-module