Create custom operators to sum and sort RxJS streams

Reading Time: 5 minutes



This is day 4 of Wes Bos’s JavaScript 30 challenge where I am going to create RxJS custom decorators to sum and sort streams.

In this blog post, I describe the steps to create RxJS custom decorators, sum and sort to manipulate streams. First, I apply sort() to sort RxJS streams by a comparison function and render the sorted results on browser. Then, I apply sum() to accumulate stream by a property and output the total on browser. When RxJS does not provide operator for a task, we have the ability to build our own and use them to process RxJS streams.

let's go

Create a new Angular project in workspace

ng generate application day4-array-cardio-part1

Create RxJS custom operator to sum stream

I create custom-operators directory and add a new file, sum.operator.ts

// sum.operator.ts

import { reduce } from 'rxjs';

export function sum<T, A extends number>(sumFn: (acc: A, t: T) => A, initial: A) {
    return reduce(sumFn, initial);
  • the operator accepts an initial value (initial parameter) and accumulator (sumFn)
  • call RxJS’s reduce to calculate the sum of a stream
  • emit total when stream completes.

Create RxJS custom operator to sort stream

Similarly, I add sort.operator.ts to custom-operators directory to sort stream when it completes

import { Observable, map, toArray } from 'rxjs';

export function sort<T>(sortFn: (x: T, y: T) => number) {
    return function(source: Observable<T>) {
        return source.pipe(
            map(items => items.sort(sortFn))
  • the operator accepts a comparison function (sortFn) that compares two objects and return an integer to define order
  • when source stream completes, I call toArray() to emit an T[]
  • execute map() to pass sortFn to Array.sort() and emit the sorted array

Get RxJS stream in an array

├── app.component.spec.ts
├── app.component.ts
├── app.module.ts
└── custom-operators
    ├── sort.operator.ts
    └── sum.operator.ts

Inventors is an array of object I convert the array into a stream using from operator

inventors = [
    { first: 'Albert', last: 'Einstein', year: 1879, passed: 1955 },
    { first: 'Isaac', last: 'Newton', year: 1643, passed: 1727 },
    { first: 'Galileo', last: 'Galilei', year: 1564, passed: 1642 },
    { first: 'Marie', last: 'Curie', year: 1867, passed: 1934 },
    { first: 'Johannes', last: 'Kepler', year: 1571, passed: 1630 },
    { first: 'Nicolaus', last: 'Copernicus', year: 1473, passed: 1543 },
    { first: 'Max', last: 'Planck', year: 1858, passed: 1947 },
    { first: 'Katherine', last: 'Blodgett', year: 1898, passed: 1979 },
    { first: 'Ada', last: 'Lovelace', year: 1815, passed: 1852 },
    { first: 'Sarah E.', last: 'Goode', year: 1855, passed: 1905 },
    { first: 'Lise', last: 'Meitner', year: 1878, passed: 1968 },
    { first: 'Hanna', last: 'Hammarström', year: 1829, passed: 1909 }

inventors$ = from(this.inventors).pipe(shareReplay(this.inventors.length));

shareReplay(this.inventors.length) caches all the inventors before other streams reuse inventors$ to sum or sort RxJS stream.

Define inventoryArray$ to get an inventory when inventors$ completes

inventorArray$ = this.inventors$.pipe(toArray());

Define a ng template to output inventory array as an unordered list. Subsequently, I reuse this template to output other array examples.

// app.component.ts

  selector: 'app-root',
  template: `
    <div class="container">
        *ngTemplateOutlet="inventors; context: { $implicit: 'Inventors', list: inventorArray$ | async }">

    <ng-template #inventors let-title let-list="list">
      <section class="inventors">
        <h2>{{ title }}</h2>
          <li *ngFor="let inventory of list; trackby: inventoryTrackBy">
            Name: {{ inventory.first }} {{ inventory.last }}<br />
            {{ inventory.year }} - {{ inventory.passed }}, Age: {{ inventory.passed - inventory.year }}
  styles: [`
    :host {
      display: block;
    ... omitted for brevity ...
export class AppComponent { ...RxJS codes ... }

Sort inventor stream by year

ordered$ = this.inventors$.pipe(sort((a, b) => a.year > b.year ? 1 : -1));

I simply pass a comparison function to the custom sort operator to compare the year property of two inventory element. It is possible because toArray returns an JavaScript array that has a sort function to perform sorting.

Similarly, the template is available to render ordered$ after it is resolved

     *ngTemplateOutlet="inventors; context: { $implicit: 'Ordered Inventors', list: ordered$ | async }">

Sort inventor stream by age from oldest to youngest

oldest$ = this.inventors$.pipe(sort((a, b) => { 
    const lastInventor = a.passed - a.year;
    const nextInventor = b.passed - b.year;
    return lastInventor > nextInventor ? -1 : 1;

I pass a different comparison function to the operator to compare the age of the inventors. When the next inventor has a greater age (passed – year) than the previous inventor, the comparison function swaps the position to obtain the correct ordering.

    *ngTemplateOutlet="inventors; context: { $implicit: 'Oldest Inventors', list: oldest$ | async }">

The template displays the oldest$ stream to list the inventors by their age

Demonstrate sort operator can work on any data type

The custom sort operator can work on any data type because of a powerful TypeScript concept called generic. It can sort any element of type T as long as the comparison function returns an integer to define order.

The following is an example to sort a people stream by last name.

people = ['Bernhard, Sandra', 'Bethea, Erin', 'Becker, Carl', 'Bentsen, Lloyd', 'Beckett, Samuel', 'Blake, William', 'Berger, Ric', 'Beddoes, Mick', 'Beethoven, Ludwig','Belloc, Hilaire', 'Begin, Menachem', 'Bellow, Saul', 'Benchley, Robert', 'Blair, Robert', 'Benenson, Peter', 'Benjamin, Walter', 'Berlin, Irving','Benn, Tony', 'Benson, Leana', 'Bent, Silas', 'Berle, Milton', 'Berry, Halle', 'Biko, Steve', 'Beck, Glenn', 'Bergman, Ingmar', 'Black, Elk', 'Berio, Luciano','Berne, Eric', 'Berra, Yogi', 'Berry, Wendell', 'Bevan, Aneurin', 'Ben-Gurion, David', 'Bevel, Ken', 'Biden, Joseph', 'Bennington, Chester', 'Bierce, Ambrose', 'Billings, Josh', 'Birrell, Augustine', 'Blair, Tony', 'Beecher, Henry', 'Biondo, Frank'];

people$ = from(this.people).pipe(shareReplay(this.people.length));
alpha$ = this.people$.pipe(sort((lastOne, nextOne) => {
   const [aLast] = lastOne.split(', ');
   const [bLast] = nextOne.split(', ');
   return aLast > bLast ? 1 : -1;

The data type of people in string whereas the inventors in the inventors stream are objects. Yet, sort sorts the streams in the correct order in all the examples.

Demonstrate sum operator to sum RxJS stream

The example is to add the year of all the inventors and display the total year. The sum operator calls reduce under the hood; therefore, it expects a reducer function and an initial value.

inventors$ = from(this.inventors).pipe(shareReplay(this.inventors.length));
totalYears$ = this.inventors$.pipe(sum((acc: number, y) => acc + (y.passed - y.year), 0));

<section class="inventors">
    <h2>Total Years</h2>
    <p>{{ totalYears$ | async }}</p>

The custom operator is very simple and it can take other functions to do things such as count the number of characters in first or last.

totalFirstLength$ = this.inventors$.pipe(sum((acc: number, y) => acc + y.first.length, 0));

Another one-liner that states the purpose of the stream clearly.

Final Thoughts

In this post, I show how to create custom RxJS operators and use them to transform streams in an Angular application. To emphasize DRY principle, I create a ng template to render the streams in a list to reduce duplicated codes in the inline template.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in Angular and other technologies.


  1. Repo:
  2. Live demo:
  3. Wes Bos’s JavaScript 30 Challenge: