reactor-核心

永远别独自展开异步工作。
— Jon Brisbin
在写 Reactor 1 之后

永远别独自展开异步工作。
— Stephane Maldini
在写 Reactor 2 之后

先来看看,某项目是如何使用 Groovy 的:

  1. // 初始化上下文,获取默认调度者
  2. Environment.initialize()
  3. // RingBufferDispatcher,默认带 8192 槽容量
  4. def dispatcher = Environment.sharedDispatcher()
  5. // 创建回调
  6. Consumer<Integer> c = { data ->
  7. println "some data arrived: $data"
  8. }
  9. // 创建 error 回调
  10. Consumer<Throwable errorHandler = { it.printStackTrace }
  11. // 异步分发数据
  12. dispatcher.dispatch(1234, c, errorHandler)
  13. Environment.terminate()

然后,再看看响应式数据流例子

  1. // 独立异步处理者
  2. def processor = RingBufferProcessor.<Integer>create()
  3. // 发送数据,确保数据的安全性,直到订阅成功
  4. processor.onNext(1234)
  5. processor.onNext(5678)
  6. // 消费整型数据
  7. processor.subscribe(new Subscriber<Integer>(){
  8. void onSubscribe(Subscription s){
  9. //unbounded subscriber
  10. s.request Long.MAX
  11. }
  12. void onNext(Integer data){
  13. println data
  14. }
  15. void onError(Throwable err){
  16. err.printStackTrace()
  17. }
  18. void onComplete(){
  19. println 'done!'
  20. }
  21. }
  22. // 完全关闭内部线程和调用
  23. processor.onComplete()