绑定一个 Stream
Streams 操作 — 除了一些像终端动作和 broadcast()
的异常 — 将永远不会直接订阅,而是将懒惰地预备自己被订阅。这在函数式编程中常常被称为提升。
基本的意思就是 Reactor Stream
用户会明确的调用 Stream.subscribe(Subscriber)
或者可选的终端动作,比如 Stream.consume(Consumer)
来实现所有注册了的操作。在那之前 Actions
并不是真的存在的。我们使用 Stream.lift(Supplier)
来将这些 Action 的创建延迟到 Stream.subscribe(Subscriber)
被明确调用。
当所有的东西都绑定好了,每个动作都会维持一个上行流 Subscription
和一个下行流 Subscription
而 Reactive Streams 所有的约定都会应用到管道。
通常终端的动作会返回一个
Control
对象而不是Stream
。这是一个你可以用来请求或者取消一个管道的组件,不用再一个Subscriber
上下文里面或者是实现整个Subscriber
约定。
绑定两个管道
import static reactor.Environment.*;
import reactor.rx.Streams;
import reactor.rx.Stream;
//...
Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");
//prepare two unique pipelines
Stream<String> actionChain1 = stream.map(String::toUpperCase).filter(w -> w.equals("C"));
Stream<Long> actionChain2 = stream.dispatchOn(sharedDispatcher()).take(5).count();
actionChain1.consume(System.out::println); //start chain1
Control c = actionChain2.consume(System.out::println); //start chain2
//...
c.cancel(); //force this consumer to stop receiving data
图 10. 绑定之后
发布/订阅(Publish/Subscribe)
要从一个统一管道向订阅者输出, 可以使用 Stream.process(Processor)
,Stream.broadcast()
,Stream.broadcastOn()
和 Stream.broadcastTo()
。
共享一个上行流管道,并绑定两个下行流管道
import static reactor.Environment.*;
import reactor.rx.Streams;
import reactor.rx.Stream;
//...
Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");
//prepare a shared pipeline
Stream<String> sharedStream = stream.observe(System.out::println).broadcast();
//prepare two unique pipelines
Stream<String> actionChain1 = sharedStream.map(String::toUpperCase).filter(w -> w.equals("C"));
Stream<Long> actionChain2 = sharedStream.take(5).count();
actionChain1.consume(System.out::println); //start chain1
actionChain2.consume(System.out::println); //start chain2
图 11. 在绑定一个共享的Stream之后
表 8. 考虑终端的或者明确的订阅的操作
Stream<T> 方法 | 返回类型 | 作用 |
---|---|---|
subscribe(Subscriber<T>) subscribeOn | void |
订阅传入的 Subscriber<T> 并将任何附带的上行流实例化, 进行懒绑定(针对非终端操作的非明确的提升)。注意一个 Subscriber 必须请求数据,如果它期望一些数据的话。dispatchOn 和 subscribeOn 是提供给用传入的 Dispatcher 发送 onSubscribe 信号的可选项。
|
consume(Consumer<T>,Consumer<T>,Consumer<T>) consumeOn | Control |
每当有相关的信号被侦测到,就会使用一个同每个传入的 Consumer 交互的 ConsumerAction 来调用 subscribe 。它将会向接收的 Subscription 请求 request(Streams.capacity()) 获取容量限制, 默认是 Long.MAX_VALUE , 这会导致无节制的使用。subscribeOn 和 consumeOn 是提供给传入的 Dispatcher 发送 onSubscribe 信号的可选项。如有必要,就返回一个 Control 组件来取消 实例化的 Stream 。 注意如果 onNext(T) 信号触发了一个阻塞请求, ConsumeAction 就会维持一个无限的递归。
|
consumeLater() | Control |
类似于 consume ,但并不会启动一个初始的 Subscription.request(long) 。 返回的 Control 可以被用来在任何时候执行 request(long) 。
|
tap() | TapAndControls |
类似于 consume 但返回的是一个 TapAndControls ,它将会在每次 onNext(T) 接收到信号或者被取消时动态的更新。
|
batchConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Long,Long>)batchConsumeOn | Control |
类似于 consume 但将会请求映射的 Long 需求,给定了之前的需求并且以默认的 Stream.capacity() 开始。 对于要动态适配各种因素的需求很有用。
|
adaptiveConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Stream<Long>,Publisher<Long>>) adaptiveConsumeOn | Control |
类似于 batchConsume ,但是将会请求需要的 Long 值计算出来的序列。它可以被用来插入流程控制,比如延迟需求的 Streams.timer() 。
|
next() | Promise<T> |
返回一个 Promise<T> ,它会积极的向 Stream 发出订阅,对其进行实例化,并在解除注册之前请求一个信号数据。最近的 onNext(T), onComplete() or onError(Throwable)信号 将会满足约定。
|
toList() | Promise<List<T>> |
类似于 next() ,但是会等待直到整个序列已经产生 (onComplete() ) 并且会在一个单独的 List<T> 中传递累积的 onNext(T) 以满足返回的约定。
|
Stream.toBlockingQueue() | CompletableBlockingQueue<T> |
向 Stream 发出信号并返回一个累积所有 onNext 信号的阻塞 Queue<T> 。 CompletableBlockingQueue.isTerminated() 可以被用来作为一个阻塞 poll() 循环退出的条件。
|
cache() | Stream<T> | 将任何 Stream 转成冷 Stream,能够针对每个 Subscriber 单独回放所有的信号序列。因为动作的无边界属性,所以你可能应该只将它用于小的 (ish) 序列上。 |
broadcast() broadcastOn(Environment, Dispatcher) | Stream<T> |
将任何 Stream 转成热 Stream。这将会阻止管道通过当前的 Stream 的实例化发生重复,并准备向 N 个 Subscriber 下行流发布该信号。 需求将会从所有的子 Subscriber 处聚集。
|
broadcastTo(Subscriber<T>) | Subscriber<T> |
一个 Stream.subscribe 替代,因为返回的实体就是传入的参数,所以允许方法链。
|
process(Processor<T, O>) | Stream<O> |
类似于 broadcast() 但接受任何给定的 Processor<T, O> 。这里有一个对于 Core Processor 的完美介绍!
|