背压和溢出

多数情况下,依照 Reactor Stream 的协定,背压可以被自动处理。如果订阅者(Subscriber)请求的数据并没有超过其处理能力(例如类似 Long.MAXVALUE 的东西),数据源上游可以避免发送过多数据。如果你想在使用一个 “冷”的发布者(Publisher)** 时享受这种便利,你必须可以在任何时候关闭数据源的读取操作:从 socket 中读取多少、SQL 查询指针中有多少行、文件中有多少行、迭代构造体中有多少元素……_如果是 数据源,例如定时器或 UI 事件,或是一个可能从大型数据集上请求 Long.Max_VALUE 大小数据的订阅者(Subscriber),开发者必须针对背压制定明确的策略。

Reactor 提供了一系列处理冷热序列的 API

  • 非控(热)序列应当主动管理。
    • 减少 序列的信号量,例如“取样”。
    • 当需求超过容量时,忽略 数据。
    • 当需求超过容量时,缓冲 数据。
  • 受控(冷)序列应当被动管理。
    • 通过降低来自订阅者(Subscriber)Stream 上任意点的需求
    • 通过延迟请求断歇需求

Reactor 扩展文档中应用最广泛的示例就是 Marble Diagram,双时间线帮助我们更直观的了解发布者(Publisher)Stream以及订阅者(Subscriber) (如Action)在何时被观察,观察的内容又是什么。我们将使用这些图表来强调需求流,表明例如 Mapfilter 这样的变换的本质。

marble-101

当两个 Action 的调度器或容量不同时,Reactor 将自动提供一个内存溢出缓冲区。这不适用于核心处理器,它有自己的溢出处理机制。调度器可以重复使用,且 Reactor 必须限制调度器的数量,因此 Action 的调度器不同时,将添加内存缓冲区。

  1. Streams.just(1,2,3,4,5)
  2. .buffer(3) //1
  3. //onOverflowBuffer()
  4. .capacity(2) //2
  5. .consume()
  6. Streams.just(1,2,3,4,5)
  7. .dispatchOn(dispatcher1) //3
  8. //onOverflowBuffer()
  9. .dispatchOn(dispatcher2) //4
  10. .consume()
  • buffer 操作设定容量为 3。
  • consume() 或任何下游动作都被设定为 capacity(2),隐式的添加了一个 onOverflowBuffer()。
  • 在调度器 1 上执行第一个动作。
  • 在调度器 2 上执行第二个动作,隐式的添加了一个 onOverflowBuffer()。 最终Subscriber可以逐一的请求数据,限制管道中传输的数据为一个元素,并在每次成功调用 onNext(T) 后请求下一个元素。这种行为也可以通过 capacity(1).consume(…) 获得。
  1. Streams.range(1,1000000)
  2. .subscribe(new DefaultSubscriber<Long>(){
  3. Subscription sub;
  4. @Override
  5. void onSubscribe(Subscription sub){
  6. this.sub = sub;
  7. sub.request(1);
  8. }
  9. @Override
  10. void onNext(Long n){
  11. httpClient.get("localhost/"+n).onSuccess(rep -> sub.request(1));
  12. }
  13. );
  • 使用 DefaultSubscriber 以避免逐个实现订阅者的所有方法。
  • 持有订阅的指针后安排第一次需求请求。
  • 在成功的 GET 请求后,使用 异步 HTTP API 再次请求。延迟信息自然将被传递给 RangeStreamPublisher。你可以想到,通过计算两次请求的时间间隔,我们将能够深入的了解执行过程及 IO 操作所产生的延迟。
表 12,控制传递数据的信号量
Stream<T> API作用
subscribe(Subscriber<T>) 自定义 Subscriber<T> 的请求时机很灵活。如果订阅者使用阻塞操作,最好改变这些请求的大小。
capacity(long) 对当前 Stream<T>设定容量
onOverflowBuffer(CompletableQueue) 创建或使用给定的 CompletableQueue 储存溢出元素。当订阅者请求的数据量小于 Publisher 送出的数据量时,将产生溢出。溢出的数据将在下一次 request(long) 调用时被消耗。marble-overflowbuffer
onOverflowDrop() 忽略溢出的元素。当 Subscriber 请求的数据量小于 Publisher 送出的数据量时,将产生溢出。溢出的数据将在下一次 request(long) 调用时被消耗。marble-overflowdrop
throttle(long) 延迟下游的 request(long) ,并定期削减积累的对上游请求的需求。marble-throttle.png
requestWhen(Function<Stream<Long>, Publisher<Long>>) 将所有下游的 request(long) 传递给 Stream<Long> 的请求序列,此序列可以变更,并返回任意形式的Publisher<Long>RequestWhenAction 将订阅生产序列,并立即对上游的 request(long) 分配 onNext(Long)。它的行为类似于 adaptiveConsume,但可以插入到数据流管道的任意位置。
batchConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Long,Long>)batchConsumeOn 类似于 consume,但将使用默认的 Stream.capacity() 启动,请求映射的 Long 需求,并给出前一次的需求。对适配来源各异的需求非常有用。
adaptiveConsume(Consumer<T>, Consumer<T>, Consumer<T>, Function<Stream<Long>,Publisher<Long>>),adaptiveConsumeOn 类似于 batchConsume, 但将请求 Long 需求计算后的序列。它可以用于插入流程控制,例如延迟需求的 Stream.timer()AdaptiveConsumerAction 将订阅生产序列,将订阅生产序列,并立即对上游的 request(long) 分配 onNext(Long)
process(Processor<T, ?>) 任何处理器(Processor )都可以对需求或缓冲数据进行改造。对使用中的特定处理器实现进行检查是值得的。
All filter(arguments), take(arguments),takeWhile(arguments)… 所有的限定操作都可以用于主动限制 Stream 的信号量。
buffer(arguments), reduce(arguments),count(arguments)…​ 所有的聚合操作及度量操作都可以用于主动限制 Stream 的信号量。
All sample(arguments), sampleFirst(arguments) 通过对最后一个(或首个) onNext(T) 信号进行条件匹配,减少 Stream<T> 的信号量。这些条件可以被计数、计量、计数或计量,甚可以互动(事件驱动)。
zip(arguments), zipWith(arguments) 将 N Stream<T> 的信号量减少至压缩发布者生产的最后一次信号。每个发布者的聚合信号都可以用于从 N 最近的上游 onNext(T) 产生单独的值。