Hi guys, and welcome back! Today I'll talk about the Join Operators. Some of them are similar to operators we saw among the Creation Operators, but they are used in different contexts. OK, let's start!
`combineLatestAll` flattens an Observable-of-Observables by applying combineLatest once the outer Observable-of-Observables completes.
```typescript
import { interval, timer } from 'rxjs';
import { combineLatestAll, map, take } from 'rxjs/operators';

const A_CHAR_CODE = 65

const createSource = (index: number) =>
    interval((index + 1) * 1000).pipe(
        take(3),
        map(value => `${String.fromCharCode(A_CHAR_CODE + index)}${String.fromCharCode(A_CHAR_CODE + value)}`)
    )

console.log(`${new Date().toLocaleTimeString()}: combineLatestAll start`)

const source$ = timer(0, 1000).pipe(
    take(3)
);

source$.pipe(
    map(createSource),
    combineLatestAll()
).subscribe({
    next: val => console.log(`${new Date().toLocaleTimeString()}: combineLatestAll`, val)
});
```

```console
16:22:07: combineLatestAll start
16:22:12: combineLatestAll [ 'AB', 'BA', 'CA' ]
16:22:12: combineLatestAll [ 'AC', 'BA', 'CA' ]
16:22:13: combineLatestAll [ 'AC', 'BB', 'CA' ]
16:22:15: combineLatestAll [ 'AC', 'BB', 'CB' ]
16:22:15: combineLatestAll [ 'AC', 'BC', 'CB' ]
16:22:18: combineLatestAll [ 'AC', 'BC', 'CC' ]
```
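The five-second gap between the start line and the first combined value can be worked out by hand. Here is a minimal sketch in plain TypeScript (not RxJS code; `firstCombinedAt` is a hypothetical helper), assuming every inner Observable is an interval that first emits one period after it is subscribed, and that all inners are subscribed at the moment the outer Observable completes.

```typescript
// Hypothetical helper, not part of RxJS: earliest time (ms) at which
// combineLatestAll can emit its first combined value.
function firstCombinedAt(outerCompletesAt: number, innerPeriods: number[]): number {
  // Every inner must have emitted at least once, so the slowest one decides.
  return outerCompletesAt + Math.max(...innerPeriods);
}

// Numbers from the example: the outer timer completes at 2 s,
// and the inners tick every 1, 2 and 3 seconds.
console.log(firstCombinedAt(2000, [1000, 2000, 3000])); // 5000
```

With the example's numbers this gives 5 s, which lines up with the log (16:22:07 to 16:22:12).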
`concatAll` converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.
```typescript
import { interval, timer } from 'rxjs';
import { concatAll, map, take } from 'rxjs/operators';

const createSource = (index: number) =>
    interval(index * 1000).pipe(
        take(3),
        map(value => `${index}-${value + 1}`)
    )

console.log(`${new Date().toLocaleTimeString()}: concatAll start`)

const source$ = timer(0, 1000).pipe(
    take(3)
);

source$.pipe(
    map(v => createSource(v + 1)),
    concatAll()
).subscribe({
    next: val => console.log(`${new Date().toLocaleTimeString()}: concatAll`, val)
});
```

```console
15:38:29: concatAll start
15:38:30: concatAll 1-1
15:38:31: concatAll 1-2
15:38:32: concatAll 1-3
15:38:34: concatAll 2-1
15:38:36: concatAll 2-2
15:38:38: concatAll 2-3
15:38:41: concatAll 3-1
15:38:44: concatAll 3-2
15:38:47: concatAll 3-3
```
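To see where those timestamps come from, the schedule can be rebuilt by hand. The sketch below is a plain TypeScript model, not RxJS code: `concatSchedule`, `ConcatInner` and `ConcatEmission` are hypothetical names introduced for illustration, and the model assumes each inner Observable is an interval with a fixed period that completes together with its last value.

```typescript
// Hypothetical model of concatAll timing (not part of RxJS).
interface ConcatInner {
  label: number;      // index passed to createSource
  arrivesAt: number;  // ms at which the outer Observable emits this inner
  period: number;     // ms between inner emissions
  count: number;      // number of values before the inner completes
}

interface ConcatEmission { at: number; value: string; }

function concatSchedule(inners: ConcatInner[]): ConcatEmission[] {
  const out: ConcatEmission[] = [];
  let freeAt = 0; // time at which the previous inner has completed
  for (const inner of inners) {
    // An inner is subscribed only once it has arrived AND the previous one is done.
    const subscribedAt = Math.max(inner.arrivesAt, freeAt);
    for (let i = 1; i <= inner.count; i++) {
      out.push({ at: subscribedAt + i * inner.period, value: `${inner.label}-${i}` });
    }
    freeAt = subscribedAt + inner.count * inner.period;
  }
  return out;
}

// Same numbers as the example: timer(0, 1000) emits 0, 1, 2
// and inner n ticks every n seconds, three times.
const concatInners: ConcatInner[] = [0, 1, 2].map(v => ({
  label: v + 1,
  arrivesAt: v * 1000,
  period: (v + 1) * 1000,
  count: 3,
}));

for (const e of concatSchedule(concatInners)) {
  console.log(`${e.at / 1000}s: ${e.value}`);
}
```

The model prints offsets of 1, 2, 3, 5, 7, 9, 12, 15 and 18 seconds, which match the log when counted from the start line.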
`exhaustAll` converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.
```typescript
import { interval, timer } from 'rxjs';
import { exhaustAll, map, take } from 'rxjs/operators';

const createSource = (index: number) =>
    interval(index * 1000).pipe(
        take(3),
        map(value => `${index}-${value + 1}`)
    )

console.log(`${new Date().toLocaleTimeString()}: exhaustAll start`)

const source$ = timer(0, 2500).pipe(
    take(3)
);

source$.pipe(
    map(v => createSource(v + 1)),
    exhaustAll()
).subscribe({
    next: val => console.log(`${new Date().toLocaleTimeString()}: exhaustAll`, val)
});
```

```console
16:27:05: exhaustAll start
16:27:07: exhaustAll 1-1
16:27:08: exhaustAll 1-2
16:27:09: exhaustAll 1-3
16:27:14: exhaustAll 3-1
16:27:17: exhaustAll 3-2
16:27:20: exhaustAll 3-3
```
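Why is the second inner Observable missing from the log? The drop rule can be sketched in a few lines of plain TypeScript. This is only a model, not RxJS code: `exhaustAccepted` and `PendingInner` are hypothetical names, and the model assumes that an inner arriving exactly when the previous one completes is accepted (in real code that tie would depend on timer scheduling).

```typescript
// Hypothetical model of the exhaustAll drop rule (not part of RxJS).
interface PendingInner {
  label: number;      // index passed to createSource
  arrivesAt: number;  // ms at which the outer Observable emits this inner
  duration: number;   // ms from subscription to completion
}

function exhaustAccepted(inners: PendingInner[]): number[] {
  const accepted: number[] = [];
  let busyUntil = -Infinity; // completion time of the inner currently running
  for (const inner of inners) {
    if (inner.arrivesAt >= busyUntil) {
      // Nothing is running, so this inner is subscribed.
      accepted.push(inner.label);
      busyUntil = inner.arrivesAt + inner.duration;
    }
    // Otherwise the inner is dropped and never subscribed.
  }
  return accepted;
}

// Same numbers as the example: timer(0, 2500) emits 0, 1, 2
// and inner n emits three values, one every n seconds.
const exhaustInners: PendingInner[] = [0, 1, 2].map(v => ({
  label: v + 1,
  arrivesAt: v * 2500,
  duration: 3 * (v + 1) * 1000,
}));

console.log(exhaustAccepted(exhaustInners)); // [ 1, 3 ]
```

Inner 2 arrives at 2.5 s while inner 1 is still running (it completes at 3 s), so it is dropped. Inner 3 arrives at 5 s, when nothing is running, so it gets through.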

`mergeAll` converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.
```typescript
import { interval, timer } from 'rxjs';
import { map, mergeAll, take } from 'rxjs/operators';

const createSource = (index: number) =>
    interval(index * 1000).pipe(
        take(3),
        map(value => `${index}-${value + 1}`),
    )

console.log(`${new Date().toLocaleTimeString()}: mergeAll start`)

const source$ = timer(0, 1000).pipe(
    take(3),
);

source$.pipe(
    map(v => createSource(v + 1)),
    mergeAll()
).subscribe({
    next: val => console.log(`${new Date().toLocaleTimeString()}: mergeAll`, val)
});
```

```console
15:41:46: mergeAll start
15:41:47: mergeAll 1-1
15:41:48: mergeAll 1-2
15:41:49: mergeAll 2-1
15:41:49: mergeAll 1-3
15:41:51: mergeAll 3-1
15:41:51: mergeAll 2-2
15:41:53: mergeAll 2-3
15:41:54: mergeAll 3-2
15:41:57: mergeAll 3-3
```

`switchAll` converts a higher-order Observable into a first-order Observable producing values only from the most recent inner Observable.
```typescript
import { interval, timer } from 'rxjs';
import { map, switchAll, take } from 'rxjs/operators';

const createSource = (index: number) =>
    interval(index * 1000).pipe(
        take(3),
        map(value => `${index}-${value + 1}`)
    )

console.log(`${new Date().toLocaleTimeString()}: switchAll start`)

const source$ = timer(0, 1000).pipe(
    take(3)
);

source$.pipe(
    map(v => createSource(v + 1)),
    switchAll()
).subscribe({
    next: val => console.log(`${new Date().toLocaleTimeString()}: switchAll`, val)
});
```

```console
15:42:48: switchAll start
15:42:49: switchAll 1-1
15:42:53: switchAll 3-1
15:42:56: switchAll 3-2
15:42:59: switchAll 3-3
```
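The log shows a single value from the first inner Observable and nothing at all from the second. A small model in plain TypeScript (not RxJS code) can make the reason visible. `switchSchedule` and `SwitchInner` are hypothetical names, and the model makes one loud assumption: a value due at exactly the moment the next inner arrives is still delivered. That matches the log in this run, but in real code the outcome of such a tie depends on timer scheduling.

```typescript
// Hypothetical model of switchAll timing (not part of RxJS).
interface SwitchInner {
  label: number;      // index passed to createSource
  arrivesAt: number;  // ms at which the outer Observable emits this inner
  period: number;     // ms between inner emissions
  count: number;      // number of values before the inner completes
}

function switchSchedule(inners: SwitchInner[]): string[] {
  const out: string[] = [];
  inners.forEach((inner, k) => {
    // An inner stays subscribed until the next inner arrives;
    // the last one is never interrupted.
    const next = inners[k + 1];
    const cutoff = next ? next.arrivesAt : Infinity;
    for (let i = 1; i <= inner.count; i++) {
      const at = inner.arrivesAt + i * inner.period;
      // Assumption: a tie with the cutoff counts as delivered.
      if (at <= cutoff) out.push(`${at / 1000}s: ${inner.label}-${i}`);
    }
  });
  return out;
}

// Same numbers as the example: timer(0, 1000) emits 0, 1, 2
// and inner n ticks every n seconds, three times.
const switchInners: SwitchInner[] = [0, 1, 2].map(v => ({
  label: v + 1,
  arrivesAt: v * 1000,
  period: (v + 1) * 1000,
  count: 3,
}));

console.log(switchSchedule(switchInners));
// [ '1s: 1-1', '5s: 3-1', '8s: 3-2', '11s: 3-3' ]
```

Inner 2 would need until 3 s to produce its first value, but inner 3 replaces it at 2 s, so it never emits.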

`startWith` returns an Observable that, at the moment of subscription, synchronously emits all values provided to this operator, then subscribes to the source and mirrors all of its emissions to subscribers.
```typescript
import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';

const source1$ = of(1, 2, 3).pipe(
    startWith(1000)
)

source1$.subscribe({
    next: val => console.log(`${new Date().toLocaleTimeString()}: startWith`, val)
});
```

```console
15:43:47: startWith 1000
15:43:47: startWith 1
15:43:47: startWith 2
15:43:47: startWith 3
```
`withLatestFrom` combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.
```typescript
import { interval } from 'rxjs';
import { take, withLatestFrom } from 'rxjs/operators';

const source1$ = interval(1000).pipe(
    take(5)
)
const source2$ = interval(2500).pipe(
    take(5)
)

console.log(`${new Date().toLocaleTimeString()}: withLatestFrom start`)

source1$.pipe(
    withLatestFrom(source2$)
).subscribe({
    next: val => console.log(`${new Date().toLocaleTimeString()}: withLatestFrom`, val)
});
```

```console
15:44:11: withLatestFrom start
15:44:14: withLatestFrom [ 2, 0 ]
15:44:15: withLatestFrom [ 3, 0 ]
15:44:16: withLatestFrom [ 4, 1 ]
```
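Notice that the source values 0 and 1 never show up in the log. Here is a rough model of the pairing rule in plain TypeScript (not RxJS code; `withLatestFromModel`, `ticks` and `TimedValue` are hypothetical names). It assumes both lists are sorted by time and that, when both streams emit at the same instant, the other stream's value counts as already seen. That is what happened at the 5 s mark in this run, but in real code such a tie depends on timer scheduling.

```typescript
// Hypothetical model of the withLatestFrom pairing rule (not part of RxJS).
interface TimedValue { at: number; value: number; }

function withLatestFromModel(source: TimedValue[], other: TimedValue[]): [number, number][] {
  const out: [number, number][] = [];
  for (const s of source) {
    // Find the latest value the other stream has produced so far.
    let latest: number | undefined;
    for (const o of other) {
      if (o.at <= s.at) latest = o.value; // assumption: ties count as seen
    }
    // Until the other stream has emitted at least once, source values are dropped.
    if (latest === undefined) continue;
    out.push([s.value, latest]);
  }
  return out;
}

// Models interval(period).pipe(take(count)): values 0..count-1, one per period.
const ticks = (period: number, count: number): TimedValue[] =>
  Array.from({ length: count }, (_, i) => ({ at: (i + 1) * period, value: i }));

console.log(withLatestFromModel(ticks(1000, 5), ticks(2500, 5)));
// [ [ 2, 0 ], [ 3, 0 ], [ 4, 1 ] ]
```

The second stream emits for the first time at 2.5 s, so the source values from 1 s and 2 s have nothing to pair with and are skipped.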
Ok guys, that's all for today! You can find the code of this article here
See you soon!

