组合操作

为了协调数据的并行序列,我们可以组合发布者。由于生成序列是合并的结果,它们也可以用于数据的[异步转换]异步转换

通过非阻塞的协调方式可以避免开发者使用 Future.get()Promise.await(),这两个方法在多信号存在是容易引发问题。非阻塞意味着管道除了订阅者的需求,不会做任何等待。订阅者的请求将被切分至最小,然后分配给已经组合的发布者

合并行为在 FanInAction 中建模,并通过一个订阅者委托的线程偷取型 SerializedSubscriber代理处理并行信号。它将对校验每个信号,查看对应的委托订阅者是否已经运行,如果没有运行,则重新分配信号。当繁忙的线程关闭订阅者代码时,信号将被轮询,处理信号的线程很可能已经不再是生产它的那个了。

在使用 flatMap 之间就削减需求信号量 没法说是好主意还是坏主意。实际上,如果无法处理所有的数据,是没有必要订阅多个并行发布者并合并操作的。然而它对并行发布者数据量的限制,也不会给予高速发布者挂起请求的机会。

Stream.zipWith(Function)

  1. Streams
  2. .range(1, 100)
  3. .zipWith( Streams.generate(System::currentTimeMillis), tuple -> tuple ) //1
  4. .consume(
  5. tuple -> System.out.println("number: "+tuple.getT1()+" time: "+tuple.getT2()) , //2
  6. Throwable::printStackTrace,
  7. avoid -> System.out.println("--complete--")
  8. );
  • “Zip” 或聚合来自 RangeStream 的最新的信号,传递 SupplierStream 以提供当前时间。
  • 通过 “Zip” 操作,压缩发布者按照声明的顺序(自左及右,stream1.zipWith(stream2))生成数据元组。
表13,组合数据源
功能 API 或工厂方法角色
Stream.flatMap(Function<T, Publisher<V>>) 这是一个[异步转换]异步转换,是一个 map(Function<T, Publisher<V>>).merge() 的类型化快捷方式。映射部分使用传递来的数据 T 生成一个 Publisher<V>,这是微服务构架中常用的一种模式。
Streams.switchOnNext(Publisher<Publisher<T>>) 在传入的发布者发出 onNext(Publisher<T>) 信号的间隙,Stream 会以 FIFO(先进先出)的顺序轮替。这个信号将使下游 Subscriber<T> 接收到下一个 onNext(T) 的发布者序列。当 onNext(Publisher<T>)信号被接收时,可能会打断上游正在进行的发布。所有的 Publisher<T> 都必须在 Subscriber<T> 完结之前完结。
Streams.merge(Publisher<T>, Publisher<T> x7)Streams.merge(Publisher<Publisher<T>>)Stream.mergeWith(Publisher<T>)Stream.merge() 通过安全的并行订阅,将上游的Publisher<T>序列转化成 T 的序列。 T 序列是插入的,没有顺序保障。如果参数直接就是 Publisher<T>,一如 Stream.mergeWith(Publisher<T>)Streams.merge(Publisher<T>, Publisher<T>)MergeAction 将直接订阅它,并更高效的分配大小(若知道上游的并行数量)。所有的 Publisher<T> 都必须在 Subscriber<T> 完结之前完结。
Streams.concat(Publisher<T>, Publisher<T> x7)Streams.concat(Publisher<Publisher<T>>)Stream.concatWith(Publisher)Stream.startWith(Publisher) 类似 merge() ,但如果一个 Publisher<T> 已经在发布中,则等待它的 onComplete(),然后再接处理下一个挂起的 Publisher<T>。序列将以声明的顺序被订阅,自左及右,例如 stream1.concatWith(stream2),或是使用参数—— stream2.startWith(stream1)
Streams.combineLatest(Publisher<T>, Publisher<T> x7, Function<Tuple,V>)Streams.combineLatest(Publisher<Publisher<T>>, Function<Tuple,V>) 合并来自独立Publisher<T>的最新 onNext(T) 信号。合并状态持续到信号源 Publisher<T> 发出新的 onNext(T) 信号并取代它。在所有 Publisher<T> 都发出至少一个信号之后,给定的组合器函数将接收所有合并的信号,并生成期望的合并对象。
Streams.zip(Publisher<T>, Publisher<T> x7, Function<Tuple,V>)Streams.zip(Publisher<Publisher<T>>, Function<Tuple,V>)Stream.zipWith(Publisher<T>, Function<Tuple2,V>) 合并来自独立Publisher<T>的最新 onNext(T) 信号。合并状态持续到信号源 Publisher<T> 发出新的 onNext(T) 信号并取代它。每当 所有 Publisher<T> 都发出一个信号时,给定的压缩函数将接收所有的信号,并生成期望的压缩对象。如果过一个 Publisher<T> 完结,下游 Subscriber<T> 也将完结。
Streams.join(Publisher<T>, Publisher<T> x7)Streams.join(Publisher<Publisher<T>>)Stream.joinWith(Publisher<T>) 功能预设配置的 zip命令的快捷方法,可以将最新的 Tuple 转化成 List<?>