forkJoin

签名: forkJoin(...args, selector : function): Observable

当所有 observables 完成时,发出每个 observable 的最新值。


:bulb: 如果你想要多个 observables 按发出顺序相对应的值的组合,试试 zip

:warning: 如果内部 observable 不完成的话,forkJoin 永远不会发出值!


为什么使用 forkJoin

当有一组 observables,但你只关心每个 observable 最后发出的值时,此操作符是最适合的。此操作符的一个常见用例是在页面加载(或其他事件)时你希望发起多个请求,并在所有请求都响应后再采取行动。它可能与 Promise.all 的使用方式类似。

注意,如果任意作用于 forkJoin 的内部 observable 报错的话,对于那些在内部 observable 上没有正确 catch 错误,从而导致完成的 observable,你将丢失它们的值 (参见示例 4)。如果你只关心所有内部 observables 是否成功完成的话,可以在外部捕获错误

还需要注意的是如果 observable 发出的项多于一个的话,并且你只关心前一个发出的话,那么 forkJoin 并非正确的选择。在这种情况下,应该选择像 combineLatestzip 这样的操作符。

forkJoin - 图3

示例

示例 1: Observables 再不同的时间间隔后完成

( StackBlitz |
jsBin |
jsFiddle )

  1. import { delay, take } from 'rxjs/operators';
  2. import { forkJoin } from 'rxjs/observable/forkJoin';
  3. import { of } from 'rxjs/observable/of';
  4. import { interval } from 'rxjs/observable/interval';
  5. const myPromise = val =>
  6. new Promise(resolve =>
  7. setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
  8. );
  9. /*
  10. 当所有 observables 完成时,将每个 observable
  11. 的最新值作为数组发出
  12. */
  13. const example = forkJoin(
  14. // 立即发出 'Hello'
  15. of('Hello'),
  16. // 1秒后发出 'World'
  17. of('World').pipe(delay(1000)),
  18. // 1秒后发出0
  19. interval(1000).pipe(take(1)),
  20. // 以1秒的时间间隔发出0和1
  21. interval(1000).pipe(take(2)),
  22. // 5秒后解析 'Promise Resolved' 的 promise
  23. myPromise('RESULT')
  24. );
  25. //输出: ["Hello", "World", 0, 1, "Promise Resolved: RESULT"]
  26. const subscribe = example.subscribe(val => console.log(val));
示例 2: 发起任意多个请求

( StackBlitz |
jsBin |
jsFiddle )

  1. import { mergeMap } from 'rxjs/operators';
  2. import { forkJoin } from 'rxjs/observable/forkJoin';
  3. import { of } from 'rxjs/observable/of';
  4. const myPromise = val =>
  5. new Promise(resolve =>
  6. setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
  7. );
  8. const source = of([1, 2, 3, 4, 5]);
  9. // 发出数组的全部5个结果
  10. const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
  11. /*
  12. 输出:
  13. [
  14. "Promise Resolved: 1",
  15. "Promise Resolved: 2",
  16. "Promise Resolved: 3",
  17. "Promise Resolved: 4",
  18. "Promise Resolved: 5"
  19. ]
  20. */
  21. const subscribe = example.subscribe(val => console.log(val));
示例 3: 在外部处理错误

( StackBlitz |
jsBin |
jsFiddle )

  1. import { delay, catchError } from 'rxjs/operators';
  2. import { forkJoin } from 'rxjs/observable/forkJoin';
  3. import { of } from 'rxjs/observable/of';
  4. import { _throw } from 'rxjs/observable/throw';
  5. /*
  6. 当所有 observables 完成时,将每个 observable
  7. 的最新值作为数组发出
  8. */
  9. const example = forkJoin(
  10. // 立即发出 'Hello'
  11. of('Hello'),
  12. // 1秒后发出 'World'
  13. of('World').pipe(delay(1000)),
  14. // 抛出错误
  15. _throw('This will error')
  16. ).pipe(catchError(error => of(error)));
  17. // 输出: 'This will Error'
  18. const subscribe = example.subscribe(val => console.log(val));
示例 4: 当某个内部 observable 报错时得到成功结果

( StackBlitz |
jsBin |
jsFiddle )

  1. import { delay, catchError } from 'rxjs/operators';
  2. import { forkJoin } from 'rxjs/observable/forkJoin';
  3. import { of } from 'rxjs/observable/of';
  4. import { _throw } from 'rxjs/observable/throw';
  5. /*
  6. 当所有 observables 完成时,将每个 observable
  7. 的最新值作为数组发出
  8. */
  9. const example = forkJoin(
  10. // 立即发出 'Hello'
  11. of('Hello'),
  12. // 1秒后发出 'World'
  13. of('World').pipe(delay(1000)),
  14. // 抛出错误
  15. _throw('This will error').pipe(catchError(error => of(error)))
  16. );
  17. // 输出: ["Hello", "World", "This will error"]
  18. const subscribe = example.subscribe(val => console.log(val));
示例 5: Angular 中的 forkJoin

( plunker )

  1. @Injectable()
  2. export class MyService {
  3. makeRequest(value: string, delayDuration: number) {
  4. // 模拟 http 请求
  5. return of(`Complete: ${value}`).pipe(
  6. delay(delayDuration)
  7. );
  8. }
  9. }
  10. @Component({
  11. selector: 'my-app',
  12. template: `
  13. <div>
  14. <h2>forkJoin Example</h2>
  15. <ul>
  16. <li> {{propOne}} </li>
  17. <li> {{propTwo}} </li>
  18. <li> {{propThree}} </li>
  19. </ul>
  20. </div>
  21. `,
  22. })
  23. export class App {
  24. public propOne: string;
  25. public propTwo: string;
  26. public propThree: string;
  27. constructor(private _myService: MyService) {}
  28. ngOnInit() {
  29. // 使用不同的延迟模拟3个请求
  30. forkJoin(
  31. this._myService.makeRequest('Request One', 2000),
  32. this._myService.makeRequest('Request Two', 1000)
  33. this._myService.makeRequest('Request Three', 3000)
  34. )
  35. .subscribe(([res1, res2, res3]) => {
  36. this.propOne = res1;
  37. this.propTwo = res2;
  38. this.propThree = res3;
  39. });
  40. }
  41. }

其他资源


:file_folder: 源码: https://github.com/ReactiveX/rxjs/blob/master/src/observable/ForkJoinObservable.ts