设定容量

Reactive Streams 标准鼓励应用程序开发者在处理中的数据上设定合理的限制。这会防止组件变得太臃肿,拥有超过它们处理能力的数据量, 这样会导致整个应用程序发生不可预测的问题。Reactive Streams 的一个核心理念之一就是 "背压", 或者说是管道同一次只能处理固定数量数据项的上行流组件通信的能力。 一个用来描述使用队列来请求小批量大容量数据的有用的技术项目就是 "微批量"。

在一个 Reactor 的 Stream 中, 将数据项微批量化来限制要处理的数据的数量是可能的。这在许多方法中拥有不同的好处,并不仅仅是如果系统要崩溃了,它就能通过阻止系统接收更多它负担不起的数据,来限制数据丢失的发生。

为了要限制 Stream 中要处理的数据量,就使用 .capacity(long) 方法。

Streams.just()

  1. Stream<String> st;
  2. st
  3. .dispatchOn(sharedDispatcher())
  4. .capacity(256) //1
  5. .consume(s -> service.doWork(s)); //2
  • 限制要处理的数据数量到每次不超过 256 个元素。
  • 在请求接下来的 256 个数据元素时,产生上行流需求。

如果当前用 dispatchOn 设置的 Stream 调度器时一个 SynchronousDispatcher.INSTANCE (如果没有设置,这就是默认的) ,capacity 就不会影响到 consume

我们留下这里作为 Reactor 使用者的练习,用来了解设置容量同使用 Stream.adaptiveConsume 或者一个定制的 Subscriber 计算动态需求的对比。