reactor-stream
"Mind you, you should never use Future.get() again." -- Stephane Maldini, with a banking-sector customer
First, let's look at how a Stream works in a Java 8 example:
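import static reactor.Environment.*;

import java.util.concurrent.TimeUnit;

import reactor.Environment;
import reactor.fn.tuple.Tuple;
import reactor.rx.BiStreams;
import reactor.rx.Streams;

//...

Environment.initialize();

// Find the 10 most frequent words in a list of Strings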
Streams.from(aListOfString)
  .dispatchOn(sharedDispatcher())
  // split each sentence into words on a cached dispatcher, dropping empty tokens
  .flatMap(sentence ->
    Streams
      .from(sentence.split(" "))
      .dispatchOn(cachedDispatcher())
      .filter(word -> !word.trim().isEmpty())
      .observe(word -> doSomething(word))
  )
  // pair every word with an initial count of 1
  .map(word -> Tuple.of(word, 1))
  // group the pairs into one-second windows
  .window(1, TimeUnit.SECONDS)
  // per window: sum the counts by word, sort by count (descending), keep the first 10
  .flatMap(words ->
    BiStreams.reduceByKey(words, (prev, next) -> prev + next)
      .sort((wordWithCountA, wordWithCountB) -> -wordWithCountA.t2.compareTo(wordWithCountB.t2))
      .take(10)
      .finallyDo(event -> LOG.info("---- window complete! ----"))
  )
  // log each (word, count) pair, and any error raised upstream
  .consume(
    wordWithCount -> LOG.info(wordWithCount.t1 + ": " + wordWithCount.t2),
    error -> LOG.error("", error)
  );
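
To make clearer what each window computes, the counting step can be sketched with plain JDK collections, without Reactor. This is only an illustration under stated assumptions: the `TopWords` class and its `topWords` helper are hypothetical names, not part of reactor-stream. The sketch runs synchronously over a fixed list, so it says nothing about dispatchers or time-based windowing. It only mirrors the split, filter, reduce-by-key, sort, and take steps.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopWords {

    // Hypothetical helper (not a Reactor API): counts the words in the given
    // sentences and returns the `limit` most frequent ones, highest count first.
    static List<Map.Entry<String, Integer>> topWords(List<String> sentences, int limit) {
        // Equivalent of split + filter + map(word -> (word, 1)) + reduceByKey(sum).
        // LinkedHashMap keeps first-seen order, so ties are resolved predictably.
        Map<String, Integer> counts = sentences.stream()
            .flatMap(sentence -> Arrays.stream(sentence.split(" ")))
            .filter(word -> !word.trim().isEmpty())
            .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, LinkedHashMap::new));

        // Equivalent of sort (descending by count) + take(limit).
        return counts.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()))
            .limit(limit)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList(
            "the quick brown fox", "the lazy dog", "the quick dog");
        for (Map.Entry<String, Integer> entry : topWords(sentences, 3)) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }
}
```

With the sample input in `main`, the three entries printed are `the: 3`, `quick: 2`, and `dog: 2`. The Reactor pipeline produces the same kind of result once per one-second window, delivered to the consumer as the data arrives rather than returned from a blocking call.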