mergeMap

签名: mergeMap(project: function: Observable, resultSelector: function: any, concurrent: number): Observable

映射成 observable 并发出值。


:bulb: flatMap 是 mergeMap 的别名!

:bulb: 如果同一时间应该只有一个内部 subscription 是有效的,请尝试 switchMap

:bulb: 如果内部 observables 发送和订阅的顺序很重要,请尝试 concatMap


为什么使用 mergeMap

当想要打平内部 observable 并手动控制内部订阅数量时,此操作符是最适合的。

例如,当使用 switchMap 时,源 observable 发出值时,每个内部订阅都是完成的,只允许存在一个活动的内部订阅。与此相反,mergeMap 允许同一时间存在多个活动的内部订阅。正因为如此,mergeMap 最常见的用例便是不会被取消的请求,可以将其考虑成写,而不是读。注意如果需要考虑顺序的话,
concatMap 会是更好的选择。

注意,因为 mergeMap 同时维护多个活动的内部订阅,由于这些长期活动的内部订阅,所以是有可能产生内存泄露的。举个例子,如果你将 observable 映射成内部的定时器或 DOM 事件流。在这些案例中,如果你仍然想用 mergeMap 的话,你应该利用另一个操作符来管理内部订阅的完成,比如 taketakeUntil。你还可以使用 concurrent 参数来限制活动的内部订阅的数量,参见 示例 4

mergeMap - 图4

示例

示例 1: 使用 observable 进行 mergeMap

( StackBlitz |
jsBin |
jsFiddle )

  1. import { of } from 'rxjs/observable/of';
  2. import { mergeMap } from 'rxjs/operators';
  3. // 发出 'Hello'
  4. const source = of('Hello');
  5. // 映射成 observable 并将其打平
  6. const example = source.pipe(mergeMap(val => of(`${val} World!`)));
  7. // 输出: 'Hello World!'
  8. const subscribe = example.subscribe(val => console.log(val));
示例 2: 使用 promise 进行 mergeMap

( StackBlitz |
jsBin |
jsFiddle )

  1. import { of } from 'rxjs/observable/of';
  2. import { mergeMap } from 'rxjs/operators';
  3. // 发出 'Hello'
  4. const source = of('Hello');
  5. // mergeMap 还会发出 promise 的结果
  6. const myPromise = val =>
  7. new Promise(resolve => resolve(`${val} World From Promise!`));
  8. // 映射成 promise 并发出结果
  9. const example = source.pipe(mergeMap(val => myPromise(val)));
  10. // 输出: 'Hello World From Promise'
  11. const subscribe = example.subscribe(val => console.log(val));
示例 3: 使用 resultSelector 函数进行 mergeMap

( StackBlitz |
jsBin |
jsFiddle )

  1. import { of } from 'rxjs/observable/of';
  2. import { mergeMap } from 'rxjs/operators';
  3. /*
  4. 你还可以提供第二个参数,它接收源 observable 的值并发出内部 observable 或 promise 的值
  5. */
  6. // 发出 'Hello'
  7. const source = of('Hello');
  8. // mergeMap 还会发出 promise 的结果
  9. const myPromise = val =>
  10. new Promise(resolve => resolve(`${val} World From Promise!`));
  11. const example = source.pipe(
  12. mergeMap(
  13. val => myPromise(val),
  14. (valueFromSource, valueFromPromise) => {
  15. return `Source: ${valueFromSource}, Promise: ${valueFromPromise}`;
  16. }
  17. )
  18. );
  19. // 输出: "Source: Hello, Promise: Hello World From Promise!"
  20. const subscribe = example.subscribe(val => console.log(val));
示例 4: 使用 concurrent 值进行 mergeMap

( StackBlitz |
jsBin |
jsFiddle )

  1. import { interval } from 'rxjs/observable/interval';
  2. import { mergeMap, take } from 'rxjs/operators';
  3. // 每1秒发出值
  4. const source = interval(1000);
  5. const example = source.pipe(
  6. mergeMap(
  7. //project
  8. val => interval(5000).pipe(take(2)),
  9. //resultSelector
  10. (oVal, iVal, oIndex, iIndex) => [oIndex, oVal, iIndex, iVal],
  11. //concurrent
  12. 2
  13. )
  14. );
  15. /*
  16. 输出:
  17. [0, 0, 0, 0] <--第一个内部 observable
  18. [1, 1, 0, 0] <--第二个内部 observable
  19. [0, 0, 1, 1] <--第一个内部 observable
  20. [1, 1, 1, 1] <--第二个内部 observable
  21. [2, 2, 0, 0] <--第三个内部 observable
  22. [3, 3, 0, 0] <--第四个内部 observable
  23. */
  24. const subscribe = example.subscribe(val => console.log(val));

其他资源


:file_folder: 源码: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/mergeMap.ts