组合操作
为了协调数据的并行序列,我们可以组合发布者
。由于生成序列是合并的结果,它们也可以用于数据的[异步转换]异步转换。
通过非阻塞的协调方式可以避免开发者使用 Future.get()
或 Promise.await()
,这两个方法在多信号存在是容易引发问题。非阻塞意味着管道除了订阅者
的需求,不会做任何等待。订阅者
的请求将被切分至最小,然后分配给已经组合的发布者
。
合并行为在 FanInAction
中建模,并通过一个订阅者
委托的线程偷取型 SerializedSubscriber
代理处理并行信号。它将对校验每个信号,查看对应的委托订阅者
是否已经运行,如果没有运行,则重新分配信号。当繁忙的线程关闭订阅者
代码时,信号将被轮询,处理信号的线程很可能已经不再是生产它的那个了。
在使用
flatMap
之间就削减需求信号量 没法说是好主意还是坏主意。实际上,如果无法处理所有的数据,是没有必要订阅多个并行发布者
并合并操作的。然而它对并行发布者
数据量的限制,也不会给予高速发布者
挂起请求的机会。
Stream.zipWith(Function)
Streams
.range(1, 100)
.zipWith( Streams.generate(System::currentTimeMillis), tuple -> tuple ) //1
.consume(
tuple -> System.out.println("number: "+tuple.getT1()+" time: "+tuple.getT2()) , //2
Throwable::printStackTrace,
avoid -> System.out.println("--complete--")
);
- “Zip” 或聚合来自
RangeStream
的最新的信号,传递SupplierStream
以提供当前时间。 - 通过 “Zip” 操作,压缩
发布者
按照声明的顺序(自左及右,stream1.zipWith(stream2)
)生成数据元组。
表13,组合数据源
功能 API 或工厂方法 | 角色 |
---|---|
Stream.flatMap(Function<T, Publisher<V>>) |
这是一个[异步转换]异步转换,是一个 map(Function<T, Publisher<V>>).merge() 的类型化快捷方式。映射部分使用传递来的数据 T 生成一个 Publisher<V> ,这是微服务构架中常用的一种模式。
|
Streams.switchOnNext(Publisher<Publisher<T>>) |
在传入的发布者发出 onNext(Publisher<T>) 信号的间隙,Stream 会以 FIFO(先进先出)的顺序轮替。这个信号将使下游 Subscriber<T> 接收到下一个 onNext(T) 的发布者序列。当 onNext(Publisher<T>) 信号被接收时,可能会打断上游正在进行的发布。所有的 Publisher<T> 都必须在 Subscriber<T> 完结之前完结。
|
Streams.merge(Publisher<T>, Publisher<T> x7)Streams.merge(Publisher<Publisher<T>>)Stream.mergeWith(Publisher<T>)Stream.merge() |
通过安全的并行订阅,将上游的Publisher<T> 序列转化成 T 的序列。 T 序列是插入的,没有顺序保障。如果参数直接就是 Publisher<T> ,一如 Stream.mergeWith(Publisher<T>) 或 Streams.merge(Publisher<T>, Publisher<T>) ,MergeAction 将直接订阅它,并更高效的分配大小(若知道上游的并行数量)。所有的 Publisher<T> 都必须在 Subscriber<T> 完结之前完结。
|
Streams.concat(Publisher<T>, Publisher<T> x7)Streams.concat(Publisher<Publisher<T>>)Stream.concatWith(Publisher)Stream.startWith(Publisher) |
类似 merge() ,但如果一个 Publisher<T> 已经在发布中,则等待它的 onComplete() ,然后再接处理下一个挂起的 Publisher<T>。序列将以声明的顺序被订阅,自左及右,例如 stream1.concatWith(stream2) ,或是使用参数—— stream2.startWith(stream1) 。
|
Streams.combineLatest(Publisher<T>, Publisher<T> x7, Function<Tuple,V>)Streams.combineLatest(Publisher<Publisher<T>>, Function<Tuple,V>) |
合并来自独立Publisher<T> 的最新 onNext(T) 信号。合并状态持续到信号源 Publisher<T> 发出新的 onNext(T) 信号并取代它。在所有 Publisher<T> 都发出至少一个信号之后,给定的组合器函数将接收所有合并的信号,并生成期望的合并对象。
|
Streams.zip(Publisher<T>, Publisher<T> x7, Function<Tuple,V>)Streams.zip(Publisher<Publisher<T>>, Function<Tuple,V>)Stream.zipWith(Publisher<T>, Function<Tuple2,V>) |
合并来自独立Publisher<T> 的最新 onNext(T) 信号。合并状态持续到信号源 Publisher<T> 发出新的 onNext(T) 信号并取代它。每当 所有 Publisher<T> 都发出一个信号时,给定的压缩函数将接收所有的信号,并生成期望的压缩对象。如果过一个 Publisher<T> 完结,下游 Subscriber<T> 也将完结。
|
Streams.join(Publisher<T>, Publisher<T> x7)Streams.join(Publisher<Publisher<T>>)Stream.joinWith(Publisher<T>) |
功能预设配置的 zip 命令的快捷方法,可以将最新的 Tuple 转化成 List<?> 。
|