分区

分区(Partition)是一种针对并行、并发作业的 Stream

以响应式编程的方式编写的功能组件,有一个重要的方面就是它的工作可以审慎的切块,交由任意调度器完成。这意味着你可以很轻松的将输入值组合冲一个工作流——在另一个线程执行操作,然后当结果可用时将其交给子序列,完成转化。这是 Reactor 很常见的使用模式。

  1. DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
  2. DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");
  3. Streams
  4. .range(1, 10)
  5. .groupBy(n -> n % 2 == 0) //1
  6. .flatMap(stream -> stream
  7. .dispatchOn(supplier1.get()) //2
  8. .log("groupBy")
  9. )
  10. .partition(5) //3
  11. .flatMap(stream -> stream
  12. .dispatchOn(supplier2.get()) //4
  13. .log("partition")
  14. )
  15. .dispatchOn(Environment.sharedDispatcher()) //5
  16. .log("join")
  17. .consume();
  • 创建至多两个(奇/偶)数据流,以键值 0 或 1 标记,并将 onNext(T) 信号分发给匹配的数据流。
  • 使用前面的 GroupByAction,为两个正在发送的 Stream 添加一个已经生成好的调度器。通过像这样使用分配于各自调度器的两个分区,数据流得到了有效的扩充。FlatMap 将合并两个分区的返回值,这个过程运行在两个线程之一,但绝不会并行处理。
  • 创建 5 个分区,并将 onNext(T) 信号以循环的方式分发给它们。
  • 使用第二个调度器分配新生成的数据流。返回的序列将被合并。
  • 使用 Environment.sharedDispatcher() 而不是前两个线程池分派数据。 五个线程将在 Dispatcher 线程合并。

提取输出

  1. 03:53:42.060 [groupByPool-3] INFO groupBy - onNext: 4
  2. 03:53:42.060 [partitionPool-8] INFO partition - onNext: 9
  3. 03:53:42.061 [groupByPool-3] INFO groupBy - onNext: 6
  4. 03:53:42.061 [partitionPool-8] INFO partition - onNext: 4
  5. 03:53:42.061 [shared-1] INFO join - onNext: 9
  6. 03:53:42.061 [groupByPool-3] INFO groupBy - onNext: 8
  7. 03:53:42.061 [partitionPool-4] INFO partition - onNext: 6
  8. 03:53:42.061 [shared-1] INFO join - onNext: 4
  9. 03:53:42.061 [groupByPool-3] INFO groupBy - onNext: 10
  10. 03:53:42.061 [shared-1] INFO join - onNext: 6
  11. 03:53:42.061 [groupByPool-3] INFO groupBy - complete: DispatcherAction
Stream<T> API输出类型角色
groupBy(Function<T,K>) GroupedStream<K,T>
partition(int) GroupedStream<K,T>
所有的 window(arguments) Stream<T> 分区是依照时间、大小,或为了同外部信号协调而进行切分的,窗口其就是是为此而存在。
process(XXXWorkProcessor) T 因为 RingBufferWorkProcessor 可以将信号分发给每个订阅,如果只是为了扩充,而不是路由,它可以很好的替换 partition()