使用缓冲区
将数据 T
按照序列分组为列表 List<T>
的主要目的有二:
- 将匹配分界条件的序列暴露给一个 JVM API 常用的
Iterable
结构体。- 减少
onNext(T)
的信号量,类如buffer(5)
会将一个有10元素的序列转换成2个列表(每个列表有5个元素)。
收集数据将会产生内存甚或 CPU 的开销,应当适当的调整大小。建议使用小巧且定时的分界,以避免任何长时间的聚合。
如果一个
buffer()
被标记为定时的,却并未提供Timer
参数时,必须先为其初始化一个环境(Environment)
。
long timeout = 100;
final int batchsize = 4;
CountDownLatch latch = new CountDownLatch(1);
final Broadcaster<Integer> streamBatcher = Broadcaster.<Integer>create(env);
streamBatcher
.buffer(batchsize, timeout, TimeUnit.MILLISECONDS)
.consume(i -> latch.countDown());
streamBatcher.onNext(12);
streamBatcher.onNext(123);
Thread.sleep(200);
streamBatcher.onNext(42);
streamBatcher.onNext(666);
latch.await(2, TimeUnit.SECONDS);
表 10,使用Stream buffers进行块处理(返回Stream<List<T>>):
Stream<T> API | 作用 |
---|---|
buffer(int) |
聚合数据,直到 onComplete() 被调用;或是到达给定 int 参数的值,然后开始一个新的聚合。
|
buffer(Publisher<?>, Supplier<? extends Publisher<?>>) |
聚合数据,直到 onComplete() 被调用,或是第一个 Publisher<?> 参数发出信号。可选参数 Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的聚合。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<List<T>> 。
|
buffer(Supplier<? extends Publisher<?>>) |
聚合数据,直到 onComplete() 被调用;或是与提供的 Publisher<?> 协调。Supplier<? extends Publisher<?>> 提供了一个序列,它的第一个信号将终止其所链接的聚合并立即开始一个新的聚合。
|
buffer(int, int) |
聚合数据,直到 onComplete() 被调用;或是到达给定的忽略值(第二个参数),然后开始一个新的聚合。第一个尺寸参数int 将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<List<T>> 。
|
buffer(long, TimeUnit, Timer) |
聚合数据,直到 onComplete() 被调用;或是到达等待时长(第一个长整型参数),然后开始一个新的聚合。
|
buffer(long, long, TimeUnit, Timer) |
聚合数据,直到 onComplete() 被调用;或是到达给定的时移(第二个长整型参数),然后开始一个新的聚合。时间跨度(第一个长整型参数)将界定缓冲区聚合元素的最大数量。这意味着重叠的(位移的缓冲区)或脱节的聚合数据可以发送给子订阅者 Subscriber<List<T>> 。
|
buffer(int, long, TimeUnit, Timer) |
buffer(int) 或 buffer(long, TimeUnit, Timer) 条件的组合。数据聚合的过程在到达给定大小或时间跨度耗尽时完成。
|