Compose RxJS custom operators to hide complex logic

Reading Time: 4 minutes

Loading

Introduction

This post describes how to compose RxJS custom operators from existing operators to hide complex logic. Not only RxJS custom operator encapsulates logic but it also promotes reusable operator in RxJS stream. In the Stackblitz example, I refactored a custom operator to emit a number between minimum value and maximum value. Then, the number is passed to Pokemon API to retrieve a Pokemon.

let's go

Pokemon Controls component without custom operators

// pokemon-controls.component.ts

@Component({
  selector: 'app-pokemon-controls',
  standalone: true,
  imports: [FormsModule, NgFor],
  template: `...omitted due to brevity...`,
  styles: [`...omitted due to brevity...`],
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class PokemonControlsComponent implements OnDestroy, AfterViewInit {
  @ViewChildren(PokemonButtonDirective)
  btns: QueryList<PokemonButtonDirective>;

  @ViewChild('f', { static: true, read: NgForm })
  myForm!: NgForm;

  searchId = 1;
  pokemonService = inject(PokemonService);
  subscription!: Subscription;

  ngAfterViewInit(): void {
    const btns$ = this.btns.map((btn) => btn.click$);
    const inputId$ = this.myForm.form.valueChanges
      .pipe(
        debounceTime(300),
        distinctUntilChanged((prev, curr) => prev.searchId === curr.searchId),
        filter((form) => form.searchId >= 1 && form.searchId <= 100),
        map((form) => form.searchId),
        map((value) => ({
          value,
          action: POKEMON_ACTION.OVERWRITE,
        }))
      );

    this.subscription = merge(...btns$, inputId$)
      .pipe(
        scan((acc, { value, action }) => { 
          if (action === POKEMON_ACTION.OVERWRITE) {
            return value;
          } else if (action === POKEMON_ACTION.ADD) {
            const potentialValue = acc + value;
            if (potentialValue >= 1 && potentialValue <= 100) {
              return potentialValue;
            } else if (potentialValue < 1) {
              return 1;
            }

            return 100;
          }

          return acc;
        }, 1),
        startWith(1),
        shareReplay(1),
      )
      .subscribe((pokemonId) => this.pokemonService.updatePokemonId(pokemonId));
  }

  ngOnDestroy(): void {
    this.subscription.unsubscribe();
  }
}

In PokemonControlsComponent standalone component, inputId$ and this.subscription are results of existing RxJS operators. inputId$ emits the inputted Pokemon id and I can create an operator that emits an integer between the lower bound and upper bound inclusively.

On the other hand, merge operator merges the Observables, uses existing operators to derive the Pokemon Id, manage and cache the Pokemon state. Similarly, all these steps can encapsulate in a custom operator to emit the cached Pokemon id.

Compose RxJS custom operators to emit input field

// emit-pokemon-id.operator.ts

import { Observable, debounceTime, distinctUntilChanged, filter, map } from "rxjs"
import { POKEMON_ACTION } from "../enums/pokemon.enum"

export const emitPokemonId = (minPokemonId = 1, maxPokemonId = 100) =>
  (source: Observable<any>) => source.pipe(
    debounceTime(300),
    distinctUntilChanged((prev, curr) => prev.searchId === curr.searchId),
    filter((form) => form.searchId >= minPokemonId && form.searchId <= maxPokemonId),
    map((form) => Math.floor(form.searchId)),
    map((value) => ({
      value,
      action: POKEMON_ACTION.OVERWRITE,
    }))
  );

emitPokemonId is a custom RxJS operator that emits the value of the input field. source is an Observable that emits { searchId: number } Object and the searchId will be validated to ensure it is between the lower bound and the upper bound inclusively.

  • debounceTime(300) – wait 300 milliseconds to ensure no ensuing input
  • distinctUntilChanged((prev, curr) => prev.searchId === curr.searchId) – compare the current search value is different than the previous one
  • filter((form) => form.searchId >= minPokemonId && form.searchId <= maxPokemonId) – validate the search value is between the minimum and maximum values
  • map((form) => Math.floor(form.searchId)) – Truncate the search value to emit an integer
  • map((value) => ({ value, action: POKEMON_ACTION.OVERWRITE })) – emit the search value and the next action

I have completed the first custom operator and will use it to replace the RxJS logic in PokemonControlsComponent.

// pokemon-controls.component.ts

import { emitPokemonId } from '../custom-operators/emit-pokemon-id.operator';

ngAfterViewInit(): void {
    const btns$ = this.btns.map((btn) => btn.click$); 
    const inputId$ = this.myForm.form.valueChanges.pipe(emitPokemonId());
}

Compose RxJS custom operators to cache Pokemon id

// derive-pokemon-id.operator.ts

import { Observable, scan, startWith, shareReplay } from "rxjs";
import { POKEMON_ACTION } from "../enums/pokemon.enum";

export const derivePokemonId = (minPokemonId = 1, maxPokemonId = 100) => 
  (source: Observable<{ value: number, action: POKEMON_ACTION }>) => 
    source.pipe(
      scan((acc, { value, action }) => { 
        if (action === POKEMON_ACTION.OVERWRITE) {
          return value;
        } else if (action === POKEMON_ACTION.ADD) {
          const potentialValue = acc + value;
          if (potentialValue >= minPokemonId && potentialValue <= maxPokemonId) {
            return potentialValue;
          } else if (potentialValue < minPokemonId) {
            return minPokemonId;
          }

          return maxPokemonId;
        }

        return acc;
      }, minPokemonId),
      startWith(minPokemonId),
      shareReplay(1),
  );

derivePokemonId is a custom RxJS operator that derives the current Pokemon id and caches the result. Subsequently, the Pokemon id becomes a path parameter of the Pokemon endpoint to retrieve the current Pokemon. source is an Observable that emits { value: number, action: POKEMON_ACTION } Object to scan to manage the state of Pokemon Id.

  • scan(…) – derive the next Pokemon id. When the next value is out of range, the value is adjusted to either the minimum value or the maximum value.
  • startWith(minPokemonId) – the initial value of Pokemon Id is the minimum pokemon id
  • shareReplay(1) – cache the latest Pokemon Id

I have completed the second custom operator and will use it to replace the RxJS logic in PokemonControlsComponent.

// pokemon-controls.component.ts

import { derivePokemonId } from '../custom-operators/derive-pokemon-id.operator';

ngAfterViewInit(): void {
    const btns$ = this.btns.map((btn) => btn.click$); 
    const inputId$ = this.myForm.form.valueChanges.pipe(emitPokemonId());

    this.subscription = merge(...btns$, inputId$)
      .pipe(derivePokemonId())
      .subscribe((pokemonId) => this.pokemonService.updatePokemonId(pokemonId));
}

After refactoring, the Observables in ngAfterViewInit are easier to read, understand and maintain. The descriptive name of custom operators also describes the objective that existing operators are combined to accomplish.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in Angular and other technologies.

Resources:

  1. Github Repo: https://github.com/railsstudent/ng-pokemon/tree/main/projects/pokemon-demo-7
  2. Stackblitz: https://stackblitz.com/edit/angular-nkwptv?file=src/pokemon/pokemon-controls/pokemon-controls.component.ts
  3. Derive pokemon id custom operators: https://stackblitz.com/edit/angular-nkwptv?file=src/pokemon/custom-operators/derive-pokemon-id.operator.ts
  4. Emit pokemon id custom operators: https://stackblitz.com/edit/angular-nkwptv?file=src/pokemon/custom-operators/emit-pokemon-id.operator.ts
  5. PokeAPI: https://pokeapi.co/