See RxJS counterparts of array methods in action

Introduction

This is day 7 of Wes Bos’s JavaScript 30 challenge, where I apply RxJS counterparts of array methods to streams to obtain results.

In this blog post, I create observables from arrays and use the some, every, filter, find and findIndex operators on RxJS streams to answer a few questions about the data.

Create a new Angular project in workspace

ng generate application day7-array-cardio-part2

Create RxJS custom operator to do Array.some

RxJS provides every, filter, find and findIndex operators that mirror the array methods of the same names, but it has no some operator. Therefore, I create a custom-operators directory and define a custom operator that behaves like Array.some.

The implementation of the custom operator can be found in some.operator.ts

// some.operator.ts

import { Observable, find, map, defaultIfEmpty } from 'rxjs';

export function some<T>(predicate: (item: T) => boolean) {
    return function (source: Observable<T>) {
        return source.pipe(
            find(item => predicate(item)),
            map(c => !!c),
            defaultIfEmpty(false),
        )
    }
}
  • the operator accepts a predicate that returns a boolean result
  • the source stream uses RxJS find to look for the first item that satisfies the predicate
  • find emits that item, and map coerces it to a boolean value
  • when no item satisfies the predicate, find emits undefined, which map coerces to false; defaultIfEmpty(false) is a fallback in case nothing is emitted at all
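
One edge case may be worth keeping in mind: find emits the matching item itself, so map(c => !!c) reports false when the match is a falsy value such as 0 or an empty string. The sketch below uses plain arrays rather than streams to show the effect, on the assumption that the array methods behave like their RxJS namesakes here. The values array and the isZero predicate are made up for illustration.

```typescript
// Array analogue of the find + map(c => !!c) chain, to show one edge case.
const values: number[] = [0, 3, 5];
const isZero = (n: number): boolean => n === 0;

// find returns the matching element itself, which here is the falsy value 0.
const found: number | undefined = values.find(isZero);
const viaFind: boolean = !!found; // false, although a match exists

// findIndex returns a position (or -1), so the check does not depend on
// the truthiness of the matched element.
const viaFindIndex: boolean = values.findIndex(isZero) >= 0; // true

// Array.prototype.some is the reference behaviour the operator aims for.
const viaSome: boolean = values.some(isZero); // true

console.log({ viaFind, viaFindIndex, viaSome });
```

The Person objects used later in this post are always truthy, so the operator as written gives the expected answers for them. If a stream could emit falsy values, one option would be to build the operator on findIndex and map the index with index >= 0, since RxJS findIndex emits -1 when nothing matches.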

Define RxJS streams and emit them to arrays

src/app
├── app.component.spec.ts
├── app.component.ts
├── app.module.ts
├── custom-operators
│   └── some.operator.ts
└── interfaces
    ├── comment.interface.ts
    └── person.interface.ts

Let’s construct some streams and then use the RxJS counterparts of array methods. persons is an array of objects, and I convert it into an observable using the from function.

// app.component.ts

persons = [
    { name: 'Wes', year: 1988 },
    { name: 'Mary', year: 2006 },
    { name: 'George', year: 2009 },
    { name: 'Kait', year: 1986 },
    { name: 'Irv', year: 1970 },
    { name: 'Lux', year: 2015 }
];

people$ = from(this.persons).pipe(
    map((person) => this.calculateAge(person)),
    shareReplay(this.persons.length),
);

private calculateAge(person: PersonNoAge): Person {
   return {
      ...person,
      age: new Date().getFullYear() - person.year
   };
}

shareReplay(this.persons.length) caches all the persons so that the other streams can reuse people$ to test conditions and filter. calculateAge computes the age of each person from the current year, which helps me verify the results.
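
Because calculateAge reads the current year, the ages on the page change over time. The sketch below repeats the same calculation with the year passed in, so that the result is fixed. The interface shapes are assumptions (the post keeps them in person.interface.ts without showing them), and withAge and its currentYear parameter are hypothetical names.

```typescript
// Assumed shapes of the interfaces in person.interface.ts.
interface PersonNoAge { name: string; year: number; }
interface Person extends PersonNoAge { age: number; }

// Same calculation as calculateAge, with the current year injected
// (hypothetical parameter) so the result does not depend on the clock.
function withAge(person: PersonNoAge, currentYear: number): Person {
  return { ...person, age: currentYear - person.year };
}

const sample = withAge({ name: 'Wes', year: 1988 }, 2023);
console.log(sample); // { name: 'Wes', year: 1988, age: 35 }
```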

Define peopleArray$ to get a persons array when people$ completes

peopleArray$ = this.people$.pipe(toArray());

Next, I construct a similar comments stream that is going to illustrate find and findIndex examples.

// app.component.ts

comments = [
    { text: 'Love this!', id: 523423 },
    { text: 'Super good', id: 823423 },
    { text: 'You are the best', id: 2039842 },
    { text: 'Ramen is my fav food ever', id: 123523 },
    { text: 'Nice Nice Nice!', id: 542328 }
] 

comments$ = from(this.comments).pipe(shareReplay(this.comments.length));
commentsArray$ = this.comments$.pipe(toArray());

Render peopleArray$ and commentsArray$ to visualize the data sets

@Component({
  selector: 'app-root',
  template: `
    <div class="container">
      <section class="people">
        <h1>People</h1>
        <ul *ngIf="peopleArray$ | async as peopleArray">
          <li *ngFor="let p of peopleArray">Name: {{ p.name }}<br/> Year: {{ p.year }}<br/> Age: {{ p.age }}</li>
        </ul>
      </section>
      <section class="comments">
        <h1>Comments</h1>
        <ul *ngIf="commentsArray$ | async as commentsArray">
          <li *ngFor="let p of commentsArray">Id: {{ p.id }}<br/> Text: {{ p.text }}</li>
        </ul>
      </section>
    </div>
  `,
  styles: [`...omitted for brevity...`]
})
export class AppComponent { ...RxJS codes... }

Example 1: Is at least one person an adult (19 or older)?

The custom some operator can answer this question.

isAdult$ = this.people$.pipe(some(person => this.isAnAdult(person)));

private isAnAdult(person: Person, age = 19): boolean {
   return person.age >= age;
}

The stream finds the first person that is 19 or older and returns a boolean value.

Use async pipe to resolve isAdult$ and display the value

<p>Is Adult (at least one person is 19 or older)? {{ isAdult$ | async }}</p>

Example 2: Is everyone an adult (19 or older)?

allAdults$ = this.people$.pipe(every(person => this.isAnAdult(person)));

The stream checks whether every person is 19 or older.

Use async pipe to resolve allAdults$ and display the value

<p>All Adults (everyone is 19 or older)? {{ allAdults$ | async }}</p>

Example 3: Who are 19 years old or older?

adults$ = this.people$.pipe(
    filter(person => this.isAnAdult(person)),
    toArray()
); 

<section class="people">
   <h1>Adults</h1>
   <ul *ngIf="adults$ | async as adults">
       <li *ngFor="let p of adults">Name: {{ p.name }}<br/> Year: {{ p.year }}<br/> Age: {{ p.age }}</li>
   </ul>
 </section>

The adults$ stream keeps the persons who are 19 or older. When the stream completes, toArray emits them as a single Person array so that the inline template can render the elements.
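
The arrow function wrapped around isAnAdult in these examples does real work. Predicates for filter, every and find are called with the item and its index, for arrays and for the RxJS operators alike, so passing isAnAdult by reference would feed the index into its optional age parameter. A minimal sketch of the pitfall with plain arrays follows; the ages are sample values rather than ones computed from the current year, and the Person shape is an assumption.

```typescript
// Assumed shape of the Person interface.
interface Person { name: string; year: number; age: number; }

function isAnAdult(person: Person, age = 19): boolean {
  return person.age >= age;
}

const persons: Person[] = [
  { name: 'Wes', year: 1988, age: 35 },
  { name: 'Mary', year: 2006, age: 17 },
  { name: 'Lux', year: 2015, age: 8 },
];

// Wrapped: only the person is passed, so the default threshold of 19 applies.
const wrapped = persons.filter((person) => isAnAdult(person)).map((p) => p.name);

// By reference: filter also passes the index (0, 1, 2), which becomes the threshold.
const byReference = persons.filter(isAnAdult).map((p) => p.name);

console.log(wrapped);     // [ 'Wes' ]
console.log(byReference); // [ 'Wes', 'Mary', 'Lux' ]
```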

Example 4: Find comment where id = 823423

comment$ = this.comments$.pipe(find(c => c.id === 823423));

The type of comment$ is Observable<Comment | undefined> because the stream may or may not contain a comment with that id.

<ng-container *ngIf="comment$ | async as comment; else noComment">
     <p>Find comment 823423?</p>
     <p>Id: {{ comment.id }}, text: {{ comment.text }}</p>
</ng-container>

<ng-template #noComment>
   <p>Comment does not exist</p>
</ng-template>

When the comment exists, the template displays the comment id and text. Otherwise, the template displays “Comment does not exist”.

Example 5: Find index of comment where id = 823423

Instead of getting back the comment, I am interested in its index. Replacing find with findIndex achieves that.

commentIndex$ = this.comments$.pipe(findIndex(c => c.id === 823423));

<p>FindIndex of comment 823423? {{ commentIndex$ | async }}</p>
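
To see what the two streams should emit for a hit and for a miss, the same lookups can be run on a plain array. This is a sketch using the array methods that the operators mirror. The interface shape is an assumption, it is renamed CommentEntry for this standalone snippet, and the id 1 is made up to force a miss.

```typescript
// Assumed shape of the comment interface; named CommentEntry here to avoid
// clashing with the DOM's global Comment type in a standalone script.
interface CommentEntry { text: string; id: number; }

const comments: CommentEntry[] = [
  { text: 'Love this!', id: 523423 },
  { text: 'Super good', id: 823423 },
  { text: 'You are the best', id: 2039842 },
];

const hit: CommentEntry | undefined = comments.find((c) => c.id === 823423);
const hitIndex: number = comments.findIndex((c) => c.id === 823423);

// An id that is not in the list (made up for this sketch).
const miss: CommentEntry | undefined = comments.find((c) => c.id === 1);
const missIndex: number = comments.findIndex((c) => c.id === 1);

console.log(hit?.text, hitIndex); // Super good 1
console.log(miss, missIndex);     // undefined -1
```

A miss therefore shows up differently in the two templates: an undefined comment falls through to the noComment template, while -1 is a regular number that the async pipe renders as is.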

Final Thoughts

In this post, I show that some array methods have RxJS counterparts that are used in a similar fashion, except that the source is an observable. When a counterpart is absent, for example some and sort, I build my own custom operator that works on observables. Ultimately, the results are the same as if the input were an array.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in Angular and other technologies.

Resources:

  1. Repo: https://github.com/railsstudent/ng-rxjs-30/tree/main/projects/day7-array-cardio-part2
  2. Live demo: https://railsstudent.github.io/ng-rxjs-30/day7-array-cardio-part2/
  3. Wes Bos’s JavaScript 30 Challenge: https://github.com/wesbos/JavaScript30

Create custom operators to sum and sort RxJS streams

Introduction

This is day 4 of Wes Bos’s JavaScript 30 challenge, where I create custom RxJS operators to sum and sort streams.

In this blog post, I describe the steps to create two custom RxJS operators, sum and sort, to manipulate streams. First, I apply sort() to sort RxJS streams with a comparison function and render the sorted results in the browser. Then, I apply sum() to accumulate a stream by a property and output the total in the browser. When RxJS does not provide an operator for a task, we can build our own and use it to process RxJS streams.

Create a new Angular project in workspace

ng generate application day4-array-cardio-part1

Create RxJS custom operator to sum stream

I create a custom-operators directory and add a new file, sum.operator.ts

// sum.operator.ts

import { reduce } from 'rxjs';

export function sum<T, A extends number>(sumFn: (acc: A, t: T) => A, initial: A) {
    return reduce(sumFn, initial);
}
  • the operator accepts an accumulator function (sumFn) and an initial value (initial)
  • it calls RxJS’s reduce to calculate the sum of a stream
  • it emits the total when the stream completes

Create RxJS custom operator to sort stream

Similarly, I add sort.operator.ts to the custom-operators directory to sort a stream when it completes

import { Observable, map, toArray } from 'rxjs';

export function sort<T>(sortFn: (x: T, y: T) => number) {
    return function(source: Observable<T>) {
        return source.pipe(
            toArray(),
            map(items => items.sort(sortFn))
        )
    }
}
  • the operator accepts a comparison function (sortFn) that compares two items and returns a number to define their order
  • when the source stream completes, toArray() emits the collected items as a T[]
  • map() passes sortFn to Array.sort() and emits the sorted array
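
One detail behind the last step: Array.sort() reorders the array it is called on and returns that same array. This is harmless in the operator, because the array emitted by toArray() is built from the emitted items and is separate from the original data. The plain-array sketch below illustrates the in-place behaviour and a copying alternative; the number arrays are made up for the example.

```typescript
const original: number[] = [3, 1, 2];

// Array.prototype.sort sorts in place and returns the same array instance.
const sortedInPlace = original.sort((x, y) => x - y);
console.log(sortedInPlace === original); // true
console.log(original);                   // [ 1, 2, 3 ]

// Copying first leaves the input untouched.
const input: number[] = [3, 1, 2];
const sortedCopy = [...input].sort((x, y) => x - y);
console.log(input);      // [ 3, 1, 2 ]
console.log(sortedCopy); // [ 1, 2, 3 ]
```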

Get RxJS stream in an array

src/app
├── app.component.spec.ts
├── app.component.ts
├── app.module.ts
└── custom-operators
    ├── sort.operator.ts
    └── sum.operator.ts

inventors is an array of objects. I convert the array into a stream using the from function

inventors = [
    { first: 'Albert', last: 'Einstein', year: 1879, passed: 1955 },
    { first: 'Isaac', last: 'Newton', year: 1643, passed: 1727 },
    { first: 'Galileo', last: 'Galilei', year: 1564, passed: 1642 },
    { first: 'Marie', last: 'Curie', year: 1867, passed: 1934 },
    { first: 'Johannes', last: 'Kepler', year: 1571, passed: 1630 },
    { first: 'Nicolaus', last: 'Copernicus', year: 1473, passed: 1543 },
    { first: 'Max', last: 'Planck', year: 1858, passed: 1947 },
    { first: 'Katherine', last: 'Blodgett', year: 1898, passed: 1979 },
    { first: 'Ada', last: 'Lovelace', year: 1815, passed: 1852 },
    { first: 'Sarah E.', last: 'Goode', year: 1855, passed: 1905 },
    { first: 'Lise', last: 'Meitner', year: 1878, passed: 1968 },
    { first: 'Hanna', last: 'Hammarström', year: 1829, passed: 1909 }
  ];

inventors$ = from(this.inventors).pipe(shareReplay(this.inventors.length));

shareReplay(this.inventors.length) caches all the inventors before other streams reuse inventors$ to sum or sort RxJS stream.

Define inventorArray$ to get an array of inventors when inventors$ completes

inventorArray$ = this.inventors$.pipe(toArray());

Define an ng-template to output the inventor array as an unordered list. Subsequently, I reuse this template to output the other array examples.

// app.component.ts

@Component({
  selector: 'app-root',
  template: `
    <div class="container">
      <ng-container 
        *ngTemplateOutlet="inventors; context: { $implicit: 'Inventors', list: inventorArray$ | async }">
      </ng-container>
    </div>

    <ng-template #inventors let-title let-list="list">
      <section class="inventors">
        <h2>{{ title }}</h2>
        <ul>
          <li *ngFor="let inventory of list; trackBy: inventoryTrackBy">
            Name: {{ inventory.first }} {{ inventory.last }}<br />
            {{ inventory.year }} - {{ inventory.passed }}, Age: {{ inventory.passed - inventory.year }}
          </li>
        </ul>
      </section>
    </ng-template>
  `,
  styles: [`
    :host {
      display: block;
    }
    ... omitted for brevity ...
  `]
})
export class AppComponent { ...RxJS codes ... }

Sort inventor stream by year

ordered$ = this.inventors$.pipe(sort((a, b) => a.year > b.year ? 1 : -1));

I simply pass a comparison function to the custom sort operator to compare the year property of two inventors. This works because toArray emits a JavaScript array, which has a sort method.

Similarly, the template is available to render ordered$ after it is resolved

<ng-container 
     *ngTemplateOutlet="inventors; context: { $implicit: 'Ordered Inventors', list: ordered$ | async }">
</ng-container>

Sort inventor stream by age from oldest to youngest

oldest$ = this.inventors$.pipe(sort((a, b) => { 
    const lastInventor = a.passed - a.year;
    const nextInventor = b.passed - b.year;
    return lastInventor > nextInventor ? -1 : 1;
  }));

I pass a different comparison function to the operator to compare the ages of the inventors. When the next inventor lived longer (passed - year) than the previous one, the function returns a positive number, which tells sort to place the next inventor first and yields the oldest-to-youngest ordering.
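
The comparison function only reports a sign, and sort() does the reordering. A compact way to write such comparators is to subtract the two keys, which is sketched below with three of the inventors on a plain array. The Inventor shape is an assumption, and yearsLived, oldestFirst and byBirthYear are names invented for this example.

```typescript
// Assumed shape of an inventor record.
interface Inventor { first: string; last: string; year: number; passed: number; }

const sampleInventors: Inventor[] = [
  { first: 'Albert', last: 'Einstein', year: 1879, passed: 1955 }, // lived 76 years
  { first: 'Ada', last: 'Lovelace', year: 1815, passed: 1852 },    // lived 37 years
  { first: 'Lise', last: 'Meitner', year: 1878, passed: 1968 },    // lived 90 years
];

const yearsLived = (i: Inventor): number => i.passed - i.year;

// Negative result: a stays before b. Positive result: b moves ahead of a.
const oldestFirst = (a: Inventor, b: Inventor): number => yearsLived(b) - yearsLived(a);
const byBirthYear = (a: Inventor, b: Inventor): number => a.year - b.year;

const oldestNames = [...sampleInventors].sort(oldestFirst).map((i) => i.last);
const birthNames = [...sampleInventors].sort(byBirthYear).map((i) => i.last);

console.log(oldestNames); // [ 'Meitner', 'Einstein', 'Lovelace' ]
console.log(birthNames);  // [ 'Lovelace', 'Meitner', 'Einstein' ]
```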

<ng-container 
    *ngTemplateOutlet="inventors; context: { $implicit: 'Oldest Inventors', list: oldest$ | async }">
</ng-container>

The template displays the oldest$ stream to list the inventors by their age

Demonstrate sort operator can work on any data type

The custom sort operator can work on any data type because of a powerful TypeScript feature called generics. It can sort elements of any type T as long as the comparison function returns a number that defines their order.

The following is an example to sort a people stream by last name.

people = ['Bernhard, Sandra', 'Bethea, Erin', 'Becker, Carl', 'Bentsen, Lloyd', 'Beckett, Samuel', 'Blake, William', 'Berger, Ric', 'Beddoes, Mick', 'Beethoven, Ludwig','Belloc, Hilaire', 'Begin, Menachem', 'Bellow, Saul', 'Benchley, Robert', 'Blair, Robert', 'Benenson, Peter', 'Benjamin, Walter', 'Berlin, Irving','Benn, Tony', 'Benson, Leana', 'Bent, Silas', 'Berle, Milton', 'Berry, Halle', 'Biko, Steve', 'Beck, Glenn', 'Bergman, Ingmar', 'Black, Elk', 'Berio, Luciano','Berne, Eric', 'Berra, Yogi', 'Berry, Wendell', 'Bevan, Aneurin', 'Ben-Gurion, David', 'Bevel, Ken', 'Biden, Joseph', 'Bennington, Chester', 'Bierce, Ambrose', 'Billings, Josh', 'Birrell, Augustine', 'Blair, Tony', 'Beecher, Henry', 'Biondo, Frank'];

people$ = from(this.people).pipe(shareReplay(this.people.length));
alpha$ = this.people$.pipe(sort((lastOne, nextOne) => {
   const [aLast] = lastOne.split(', ');
   const [bLast] = nextOne.split(', ');
   return aLast > bLast ? 1 : -1;
}));

The elements of people are strings, whereas the elements of the inventors stream are objects. Yet, sort orders the streams correctly in all the examples.
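
The comparison in alpha$ never returns 0, so the relative order of two people who share a last name (the list has two Blairs and two Berrys) depends on the sort implementation. A possible variant, sketched here on a plain array with three of the names, uses String.prototype.localeCompare, which returns 0 for equal strings. The names somePeople and byLastName are invented for this example.

```typescript
const somePeople: string[] = ['Blake, William', 'Becker, Carl', 'Beck, Glenn'];

// Compare "Last, First" strings by the part before the comma.
const byLastName = (a: string, b: string): number => {
  const [aLast] = a.split(', ');
  const [bLast] = b.split(', ');
  return aLast.localeCompare(bLast);
};

const alphabetical = [...somePeople].sort(byLastName);
console.log(alphabetical); // [ 'Beck, Glenn', 'Becker, Carl', 'Blake, William' ]
```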

Demonstrate sum operator to sum RxJS stream

This example adds up the number of years each inventor lived and displays the total. The sum operator calls reduce under the hood; therefore, it expects a reducer function and an initial value.

inventors$ = from(this.inventors).pipe(shareReplay(this.inventors.length));
totalYears$ = this.inventors$.pipe(sum((acc: number, y) => acc + (y.passed - y.year), 0));

<section class="inventors">
    <h2>Total Years</h2>
    <p>{{ totalYears$ | async }}</p>
</section>

The custom operator is very simple, and it accepts other reducer functions for tasks such as counting the total number of characters in the first names.

totalFirstLength$ = this.inventors$.pipe(sum((acc: number, y) => acc + y.first.length, 0));

Another one-liner that states the purpose of the stream clearly.
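
The expected totals can be worked out by applying the same reducers with Array.reduce. This is a sketch on the inventors array from this post; the Inventor shape is an assumption, and sumArray is a hypothetical array counterpart of the sum operator rather than part of the project.

```typescript
// Assumed shape of an inventor record.
interface Inventor { first: string; last: string; year: number; passed: number; }

const inventors: Inventor[] = [
  { first: 'Albert', last: 'Einstein', year: 1879, passed: 1955 },
  { first: 'Isaac', last: 'Newton', year: 1643, passed: 1727 },
  { first: 'Galileo', last: 'Galilei', year: 1564, passed: 1642 },
  { first: 'Marie', last: 'Curie', year: 1867, passed: 1934 },
  { first: 'Johannes', last: 'Kepler', year: 1571, passed: 1630 },
  { first: 'Nicolaus', last: 'Copernicus', year: 1473, passed: 1543 },
  { first: 'Max', last: 'Planck', year: 1858, passed: 1947 },
  { first: 'Katherine', last: 'Blodgett', year: 1898, passed: 1979 },
  { first: 'Ada', last: 'Lovelace', year: 1815, passed: 1852 },
  { first: 'Sarah E.', last: 'Goode', year: 1855, passed: 1905 },
  { first: 'Lise', last: 'Meitner', year: 1878, passed: 1968 },
  { first: 'Hanna', last: 'Hammarström', year: 1829, passed: 1909 },
];

// Hypothetical array counterpart of the sum operator: same reducer and seed.
function sumArray<T>(items: T[], sumFn: (acc: number, item: T) => number, initial: number): number {
  return items.reduce(sumFn, initial);
}

const totalYears = sumArray(inventors, (acc, y) => acc + (y.passed - y.year), 0);
const totalFirstLength = sumArray(inventors, (acc, y) => acc + y.first.length, 0);

console.log(totalYears);       // 861
console.log(totalFirstLength); // 71
```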

Final Thoughts

In this post, I show how to create custom RxJS operators and use them to transform streams in an Angular application. To follow the DRY principle, I create an ng-template that renders the streams as lists, which reduces duplicated code in the inline template.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in Angular and other technologies.

Resources:

  1. Repo: https://github.com/railsstudent/ng-rxjs-30/tree/main/projects/day4-array-cardio-part1
  2. Live demo: https://railsstudent.github.io/ng-rxjs-30/day4-array-cardio-part1/
  3. Wes Bos’s JavaScript 30 Challenge: https://github.com/wesbos/JavaScript30