从现有的 Reactive Publisher 创建
现有的 Reactive Stream 的 Publishers
能很好的从包括用户的其它实现那里,或者从 Reactor 本身创建出来。
用例包括:
- 用来协调各种数据源的可以组合使用的 API
- 懒惰的资源访问,在订阅和请求上读取一个数据源,例如远程HTTP调用。
- 面向数据的操作,比如Key/Value的
元组流(Tuple Stream)
,持续流(Persistent Stream)
或解码。 - 使用
Stream API
的原生发布器装饰
Streams.concat() 和 Streams.wrap() 实战
Processor<String,String> processor = RingBufferProcessor.create();
Stream<String> st1 = Streams.just("Hello "); (1)
Stream<String> st2 = Streams.just("World "); (1)
Stream<String> st3 = Streams.wrap(processor); (2)
Streams.concat(st1, st2, st3) (3)
.reduce( (prev, next) -> prev + next ) (4)
.consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); (5)
processor.onNext("!");
processor.onComplete();
- 从一个已知的值创建
Stream
。 - 使用
Stream API
装饰核心处理器。注意:Streams.concat()
会将处理器直接作为一个可用的 Publisher 参数来接收。 - 连接 3 个上行流的来源 (所有的 st1,然后是所有的 st2 , 再然后是所有的 st3 ) 。
- 两个两个的对输入进行累积,并在最后从 st3 完成之后,在完成的上行流上发送结果。
- 在管道上产生需求,意味着“现在就开始执行”。
表6.从可用的 Reactive Stream Publisher创建
工厂方法 | 数据类型 | |
---|---|---|
作用 | ||
Streams.create(Publisher<T>) | T |
当第一次 Subscription.request(N) 命中返回的 Stream 时只订阅传入的 Publisher 。因而它支持不会按每个规范所需要的那样调用 Subscriber.onSubscribe(Subscription) 的格式不正确的 Publisher 。
|
Streams.wrap(Publisher<T>) | T |
一个针对传入 Publisher.subscribe(Subscriber<T>) 参数的下发 Stream 。只支持正确使用 Reactive Stream 协议的格式正确的 Publisher:
onSubscribe > onNext > (onError | onComplete)
|
Streams.defer(Supplier<Publisher<T>>) | T |
每次 Stream.subscribe(Subscriber) 被调用时,会使用由 Supplier.get() 提供的间接界别的惰性 Publisher 访问。
|
Streams.createWith(BiConsumer<Long,SubscriberWithContext<T, C>, Function<Subscriber<T>,C>, Consumer<C>) | T |
一个针对每个 Subscriber 请求,启动和停止事件都有明确回调的 Stream 生成器。类似于去掉了通用模板的 Streams.create(Publisher) 。
|
Streams.switchOnNext(Publisher<Publisher<T>>) | T |
一个 Stream 会以 FIFO(先进先出)的顺序在从传入的 Publisher 发送 onNext(Publisher<T>) 间隙变更。 信号会导致在下行数据流 Subscriber<T> 中接收下一个 onNext(T) 的 Publisher 序列。当 onNext(Publisher<T>) 信号被接收到时它可能会打断正在上行的流的发布。
|
Streams.concat(Publisher<T>, Publisher<T>) Streams.concat(Publisher<Publisher<T>>) | T |
如果一个 Publisher<T> 已经发送了,在处理下一个等待处理的 Publisher<T> 之前要等待这个 onComplete() 。 其名称就暗示它对于串联不同的数据源并保持顺序正确,这些方面的作用。
|
Streams.merge(Publisher<T>, Publisher<T>, Publisher<T>) Streams.merge(Publisher<Publisher<T>>) | T |
接收多种数据源并将它们各自的序列交替插入。顺序不会像使用 concat 那样被保持。 对于 Subscriber 的需求将会在不同来源之间被分隔开,最小的数量是 1,以确保每个都能有发送一些东西的机会。
|
Streams.combineLatest(Publisher<T1>, Publisher<T2>, Publisher<T3-N> x6, Function<Tuple2-N, C>) | C |
使用给定的累积 Function 来整合最近发出的来自于传入来源的元素。
|
Streams.combineLatest(Publisher<T1>, Publisher<T2>, Publisher<T3-N> x6, Function<Tuple2-N, C>) | C |
每次有来源发送了信号,应用给定的 Function ,并清理临时累积结果,将最近的元素整合一次。 事实上这是一种针对多类型来源的灵活的联合机制。
|
Streams.join(Publisher<T>, Publisher<T>, Publisher<T> x6) | List<T> | 一个压缩的快捷操作,只用来累加一个匹配传入的参数源的顺序的列表中完成了的累积结果。 |
Streams.await(Publisher<>, long, unit, boolean) | void |
阻塞调用的线程,直到传入的 Publisher 发出 onComplete 。可选的参数用来协调处理超时,以及需要有请求数据传入的情况。 如果最终状态时 onError 它就会抛出一个异常。
|
IOStreams.<K,V>persistentMap(String, deleteOnExit) | V | ChronicleStream 构造器的一个简单的快捷操作 ,它是一个基于磁盘的附加/尾部(appender/tailer)日志记录器。命名参数必须匹配 /tmp/persistent-queue[name] 下面一个已经存在的队列。 |
IOStreams.<K,V>persistentMapReader(String) | V | ChronicleReaderStream 构造器的一个简单的快捷操作 ,它是一个基于磁盘的尾部日志记录器(tailer)。命名参数必须匹配 /tmp/persistent-queue[name] 下的一个已经存在的队列。 |
IOStreams.decode(Codec<SRC, IN, ?>, Publisher<SRC>) | IN | 使用 Codec 解码器 可以将源数据类型解码成 IN 类型。 |
BiStreams.reduceByKey(Publisher<Tuple2<KEY,VALUE>>, Map<KEY,VALUE>, Publisher<MapStream.Signal<KEY, VALUE>>, BiFunction<VALUE, VALUE, VALUE>) | Tuple2<KEY,VALUE> |
一种键值对操作,他会累加按次序两个两个的传入 BiFunction 参数的 onNext(VALUE) 所计算出来的结果。结果只会在 onComplete() 中释放。可选项让你可以使用现有的一个 map 来存储和侦听其事件。
|
BiStreams.scanByKey(Publisher<Tuple2<KEY,VALUE>>, Map<KEY,VALUE>, Publisher<MapStream.Signal<KEY, VALUE>>, BiFunction<VALUE, VALUE, VALUE>) | Tuple2<KEY,VALUE> |
一种键值对操作,它会累加按次序两个两个的传入 BiFunction 参数的 onNext(VALUE) 所计算出来的结果。结果在每次存储好之后就被释放了。可选项让你可以使用现有的一个 map 来存储和侦听其事件。
|
Promises.when(Promise<T1>, Promise<T2>, Promise<T3-N> x6) | TupleN<T1,T2,\?> |
将所有来自于 Promise 的但个结果合到一起,并为新的 Promise 提供汇总的 Tuple 。
|
Promises.any(Promise<T>, Promise<T>, Promise<T> x6) | T |
选取传入的约定(promise)中第一个信号可用的,然后对结果中返回的 Promise 发出 onNext(T) 。
|
Promises.multiWhen(Promise<T>, Promise<T>, Promise<T> x6) | List<T> |
将所有来自于 Promise 参数的单个结果都合到一起,提供一个汇总的 List 。同 when 方法的不同之处在于约定(promise) 的类型必须匹配。
|