响应式数据流

响应式数据流 作为一种新的数据流规范应用于 Java 9 及其后续版本,并被多个供应商和技术企业采纳,包括包括 Netflix,Oracle,Pivotal 或 Typesafe。

这一规范的定位非常清晰,旨在提供同/异步数据序列流式控制机制,并在 JVM 上首先推广。该规范由 4 个 Java 接口,1 个 TCK 和一些样例组成。在实现所需的 4 个接口之外,该规范的实质在于经由 TCK 验证的行为状态。能成功通过 TCK 实现类检测意味着满足 Reactive Streams Ready 状态。

Figure 3. The Reactive Streams Contract图3. 响应式数据流约定,

响应式数据流接口

  • org.reactivestreams.Pubslisher:数据流发布者(信号从 0 到 N,N 可为无穷)。提供两个可选终端事件:错误和完成。
  • org.reactivestreams.Subscriber:数据流消费者(信号从 0 到 N,N 可为无穷)。消费者初始化过程中,会请求生产者当前需要订阅多少数据。其他情况,通过接口回调与数据生产方交互: 下一条(新消息)和状态。状态包括:完成/错误,可选。
  • org.reactivestreams.Subscription:初始化阶段将一个小追踪器传递给订阅者。它控制着我们准备好来消费多少数据,以及我们想要什么时候停止消费(取消)。
  • org.reactivestreams.Processor:同时作为发布者和订阅者的组件的标记。

Figure 4. The Reactive Streams publishing protocol图4. 响应式数据流发布协议

订阅者有两种方式向发布者请求数据,如下所示:

  • 无界的:订阅者只需要调用 Subscription#request(Long.MAX_VALUE) 即可。
  • 有界的:订阅者保留数据引用,调用request(long) 方法消费。
    • 通常订阅者在订阅时会请求一个初始数据集或者一个数据
    • 在 onNext 成功后(如 Commit,Flush 等…​ 之后),请求更多数据
    • 建议请求数量呈线性,尽量避免请求叠加, 如每下一个信号请求 10 个数据

表1. 迄今为止,Reactor 可直接使用的接口有:

响应式数据流Reactor 模块实现描述
Processor reactor-core, reactor-stream reactor.core.processor., reactor.rx. Reactor-core:提供背压就绪的RingBuffer*Processor 等多种Processorreactor-streeam:提供全套的操作语句和广播机制
Publisher reactor-core, reactor-bus, reactor-stream, reactor-net reactor.core.processor., reactor.rx.stream., reactor.rx.action., reactor.io.net. Reactor-core:Processors里植入PublishedReactor-Bus: 提供无界的路由事件的发射发布机制Reactor-stream:Publisher中直接引入数据流扩展Reactor-net:提供2中发布方式:一种是Channel,另一种是flush和闭环回调
Subscriber reactor-core, reactor-bus, reactor-stream, reactor-net reactor.core.processor., reactor.bus.EventBus., reactor.rx.action., reactor.io.net.impl. Reactor-core:Processors植入 SubscriberReactor-bus:通过无界Publisher/Subcriber机制扩展总线能力Reactor-stream:订阅行为通过计算指定回调实现Reactor-net: IO 层实现处理写入,关闭和 flush
Subscription reactor-stream, reactor-net reactor.rx.subscription., reactor.io.net.impl. Reactor-Stream:提供了最优化过 PushSubscriptions 和 缓冲就绪的 ReactiveSubscription;Reactor-net:通过定制Subscription 异步 IO 用户端实现背压

我们从 Reactor 2 开始就严格遵守这个规范直到 1.0.0 发布,在 Maven Central 和其他镜像上都可使用。当然它也是 reactor-core 的传递依赖