使用窗口

将数据 T 按照序列分发给 Stream<T> 的主要目的有三:

将数据 T 的一个序列暴露给一系列有限且分组的观察和统计:取和计算、平均值计算或灵活的聚合(MapTuple……)。
将分组数列同 dispatchOn 结合,并为生成的每个 Stream<T> 进行并行化处理。
对每个独立的分组序列重复 onComplete() 调用,例如,在异步 IO 模块中界定一次数据冲洗。

如果是同聚合所有的 Stream.buffer() 方法相结合,Stream<T> 窗口等效于聚合生产者,较之缓冲 API稍欠优化:

  1. stream.buffer(10, 1, TimeUnit.SECONDS);
  2. //equivalent to
  3. stream.window(10, 1, TimeUnit.SECONDS).flatMap( window -> window.buffer() )

如果一个 window() 被标记为定时的,却并未提供 Timer 参数时,必须先为其初始化一个 环境(Environment)

  1. //create a list of 1000 numbers and prepare a Stream to read it
  2. Stream<Integer> sensorDataStream = Streams.from(createTestDataset(1000));
  3. //wait for all windows of 100 to finish
  4. CountDownLatch endLatch = new CountDownLatch(1000 / 100);
  5. Control controls = sensorDataStream
  6. .window(100)
  7. .consume(window -> {
  8. System.out.println("New window starting");
  9. window
  10. .reduce(Integer.MAX_VALUE, (acc, next) -> Math.min(acc, next))
  11. .finallyDo(o -> endLatch.countDown())
  12. .consume(i -> System.out.println("Minimum " + i));
  13. });
  14. endLatch.await(10, TimeUnit.SECONDS);
  15. System.out.println(controls.debug());
  16. Assert.assertEquals(0, endLatch.getCount());
表 11,使用Stream 进行块处理(返回Stream<Stream<T>>):
Stream<T> API作用
window(int) 分发数据到一个生成的 Stream<T>,直到 onComplete() 被调用;或是到达给定 int 参数的值,然后开始一个新的聚合。
window(Publisher<?>, Supplier<? extends Publisher<?>>) 分发数据到一个生成的 Stream<T>,直到 onComplete() 被调用;或是第一个 Publisher<?> 参数发出信号。可选参数 Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的聚合。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<Stream<T>>
window(Supplier<? extends Publisher<?>>) 分发数据到一个生成的 Stream<T>,直到 onComplete() 被调用;或是与提供的 Publisher<?> 协调。Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的 Stream<T> 并立即开始一个新 Stream<T>
window(int, int) 分发数据到一个生成的 Stream<T>,直到 onComplete() 被调用;或是到达给定的忽略值(第二个参数),然后开始一个新的 Stream<T>。第一个尺寸参数int 将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<Stream<T>>
window(long, TimeUnit, Timer) 分发数据到一个生成的 Stream<T>,直到 onComplete() 被调用;或是到达等待时长(长整型参数),然后开始一个新的 Stream<T>
window(long, long, TimeUnit, Timer) 分发数据到一个生成的 Stream<T>,直到 onComplete() 被调用;或是到达给定的时移(第二个长整型参数),然后开始一个新的聚合。时间跨度(第一个长整型参数)将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<Stream <T>>
window(int, long, TimeUnit, Timer) buffer(int)buffer(long, TimeUnit, Timer) 条件的组合。对生成的 Stream<T> 分发数据的过程在到达给定大小时间跨度耗尽时完成。