分析
度量操作和其它状态化操作一样,都是 Stream
API 的一部分。实际上,熟悉 Spark
的用户能够认出一些方法。ScanAction
也提供了一些常用的同 reduce()
和 scan()
相关的累积功能。
使用键/值型数据和度量操作
Broadcaster<Integer> source = Broadcaster.<Integer> create(Environment.get());
long avgTime = 50l;
Promise<Long> result = source
.throttle(avgTime) (1)
.elapsed() (2)
.nest() (3)
.flatMap(self ->
BiStreams.reduceByKey(self, (prev, next) -> prev + 1) (4)
)
.sort((a,b) -> a.t1.compareTo(b.t1)) (5)
.log("elapsed")
.reduce(-1L, (acc, next) ->
acc > 0l ? ((next.t1 + acc) / 2) : next.t1 (6)
)
.next(); (7)
for (int i = 0; i < 10; i++) {
source.onNext(1);
}
source.onComplete();
- 将传入的
订阅者(Publisher)
减速至每 50 毫秒一次,逐个等待数据发出。 - 在
onSubscribe
和 第一个信号之间,或是在两个信号之间产生一个拥有时间增量和有效载荷的Tuple2
。 - 使当前流可以接收
onNext
信号,以便我们将其同flatMap
组合。 - 累积所有数据,直到以
Tuple2.t1
和Tuple2.t2
为键值对的内部Map
发出onComplete()
信号。下一个匹配的主键将为累加器BiFunction
提供前一次的值和新发出的onNext
信号。这样我们就可以每个键增加一个有效载荷。 - 累积所有数据,直到内部
PriorityQueue
发出onComplete()
信号,并使用给定比较器对流逝的时间 t1 进行排序。在onComplete()
之后,所有的数据都会按顺序发出,然后就完成了。 - 累积所有数据,直到
onComplete()
信号的平均传送时间为默认的首次被接收的时间。 - 发出下一个信号,并且只计算平均值。
输出
03:14:42.013 [main] INFO elapsed - subscribe: ScanAction
03:14:42.021 [main] INFO elapsed - onSubscribe: {push}
03:14:42.022 [main] INFO elapsed - request: 9223372036854775807
03:14:42.517 [hash-wheel-timer-run-3] INFO elapsed - onNext: 44,1
03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 48,1
03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 49,2
03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 50,3
03:14:42.518 [hash-wheel-timer-run-3] INFO elapsed - onNext: 51,3
03:14:42.519 [hash-wheel-timer-run-3] INFO elapsed - complete: SortAction
03:14:42.520 [hash-wheel-timer-run-3] INFO elapsed - cancel: SortAction
表20,度量操作和其它状态化累积操作可用的操作
Stream<T> API 或工厂函数 | 输出值 | 作用 |
---|---|---|
count() | Long |
在观测到 onComplete() 信号后,产生观测到的 onNext(T) 信号的总量。对定时的窗体(Windows) 很有用,对限制大小的窗体(Windows) 意义不大。比如说 stream.window(5).flatMap(w -> w.count()) 的结果是 5,棒棒的。
|
scan(BiFunction<T,T>) | T | |
scan(A, BiFunction<A,T>) | A | |
reduce(BiFunction<T,T>) | T | |
reduce(A, BiFunction<A,T>) | A | |
BiStreams.reduceByKey() | ||
BiStreams.scanByKey() | ||
timestamp() | Tuple2<Long,T> | |
elapsed() | Tuple2<Long,T> | |
materialize() dematerialize() | Signal<T> |
将上游信号转换为 Signal<T> ,并将其视为 onNext(Signal<T>) 信号进行处理。直接效果就是:它将接收错误信号和完结信号,因此可以用来高效的处理错误。一旦有错误产生,我们可以将 dematerialize() 回调中的 Signal<T> 转化到 Reactive Streams,保证服务的运行。
|