Post contents
Welcome back Guys,
Today we'll start with the Filtering Operators. As you can imagine these operators are used to filter our sources. Well, let's get started.
Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.
import { Observable } from "rxjs";import { auditTime } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 10) { subscriber.next(++count); } else { clearInterval(id); subscriber.complete(); } }, 1000);});const result = input$.pipe(auditTime(2000));result.subscribe({ next: x => console.log( `${new Date().toLocaleTimeString()} - [auditTime result]: ${x}` ),});11:15:25 - [auditTime result]: 211:15:27 - [auditTime result]: 411:15:29 - [auditTime result]: 611:15:31 - [auditTime result]: 811:15:33 - [auditTime result]: 10
Emits a notification from the source Observable only after a particular time span has passed without another source emission.
import { Observable } from "rxjs";import { debounceTime, tap } from "rxjs/operators";const timesInSecond = [1, 0.5, 3, 1.5, 3, 1];const input$ = new Observable<number>(subscriber => { let count = 0; (function next() { const seconds = timesInSecond[count++]; setTimeout(() => { subscriber.next(seconds); if (count > 5) { subscriber.complete(); return; } next(); }, seconds * 1000); })();});const result = input$.pipe( tap(x => console.log( `${new Date().toLocaleTimeString()} - [before debounceTime]: ${x}` ) ), debounceTime(2000));result.subscribe({ next: x => console.log( `${new Date().toLocaleTimeString()} - [debounceTime result]: ${x}` ),});09:44:29 - [before debounceTime]: 109:44:29 - [before debounceTime]: 0.509:44:31 - [debounceTime result]: 0.509:44:32 - [before debounceTime]: 309:44:34 - [before debounceTime]: 1.509:44:36 - [debounceTime result]: 1.509:44:37 - [before debounceTime]: 309:44:38 - [before debounceTime]: 109:44:38 - [debounceTime result]: 1
Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.
import { Observable } from "rxjs";import { tap, throttleTime } from "rxjs/operators";const timesInSecond = [1, 0.5, 3, 1.5, 3, 1];const input$ = new Observable<number>(subscriber => { let count = 0; (function next() { const seconds = timesInSecond[count++]; setTimeout(() => { subscriber.next(seconds); if (count > 5) { subscriber.complete(); return; } next(); }, seconds * 1000); })();});const result = input$.pipe( tap(x => console.log( `${new Date().toLocaleTimeString()} - [before throttleTime]: ${x}` ) ), throttleTime(2000));result.subscribe({ next: x => console.log( `${new Date().toLocaleTimeString()} - [throttleTime result]: ${x}` ),});10:41:46 - [before throttleTime]: 110:41:46 - [throttleTime result]: 110:41:46 - [before throttleTime]: 0.510:41:49 - [before throttleTime]: 310:41:49 - [throttleTime result]: 310:41:51 - [before throttleTime]: 1.510:41:54 - [before throttleTime]: 310:41:54 - [throttleTime result]: 310:41:55 - [before throttleTime]: 1
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
import { Observable } from "rxjs";import { distinct } from "rxjs/operators";const input$ = new Observable<number | string>(subscriber => { let count = 0; const array = [1, 1, "1", 2, 2, 3, 3]; const id = setInterval(() => { if (count < array.length) { subscriber.next(array[count++]); } else { clearInterval(id); subscriber.complete(); } }, 1000);});input$.pipe(distinct()).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [distinct]: ${x}`),});12:22:58 - [distinct]: 112:23:00 - [distinct]: 112:23:01 - [distinct]: 212:23:03 - [distinct]: 3
distinct(p => p.id).
import { Observable } from "rxjs";import { distinctUntilChanged } from "rxjs/operators";type State = { id: number; value: string };const createValue = (id: number): State => ({ id, value: String(id) });const array = [ createValue(1), createValue(1), createValue(1), createValue(2), createValue(2), createValue(2), createValue(3), createValue(3), createValue(3), createValue(4), createValue(4), createValue(4),];const input$ = new Observable<State>(subscriber => { let count = 0; const id = setInterval(() => { if (count < array.length) { subscriber.next(array[count++]); } else { clearInterval(id); subscriber.complete(); } }, 1000);});input$ .pipe(distinctUntilChanged((prev, curr) => prev.id === curr.id)) .subscribe({ next: x => console.log( `${new Date().toLocaleTimeString()} - [distinctUntilChanged]`, x ), });11:45:39 - [distinctUntilChanged] { id: 1, value: '1' }11:45:42 - [distinctUntilChanged] { id: 2, value: '2' }11:45:45 - [distinctUntilChanged] { id: 3, value: '3' }11:45:48 - [distinctUntilChanged] { id: 4, value: '4' }
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, using a property accessed by using the key provided to check if the two items are distinct.
import { Observable } from "rxjs";import { distinctUntilKeyChanged } from "rxjs/operators";type State = { id: number; value: string };const createValue = (id: number): State => ({ id, value: String(id) });const array = [ createValue(1), createValue(1), createValue(1), createValue(2), createValue(2), createValue(2), createValue(3), createValue(3), createValue(3), createValue(4), createValue(4), createValue(4),];const input$ = new Observable<State>(subscriber => { let count = 0; const id = setInterval(() => { if (count < array.length) { subscriber.next(array[count++]); } else { clearInterval(id); subscriber.complete(); } }, 1000);});input$.pipe(distinctUntilKeyChanged("id")).subscribe({ next: x => console.log( `${new Date().toLocaleTimeString()} - [distinctUntilKeyChanged]`, x ),});11:46:12 - [distinctUntilKeyChanged] { id: 1, value: '1' }11:46:15 - [distinctUntilKeyChanged] { id: 2, value: '2' }11:46:18 - [distinctUntilKeyChanged] { id: 3, value: '3' }11:46:21 - [distinctUntilKeyChanged] { id: 4, value: '4' }
Filter items emitted by the source Observable by only emitting those that satisfy a specified predicate.
import { Observable } from "rxjs";import { filter } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 10) { subscriber.next(++count); } else { clearInterval(id); subscriber.complete(); } }, 1000);});input$.pipe(filter(x => x % 2 === 0)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [filter]: ${x}`),});11:46:43 - [filter]: 211:46:45 - [filter]: 411:46:47 - [filter]: 611:46:49 - [filter]: 811:46:51 - [filter]: 10
Emits only the first value (or the first value that meets some condition) emitted by the source Observable.
import { Observable } from "rxjs";import { first } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 10) { subscriber.next(++count); } else { clearInterval(id); subscriber.complete(); } }, 1000);});console.log(`${new Date().toLocaleTimeString()} - [first] start`)input$.pipe(first()).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [first]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [first] complete`),});09:47:15 - [first] start09:47:16 - [first]: 109:47:16 - [first] complete
Returns an Observable that emits only the last item emitted by the source Observable. It optionally takes a predicate function as a parameter, in which case, rather than emitting the last item from the source Observable, the resulting Observable will emit the last item from the source Observable that satisfies the predicate.
import { Observable } from "rxjs";import { last } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 10) { subscriber.next(++count); } else { clearInterval(id); subscriber.complete(); } }, 1000);});console.log(`${new Date().toLocaleTimeString()} - [last] start`)input$.pipe(last()).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [last]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [last] complete`),});09:48:14 - [last] start09:48:25 - [last]: 1009:48:25 - [last] complete
Returns an Observable that skips the first count items emitted by the source Observable.
import { Observable } from "rxjs";import { skip } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if(count < 5) subscriber.next(count++); else { clearInterval(id); subscriber.complete(); } }, 1000); return () => { clearInterval(id); subscriber.complete(); };});input$.pipe(skip(2)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [skip]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [skip]: complete`),});10:33:41 - [skip]: 210:33:42 - [skip]: 310:33:43 - [skip]: 410:33:44 - [skip]: complete
skip(1) the operator skips the first value, if we write skip(2) the operator skips the first two values and so on.
Skip a specified number of values before the completion of an observable.
import { Observable } from "rxjs";import { skipLast } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 5) subscriber.next(count++); else { clearInterval(id); subscriber.complete(); } }, 1000);});console.log(`${new Date().toLocaleTimeString()} - [skipLast]: start`)input$.pipe(skipLast(3)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [skipLast]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [skipLast]: complete`),});10:33:58 - [skipLast]: start10:34:02 - [skipLast]: 010:34:03 - [skipLast]: 110:34:04 - [skipLast]: complete
skipLast(1) the operator skips the last value, if we write skipLast(2) the operator skips the last two values and so on.
Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
import { Observable } from "rxjs";import { skipUntil } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 5) subscriber.next(count++) else { clearInterval(id); subscriber.complete(); } }, 1000);});const untilInput$ = new Observable<void>(subscriber => { setTimeout(() => { console.log( `${new Date().toLocaleTimeString()} - untilInput$ emit` ); subscriber.next(); subscriber.complete(); }, 5000);});input$.pipe(skipUntil(untilInput$)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [skipUntil]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [skipUntil]: complete`),});10:34:37 - untilInput$ emit10:34:37 - [skipUntil]: 410:34:38 - [skipUntil]: complete
Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.
import { Observable } from "rxjs";import { skipWhile } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 5) subscriber.next(count++); else { subscriber.next(count++); subscriber.complete(); } }, 1000); return () => { clearInterval(id); };});input$.pipe(skipWhile(val => val < 3)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [skipWhile]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [skipWhile]: complete`),});10:36:34 - [skipWhile]: 310:36:35 - [skipWhile]: 410:36:36 - [skipWhile]: 510:36:36 - [skipWhile]: complete
Emits only the first count values emitted by the source Observable.
import { Observable } from "rxjs";import { take } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { subscriber.next(count++); }, 1000); return () => { clearInterval(id); subscriber.complete(); };});input$.pipe(take(2)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [take]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [take]: complete`),});10:39:39 - [take]: 010:39:40 - [take]: 110:39:40 - [take]: complete
take(1) you get only the first value, if you write take(2) you get only the first two values.
It's important to remember that when the operator emits the last value completes the subscription too.
Waits for the source to complete, then emits the last N values from the source, as specified by the count argument.
import { Observable } from "rxjs";import { takeLast } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 5) subscriber.next(count++); else { clearInterval(id); subscriber.complete(); } }, 1000);});console.log(`${new Date().toLocaleTimeString()} - [takeLast]: start`)input$.pipe(takeLast(3)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [takeLast]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [takeLast]: complete`),});10:40:08 - [takeLast]: start10:40:14 - [takeLast]: 210:40:14 - [takeLast]: 310:40:14 - [takeLast]: 410:40:14 - [takeLast]: complete
Emits the values emitted by the source Observable until a notifier Observable emits a value.
import { Observable } from "rxjs";import { takeUntil } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { if (count < 5) subscriber.next(++count); else { clearInterval(id); subscriber.complete(); } }, 1000);});const untilInput$ = new Observable<void>(subscriber => { setTimeout(() => { console.log( `${new Date().toLocaleTimeString()} - untilInput$ emit` ); subscriber.next(); subscriber.complete(); }, 2500);});input$.pipe(takeUntil(untilInput$)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [takeUntil]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [takeUntil]: complete`),});10:40:38 - [takeUntil]: 110:40:39 - [takeUntil]: 210:40:42 - untilInput$ emit10:40:42 - [takeUntil]: complete
Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.
import { Observable } from "rxjs";import { takeWhile } from "rxjs/operators";const input$ = new Observable<number>(subscriber => { let count = 0; const id = setInterval(() => { subscriber.next(count++); }, 1000); return () => { clearInterval(id); subscriber.complete(); };});input$.pipe(takeWhile(val => val < 3)).subscribe({ next: x => console.log(`${new Date().toLocaleTimeString()} - [takeWhile]: ${x}`), complete: () => console.log(`${new Date().toLocaleTimeString()} - [takeWhile]: complete`),});10:41:10 - [takeWhile]: 010:41:11 - [takeWhile]: 110:41:12 - [takeWhile]: 210:41:13 - [takeWhile]: complete
Ok Guys, I think that today it's enough for the Filter Operators. You can find all the code of this article here.
See you in the next article. Bye Bye!
















