{"id":11087,"date":"2021-09-10T01:37:54","date_gmt":"2021-09-10T01:37:54","guid":{"rendered":"https:\/\/www.htmlgoodies.com\/?p=11087"},"modified":"2021-09-13T01:52:06","modified_gmt":"2021-09-13T01:52:06","slug":"rxjs-observables-combinelatest-forkjoin","status":"publish","type":"post","link":"https:\/\/www.htmlgoodies.com\/javascript\/rxjs-observables-combinelatest-forkjoin\/","title":{"rendered":"Combining RxJS Observables Using combineLatest and forkJoin"},"content":{"rendered":"
One of the hallmarks of complex applications is the sourcing of data from numerous sources. These can be external data providers, applications, or a local database. RxJS provides a variety of operators for combining these into one observable based on order, time, and\/or structure of emitted values. In this article, we’ll learn to subscribe to multiple streams simultaneously by employing two of the most popular RxJS combination operators: combineLatest<\/i> and forkJoin<\/i>.<\/p>\nRxJS combineLatest Operator<\/i><\/h2>\n
One of the most common reasons for combining observables is to perform some type of calculation or determination using data from each. The combineLatest<\/i> operator emits an item whenever any of the source observables emits a value, but only once each of the source observables has emitted at least one value. As such, whenever any of the source observables emits a value, combineLatest<\/i>:<\/p>\n Here is the combineLatest<\/i> signature:<\/p>\n The following code snippet shows a typical usage scenario of combineLatest<\/i> usage. You’ll notice that the weight<\/i> and height<\/i> observables are passed to combineLatest<\/i> as an array (of arrays). The second (optional) parameter is the project function that calculates the BMI using the emitted values from the weight<\/i> and height<\/i> observables. Finally, the results of the calculation are returned as a new observable:<\/p>\n The console.log()<\/em> output from the project function shows that the last emitted weight value is used in all calculations. It is combined with each of the height<\/i> values to produce the BMI results. This happens because the weight<\/i> observable values are emitted immediately without a delay. Hence:<\/p>\n if one observable emits values before the others do, then those values are lost.<\/strong><\/p>\n The number of values emitted from combineLatest<\/i> is determined by the stream that emits the least number of values, in this case, height<\/i>.<\/p>\n Like combineLatest<\/i>, forkJoin<\/i> also takes an array of observables. However, it emits an array of values in the exact same order as the passed array. The returned observable will emit the last values emitted from each input stream.<\/p>\n The signature of forkJoin<\/i> is nearly identical to that of combineLatest<\/i>:<\/p>\n If we rerun our previous example, substituting forkJoin<\/i> for combineLatest<\/i>, we can see that only one BMI value is produced:<\/p>\n In order to emit the last values emitted from each input stream, forkJoin<\/i> needs to know when all streams have completed. This begs the question, how does it know when a stream is done emitting values. The answer lies is in the Subscription class, which implements the Observer interface. In the recent RxJS Observables Primer in Angular<\/a> article, we learned about the next()<\/em>, error()<\/em>, and complete()<\/em> handlers of the Observer interface. The Combination Operators wait for the complete()<\/em> signal to combine streams.<\/p>\n The of()<\/em> function takes care of calling complete()<\/em> for us, but it’s our responsibility when creating our own observables. We can see that both observables below call complete()<\/em> when done emitting values:<\/p>\n Once again, forkJoin<\/i> returns 3.671253629592222.<\/p>\n\n
combineLatest(observables: ...Observable [, project: function]): Observable\r\n<\/pre>\n
import { combineLatest, of } from 'rxjs';\r\n\r\nconst weight = of(70, 72, 76, 79, 75);\r\nconst height = of(1.76, 1.77, 1.78);\r\nconst bmi = combineLatest([weight, height], (w, h) => {\r\n console.log('project values: w = ', w, ', h = ', h);\r\n return w \/ (h * h);\r\n});\r\nbmi.subscribe(res => console.log('BMI is ' + res));\r\n\r\n\/\/ Output to console is:\r\n\/\/ project values: w = 75, h = 1.76\r\n\/\/ BMI is 24.212293388429753\r\n\/\/ project values: w = 75, h = 1.77\r\n\/\/ BMI is 23.93948099205209\r\n\/\/ project values: w = 75, h = 1.78\r\n\/\/ BMI is 23.671253629592222\r\n<\/pre>\n
RxJS forkJoin Operator<\/i><\/h2>\n
forkJoin(...args [, selector : function]): Observable\r\n<\/pre>\n
const bmi = forkJoin([weight, height], (w, h) => {\r\n console.log('selector values: w =', w, ', h =', h);\r\n return w \/ (h * h);\r\n});\r\nbmi.subscribe(res => console.log('BMI is ' + res));\r\n\r\n\/\/ Output to console is:\r\n\/\/ selector values: w = 75, h = 1.78\r\n\/\/ BMI is 3.671253629592222\r\n<\/pre>\n
How Combination Operators Know When All Observables Have Completed<\/h3>\n
const weight = \r\n new Observable<number>((