MERGING
Combines multiple observables into one by merging their emissions. 

  • The merge() operator in RxJS takes multiple Observables as input and returns a single Observable that emits all the values emitted by its input Observables in the order they arrive.

 

import { of, merge, interval } from 'rxjs';
import { take } from 'rxjs/operators';


const source1$ = of(1, 2, 3);
const source2$ = of('a', 'b');


const source3$ = interval(1000).pipe(take(2)); // Emits 0, 1 after 1s each

  • interval() is an RxJS creation operator that returns an Observable.
  • The argument 1000 specifies the period in milliseconds. interval(1000) emits a sequence of incrementing numbers, starting from 0, with the specified delay (1000 milliseconds) between each emission. It doesn't emit the time elapsed since the start.
  • Without take(), this Observable emits 0, 1, 2, 3, ... once every 1000 milliseconds (1 second) and continues indefinitely until unsubscribed:
    • After 1 second: 0
    • After 2 seconds: 1
    • After 3 seconds: 2
    • After 4 seconds: 3
    • And so on…
  • .pipe(take(2)): take(2) passes along the first two values emitted by interval(1000), 0 and then 1. Right after the second value it completes the resulting Observable and unsubscribes from the interval.

 

const merged1$ = merge(source1$, source2$, source3$);
merged1$.subscribe({
    next: (value) => console.log('Merged 1:', value),
    complete: () => console.log('Merged 1 completed'),
});
/*
Output (Example 1 - merging Observables that complete at different times):
Merged 1: 1
Merged 1: 2
Merged 1: 3
Merged 1: a
Merged 1: b
Merged 1: 0 (after 1 second)
Merged 1: 1 (after another second)
Merged 1 completed
*/

 

Merging Observables with overlapping emissions

const interval1$ = interval(500).pipe(take(3)); // Emits 0, 1, 2 every 0.5s
const interval2$ = interval(800).pipe(take(2)); // Emits 0, 1 every 0.8s

const merged2$ = merge(interval1$, interval2$);


merged2$.subscribe({
    next: (value) => console.log('Merged 2:', value),
    complete: () => console.log('Merged 2 completed'),
});
/*
Merged 2: 0 (after 0.5s from interval1$)
Merged 2: 0 (after 0.8s from interval2$)
Merged 2: 1 (after 1s from interval1$)
Merged 2: 2 (after 1.5s from interval1$)
Merged 2: 1 (after 1.6s from interval2$)
Merged 2 completed
*/
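The arrival order of the two intervals can be checked with a small dependency-free model. This is only a sketch of the timing, not RxJS itself: the names Emission, intervalTimeline and mergeTimelines are made up for illustration, and the model assumes ideal timers with no drift.

```typescript
// Hypothetical model of emission timing; it does not use or reproduce RxJS.
type Emission = { timeMs: number; source: string; value: number };

// Models interval(periodMs).pipe(take(count)): values 0..count-1,
// with the k-th value (0-based) arriving at (k + 1) * periodMs.
function intervalTimeline(source: string, periodMs: number, count: number): Emission[] {
  const out: Emission[] = [];
  for (let k = 0; k < count; k++) {
    out.push({ timeMs: (k + 1) * periodMs, source, value: k });
  }
  return out;
}

// Models merge(): all emissions interleaved by arrival time.
function mergeTimelines(...timelines: Emission[][]): Emission[] {
  const all = ([] as Emission[]).concat(...timelines);
  return all.sort((a, b) => a.timeMs - b.timeMs);
}

const mergedTimeline = mergeTimelines(
  intervalTimeline('interval1$', 500, 3),
  intervalTimeline('interval2$', 800, 2),
);

for (const e of mergedTimeline) {
  console.log(`Merged 2: ${e.value} (at ${e.timeMs}ms from ${e.source})`);
}
// Merged 2: 0 (at 500ms from interval1$)
// Merged 2: 0 (at 800ms from interval2$)
// Merged 2: 1 (at 1000ms from interval1$)
// Merged 2: 2 (at 1500ms from interval1$)
// Merged 2: 1 (at 1600ms from interval2$)
```

The third value of interval1$ (at 1500ms) arrives before the second value of interval2$ (at 1600ms), so merge() passes it along first.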

 

 

COMBINELATEST
Combines the latest values from multiple observables into an array or object. 

import { interval, combineLatest, of } from 'rxjs';
import { take } from 'rxjs/operators';


const interval1$ = interval(1000).pipe(take(3)); // Emits 0, 1, 2 every 1s
const interval2$ = interval(1500).pipe(take(2)); // Emits 0, 1 every 1.5s

// Pass the sources as an array; the separate-argument form is deprecated in RxJS 7
const combined1$ = combineLatest([interval1$, interval2$]);


combined1$.subscribe({
    // value is [latest from interval1$, latest from interval2$]
    next: (value) => console.log('Combined 1:', value),
    complete: () => console.log('Combined 1 completed'),
});


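/*
Combined 1: [ 0, 0 ] (after 1.5s - both have emitted at least once)
Combined 1: [ 1, 0 ] (after 2s)
Combined 1: [ 2, 0 ] (after about 3s - interval1$ emits 2)
Combined 1: [ 2, 1 ] (after about 3s - interval2$ emits 1)
Combined 1 completed (after both sources complete at about 3s)

Both sources fire at about 3s, so the order of the last two emissions depends
on timer scheduling; the third line may be [ 1, 1 ] instead of [ 2, 0 ].
*/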

combineLatest() emits only once every input Observable has emitted at least one value. interval1$ emits 0 after 1 second, but nothing is output until interval2$ emits its first value (0) after 1.5 seconds. From then on, every emission from either source produces a new array holding the latest value of each.
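The rule above (wait until every source has a value, then emit on every change) can be sketched as a small dependency-free model. This is an illustration only, not RxJS's implementation: TimedValue, intervalValues and combineLatestModel are hypothetical names, timers are assumed ideal, and ties at the same instant are assumed to resolve in source order.

```typescript
// Hypothetical model of combineLatest timing; it does not use RxJS.
type TimedValue = { timeMs: number; sourceIndex: number; value: number };

// Models interval(periodMs).pipe(take(count)) for the source at sourceIndex.
function intervalValues(sourceIndex: number, periodMs: number, count: number): TimedValue[] {
  const out: TimedValue[] = [];
  for (let k = 0; k < count; k++) {
    out.push({ timeMs: (k + 1) * periodMs, sourceIndex, value: k });
  }
  return out;
}

function combineLatestModel(sources: TimedValue[][]): number[][] {
  const all = ([] as TimedValue[]).concat(...sources);
  // Order by time; ties fall back to source order (an assumption of this model).
  all.sort((a, b) => a.timeMs - b.timeMs || a.sourceIndex - b.sourceIndex);

  const latest: (number | undefined)[] = sources.map(() => undefined);
  const emitted: number[][] = [];
  for (const e of all) {
    latest[e.sourceIndex] = e.value;
    // Emit only once every source has produced at least one value.
    if (latest.every((v) => v !== undefined)) {
      emitted.push(latest.map((v) => v as number));
    }
  }
  return emitted;
}

const combinedPairs = combineLatestModel([
  intervalValues(0, 1000, 3), // like interval1$
  intervalValues(1, 1500, 2), // like interval2$
]);
console.log(JSON.stringify(combinedPairs));
// [[0,0],[1,0],[2,0],[2,1]]
```

The first value of interval1$ (at 1s) produces no output in the model because interval2$ has not emitted yet.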

 

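A typical use is combining route params and query params (for example, Angular's ActivatedRoute params and queryParams Observables), so that a change in either one is handled in a single place.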


FORK JOIN

  • forkJoin(): Combines the last values emitted by multiple observables and emits a single value when all observables complete.

 

import { of, forkJoin, timer } from 'rxjs';
import { delay, mapTo, take } from 'rxjs/operators';


const source1$ = of('Hello');
const source2$ = of('World').pipe(delay(100));
const source3$ = of('!');

const result1$ = forkJoin([source1$, source2$, source3$]);

 


result1$.subscribe({
    // Emits ['Hello', 'World', '!'] once, after about 100ms
    next: (value) => console.log('Result 1:', value),
    error: (err) => console.error('Result 1 error:', err),
    // Runs right after the single emission, once all sources have completed
    complete: () => console.log('Result 1 completed'),
});


/*
Output:
Result 1: [ 'Hello', 'World', '!' ]
Result 1 completed
*/

 

forkJoin with Observables that take time

 

const timer1$ = timer(500).pipe(take(1)); // Emits 0 after 500ms and completes
const timer2$ = timer(1000).pipe(take(1), mapTo('Delayed Value')); // Emits 'Delayed Value' after 1s and completes

  • timer(1000) waits for 1 second.
  • It then emits the number 0 and completes.
  • take(1) lets this 0 pass through, then completes (redundant here, because a single-argument timer() already completes after one value).
  • mapTo('Delayed Value') receives the 0 and transforms it into the string 'Delayed Value'.
  • The resulting timer2$ Observable emits 'Delayed Value' after 1 second and then completes.

 

const result2$ = forkJoin([timer1$, timer2$]);

console.log('Example 2: forkJoin with Observables that take time');
result2$.subscribe({
    // Emits [0, 'Delayed Value'] after 1s, when the slower source completes
    next: (value) => console.log('Result 2:', value),
    complete: () => console.log('Result 2 completed'),
    error: (err) => console.error('Result 2 error:', err),
});
/*
Output:
Example 2: forkJoin with Observables that take time
Result 2: [ 0, 'Delayed Value' ]
Result 2 completed
*/
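Both examples above use sources that emit a single value, so they do not show that forkJoin keeps only the last value of each source. A small dependency-free model makes that visible. It is a sketch only, not RxJS's implementation: FiniteSource and forkJoinModel are hypothetical names, and each source is described simply by its values and its completion time.

```typescript
// Hypothetical model of forkJoin semantics; it does not use RxJS.
type FiniteSource<T> = { values: T[]; completesAtMs: number };

// Returns the last value of every source, available once the slowest source
// has completed. Returns null when nothing would be emitted: no sources, or
// a source that completes without emitting any value.
function forkJoinModel<T>(sources: FiniteSource<T>[]): { atMs: number; result: T[] } | null {
  if (sources.length === 0) return null;
  const result: T[] = [];
  let atMs = 0;
  for (const s of sources) {
    if (s.values.length === 0) return null;
    result.push(s.values[s.values.length - 1] as T); // keep only the last value
    atMs = Math.max(atMs, s.completesAtMs);
  }
  return { atMs, result };
}

const forkJoined = forkJoinModel<number | string>([
  { values: [0, 1, 2], completesAtMs: 1500 },         // like interval(500).pipe(take(3))
  { values: ['Delayed Value'], completesAtMs: 1000 }, // like timer2$
]);
console.log(JSON.stringify(forkJoined));
// {"atMs":1500,"result":[2,"Delayed Value"]}
```

The earlier values 0 and 1 of the first source are dropped; only its final value 2 appears in the result, and the result is available only at 1500ms, when the slower source has completed.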

 

Get statistics

forkJoin({
    users: this.teamsService.getAllQueuesUsersData(0).pipe(map(res => res?.length ?? 0)),
    team: this.teamsService.getQueueTeamLeadList().pipe(map(res => res?.length ?? 0)),
    clients: this.qualificationService.getCompanies().pipe(map(res => res?.length ?? 0))
}).subscribe(data => {
    this.totalUserCount = data.users;
    this.teamLeadCount = data.team;
    this.allClientsCount = data.clients;
});


Related Question