从热数据源创建
如果你正在处理的是一种数组的无边界的流,就好像一个通过REST接口接收用户输入的WEB应用程序,我也许想要使用 Reactor 中我们称作 Broadcaster 的“热”类型的 Stream
。为了使用它,你得在 Broadcaster
上声明可组合的,函数式任务的管道并在稍后调用 Broadcaster.onNext(T)
来将值发布到管道上。
Broadcaster
是一个有效的Processor
和Consumer
。要onSubscribe
一个Broadcaster
是可能的,因为要将其作为Consumer
将Consumer.accept(T)
分配给Broadcaster.onNext(T)
是可能的。
Broadcaster.create()
Broadcaster<String> sink = Broadcaster.create(Environment.get());//1
sink.map(String::toUpperCase) //2
.consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); //3
sink.onNext("Hello World!"); //4
- 使用默认的方式创建一个
Broadcaster
, 将RingBufferDispatcher
作为Dispatcher
来进行共享。 - 使用常规的约定 —— map () 方法 —— 对输入进行转换。
.consume()
是一个 "终端" 操作 ,这意味着它使用 Reactive 流的语法来产生需求。- 将值发布到管道中,这将导致要对任务的调用。
热数据源永远不会被重播。用户在它们已经被传入
Stream.subscribe(Subscriber)
的那一刻看到数据。一个例外就是BehaviorBroadcaster
(会重播上次发射的单元);Streams.timer()
和Streams.period()
也会保持独立的时控游标, 但还是会忽略掉背压。
用户会看到在他们于时间 T 进行了订阅之后,每过 t+1[[N]] 的时间就会有新的数据 N 流经 Broadcaster 。
表7,创建灵活的Streams
工厂 | 输入 | 输出 | 作用 |
---|---|---|---|
Streams.timer(delay, unit, timer) | N/A | Long |
在 Stream.subscribe(Subscriber) 启动一个定时器调用和发送一个单独的 onNext(0L) 然后在延迟时间一过就 onComplete() 。要确保如何当前没有活跃的 Environment 就传入可选的 Timer 参数。 Subscription.request(long) 会因为没有可以应用到一个计划的发布的背压策略而被忽略。
|
Streams.period(period, unit, timer) | N/A | Long |
在 Stream.subscribe(Subscriber) 上启动一个定时器调用并在每个时间段发送 onNext(N) , 这里的 N 是一个从 0 开始的增量计数器。要确保如果当前没有活跃的 Environment 就传入可选的 Timer 参数。 Subscription.request(long) 会因为没有可以应用到一个计划的发布的背压策略而被忽略。
|
Streams.<T>switchOnNext() | Publisher<T> | T |
一个 Action ,它对于记录而言也是一个 Processor 。onNext(Publisher<T>) 信号会导致下行流 Subscriber<T> 接收下一个onNext(T) 的 Publisher 序列。它可能会在 onNext(Publisher<T>) 信号被接收到时打断当前上行流的发布。
|
Broadcaster.<T>create(Environment, Dispatcher) | T | T |
在任何被允许调用 onSubscribe 、 onNext 、onComplete 或者 onError 的上下文,和一个Stream下的这些信号的可组合序列。 如果没有订阅者在进行活跃的注册,下一个信号可能会触发一个 CancelException 。可选的 Dispatcher 和 Environment 参数定义了去哪里发送每个信号。最后,一个 Broadcaster 对于一个 Publisher 而言任何时候都可以被订阅,就像是一个 Stream 。
|
SerializedBroadcaster.create(Environment, Dispatcher) | T | T |
类似于 Broadcaster.create() 不过添加了对于来自于可能会调用相同的广播器 onXXX 方法的并行上下文的并发 onNext 的支持。
|
BehaviorBroadcaster.create(Environment, Dispatcher) | T | T |
类似于 Broadcaster.create() 不过总是会重播最后一条数据信号 (如果有的话 ) 以及针对新的 Subscriber 的最后的终端信号 (onComplete() ,onError(Throwable) )。
|
BehaviorBroadcaster.first(T, Environment, Dispatcher) | T | T |
类似于 BehaviorBroadcaster 不过是以一个默认值 T 开始的。
|
Streams.wrap(Processor<I, O>) | I | O |
一个针对传入的Publisher.subscribe(Subscriber<O>) 参数的分配 Stream 。只能正确的支持使用了Reactive Streams协议的格式良好的 Publisher:
onSubscribe > onNext > (onError ¦ onComplete)
|
Promises.<T>prepare(Environment, Dispatcher) Promises.ready() | T | T |
准备一个 Promise ,用来应对任何外部的上下文通过 onNext *仅有的一次调用。因为它是一个有状态的持有满足要求的约定的结果的容器,所有新的订阅者会立即在当前的线程上运行。
|
对于异步的广播,要总是考虑将 Core Processor 作为
Broadcaster
的替代:
Broadcaster
在没有订阅者时会触发一个 CancelException。一个核心RingBufferProcessor
将总是将缓存的数据传递到第一个订阅者。- 有些可以被赋值给
Broadcaster
的Dispatcher
类型可能不支持并发的onNext
。使用RingBuffer
Processor.share()
作为线程安全,并发 onNext 的替代。- RingBufferProcessor 支持通过一个下行流订阅者回放取消正在进行的处理这一事件,如果这个订阅者仍然在处理器线程上运行的话。Broadcaster 不会支持回放。
- RingBufferProcessor 比它们的带有一个 RingBufferDispatcher 的 Broadcaster 替代要更快。
- RingBufferWorkProcessor 支持扩大附上的订阅者的数量。
- Broadcaster 在 2.1 中可能会被发展为一个
Processor
, 这样就达成同样了的目的,并且没有让 Reactor 用户为Processor
和Broadcaster
之间的选择犯难。