请求应答模式

EventBus 发布和响应事件使用的是请求应答模式。

常见的情景是,你希望能够从运行在 EventBus 配置好的调度器(Dispatcher)中的任务里获取应答。Reactor 的 EventBus 提供了比简单的发布订阅模型更全面的事件处理模型。除了 Cunsumer,你也可以同样注册一个函数,EventBus 会自动将 Function 的返回值推送给 replyTo 主键中的主题。在这里,推荐使用 .receive().send() 方法,而不是 .on().notify() 方法。

请求应答

  1. EventBus bus;
  2. bus.on($("reply.sink"), ev -> {
  3. System.out.printf("Got %s on thread %s%n", ev, Thread.currentThread())
  4. });
  5. bus.receive($("job.sink"), ev -> {
  6. return doWork(ev);
  7. });
  8. bus.send("job.sink", Event.wrap("Hello World!", "reply.sink"));
  • 分配一个处理所有应答的 consumer,不进行任何分析。
  • 分配一个工作在 Dispatcher 线程的 Function,完成工作并返回结果。3.使用给定的 replyTo 主键在总线中发布 Event。 如果没有一个发布应答的通用主题,你可以将请求和应答的操作绑定到一个单独的对 .sendAndReceive(Object, Event<?>, Consumer<Event<?>>) 方法的调用中。此方法将调用 .send() ,并在函数被调用时在 Dispatcher 线程调用给定的 replyTo 回调函数。

sendAndReceive()

  1. EventBus bus;
  2. bus.receive($("job.sink"), (Event<String> ev) -> {
  3. return ev.getData().toUpperCase();
  4. });
  5. bus.sendAndReceive(
  6. "job.sink",
  7. Event.wrap("Hello World!"),
  8. s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread())
  9. );
  • 分配一个在 Dispatcher 线程完成工作并返回结果的 Function
  • 在总线中发布一个 Event,并在 Dispatcher中安排给定的 replyTo Consumer,将接收事件的函数的返回值作为输入传递给它。

取消任务

有时候你希望取消一个任务,停止响应事件通知。注册函数 .on().receive() 将返回一个 Registration 对象,如果持有该对象的引用,你可以用它取消给定 SelectorConsumerFunction

  1. EventBus bus;
  2. Registration reg = bus.on($("topic"),
  3. s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread()));
  4. bus.notify("topic", Event.wrap("Hello World!"));
  5. // ...some time later...
  6. reg.cancel();
  7. // ...some time later...
  8. bus.notify("topic", Event.wrap("Hello World!"));
  • 对给定主题发布一个事件,应当在控制台中打印 Event.toString()
  • 取消 Registration 对象的注册,组织消息抵达 Consumer
  • 这个通知不应当有任何结果。

牢记,取消一个 Registration 的注册将对内部注册表进行原子访问。当系统中存在大量流向消费者的时间时,有时在你的 .cancel() 调用完成后 注册表(Registry) 清理缓存并移除 Registration 前,你的 ConsumerFunction依然会接收到一些事件。.cancel() 方法可以被称为:"请求尽快的取消"。
在测试类中你能够察觉这一行为特征,测试类中在 .on().notify().cancel() 的调用之间没有任何时间延迟。