使用窗口
将数据 T
按照序列分发给 Stream<T>
的主要目的有三:
将数据
T
的一个序列暴露给一系列有限且分组的观察和统计:取和计算、平均值计算或灵活的聚合(Map
、Tuple
……)。
将分组数列同dispatchOn
结合,并为生成的每个Stream<T>
进行并行化处理。
对每个独立的分组序列重复onComplete()
调用,例如,在异步 IO 模块中界定一次数据冲洗。
如果是同聚合所有的
Stream.buffer()
方法相结合,Stream<T>
窗口等效于聚合生产者,较之缓冲 API稍欠优化:
stream.buffer(10, 1, TimeUnit.SECONDS);
//equivalent to
stream.window(10, 1, TimeUnit.SECONDS).flatMap( window -> window.buffer() )
如果一个
window()
被标记为定时的,却并未提供Timer
参数时,必须先为其初始化一个环境(Environment)
。
//create a list of 1000 numbers and prepare a Stream to read it
Stream<Integer> sensorDataStream = Streams.from(createTestDataset(1000));
//wait for all windows of 100 to finish
CountDownLatch endLatch = new CountDownLatch(1000 / 100);
Control controls = sensorDataStream
.window(100)
.consume(window -> {
System.out.println("New window starting");
window
.reduce(Integer.MAX_VALUE, (acc, next) -> Math.min(acc, next))
.finallyDo(o -> endLatch.countDown())
.consume(i -> System.out.println("Minimum " + i));
});
endLatch.await(10, TimeUnit.SECONDS);
System.out.println(controls.debug());
Assert.assertEquals(0, endLatch.getCount());
表 11,使用Stream 进行块处理(返回Stream<Stream<T>>):
Stream<T> API | 作用 |
---|---|
window(int) |
分发数据到一个生成的 Stream<T> ,直到 onComplete() 被调用;或是到达给定 int 参数的值,然后开始一个新的聚合。
|
window(Publisher<?>, Supplier<? extends Publisher<?>>) |
分发数据到一个生成的 Stream<T> ,直到 onComplete() 被调用;或是第一个 Publisher<?> 参数发出信号。可选参数 Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的聚合。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<Stream<T>> 。
|
window(Supplier<? extends Publisher<?>>) |
分发数据到一个生成的 Stream<T> ,直到 onComplete() 被调用;或是与提供的 Publisher<?> 协调。Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的 Stream<T> 并立即开始一个新 Stream<T> 。
|
window(int, int) |
分发数据到一个生成的 Stream<T> ,直到 onComplete() 被调用;或是到达给定的忽略值(第二个参数),然后开始一个新的 Stream<T> 。第一个尺寸参数int 将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<Stream<T>> 。
|
window(long, TimeUnit, Timer) |
分发数据到一个生成的 Stream<T> ,直到 onComplete() 被调用;或是到达等待时长(长整型参数),然后开始一个新的 Stream<T> 。
|
window(long, long, TimeUnit, Timer) |
分发数据到一个生成的 Stream<T> ,直到 onComplete() 被调用;或是到达给定的时移 (第二个长整型参数),然后开始一个新的聚合。时间跨度(第一个长整型参数)将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<Stream <T>> 。
|
window(int, long, TimeUnit, Timer) |
buffer(int) 和 buffer(long, TimeUnit, Timer) 条件的组合。对生成的 Stream<T> 分发数据的过程在到达给定大小或时间跨度 耗尽时完成。
|