从热数据源创建

如果你正在处理的是一种数组的无边界的流,就好像一个通过REST接口接收用户输入的WEB应用程序,我也许想要使用 Reactor 中我们称作 Broadcaster 的“热”类型的 Stream 。为了使用它,你得在 Broadcaster 上声明可组合的,函数式任务的管道并在稍后调用 Broadcaster.onNext(T) 来将值发布到管道上。

Broadcaster 是一个有效的 ProcessorConsumer。要 onSubscribe 一个 Broadcaster 是可能的,因为要将其作为 ConsumerConsumer.accept(T) 分配给 Broadcaster.onNext(T) 是可能的。

Broadcaster.create()

  1. Broadcaster<String> sink = Broadcaster.create(Environment.get());//1
  2. sink.map(String::toUpperCase) //2
  3. .consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s)); //3
  4. sink.onNext("Hello World!"); //4
  • 使用默认的方式创建一个 Broadcaster, 将 RingBufferDispatcher 作为 Dispatcher 来进行共享。
  • 使用常规的约定 —— map () 方法 —— 对输入进行转换。
  • .consume() 是一个 "终端" 操作 ,这意味着它使用 Reactive 流的语法来产生需求。
  • 将值发布到管道中,这将导致要对任务的调用。

热数据源永远不会被重播。用户在它们已经被传入 Stream.subscribe(Subscriber) 的那一刻看到数据。一个例外就是 BehaviorBroadcaster(会重播上次发射的单元);Streams.timer()Streams.period() 也会保持独立的时控游标, 但还是会忽略掉背压。

用户会看到在他们于时间 T 进行了订阅之后,每过 t+1[[N]] 的时间就会有新的数据 N 流经 Broadcaster 。

表7,创建灵活的Streams
工厂输入输出作用
Streams.timer(delay, unit, timer) N/A Long Stream.subscribe(Subscriber) 启动一个定时器调用和发送一个单独的 onNext(0L) 然后在延迟时间一过就 onComplete()。要确保如何当前没有活跃的 Environment 就传入可选的 Timer 参数。 Subscription.request(long) 会因为没有可以应用到一个计划的发布的背压策略而被忽略。
Streams.period(period, unit, timer) N/A Long Stream.subscribe(Subscriber) 上启动一个定时器调用并在每个时间段发送 onNext(N) , 这里的 N 是一个从 0 开始的增量计数器。要确保如果当前没有活跃的 Environment 就传入可选的 Timer 参数。 Subscription.request(long) 会因为没有可以应用到一个计划的发布的背压策略而被忽略。
Streams.<T>switchOnNext() Publisher<T> T 一个 Action,它对于记录而言也是一个 ProcessoronNext(Publisher<T>) 信号会导致下行流 Subscriber<T> 接收下一个onNext(T)Publisher 序列。它可能会在 onNext(Publisher<T>) 信号被接收到时打断当前上行流的发布。
Broadcaster.<T>create(Environment, Dispatcher) T T 在任何被允许调用 onSubscribeonNextonComplete 或者 onError 的上下文,和一个Stream下的这些信号的可组合序列。 如果没有订阅者在进行活跃的注册,下一个信号可能会触发一个 CancelException。可选的 DispatcherEnvironment 参数定义了去哪里发送每个信号。最后,一个 Broadcaster 对于一个 Publisher 而言任何时候都可以被订阅,就像是一个 Stream
SerializedBroadcaster.create(Environment, Dispatcher) T T 类似于 Broadcaster.create() 不过添加了对于来自于可能会调用相同的广播器 onXXX 方法的并行上下文的并发 onNext 的支持。
BehaviorBroadcaster.create(Environment, Dispatcher) T T 类似于 Broadcaster.create() 不过总是会重播最后一条数据信号 (如果有的话 ) 以及针对新的 Subscriber 的最后的终端信号 (onComplete()onError(Throwable))。
BehaviorBroadcaster.first(T, Environment, Dispatcher) T T 类似于 BehaviorBroadcaster 不过是以一个默认值 T 开始的。
Streams.wrap(Processor<I, O>) I O 一个针对传入的Publisher.subscribe(Subscriber<O>) 参数的分配 Stream。只能正确的支持使用了Reactive Streams协议的格式良好的 Publisher: onSubscribe > onNext > (onError ¦ onComplete)
Promises.<T>prepare(Environment, Dispatcher) Promises.ready() T T 准备一个 Promise,用来应对任何外部的上下文通过 onNext *仅有的一次调用。因为它是一个有状态的持有满足要求的约定的结果的容器,所有新的订阅者会立即在当前的线程上运行。

对于异步的广播,要总是考虑将 Core Processor 作为 Broadcaster 的替代:

  • Broadcaster 在没有订阅者时会触发一个 CancelException。一个核心 RingBufferProcessor 将总是将缓存的数据传递到第一个订阅者。
  • 有些可以被赋值给 BroadcasterDispatcher 类型可能不支持并发的 onNext。使用 RingBufferProcessor.share() 作为线程安全,并发 onNext 的替代。
  • RingBufferProcessor 支持通过一个下行流订阅者回放取消正在进行的处理这一事件,如果这个订阅者仍然在处理器线程上运行的话。Broadcaster 不会支持回放。
  • RingBufferProcessor 比它们的带有一个 RingBufferDispatcher 的 Broadcaster 替代要更快。
  • RingBufferWorkProcessor 支持扩大附上的订阅者的数量。
  • Broadcaster 在 2.1 中可能会被发展为一个 Processor, 这样就达成同样了的目的,并且没有让 Reactor 用户为 ProcessorBroadcaster 之间的选择犯难。