请求应答模式
EventBus
发布和响应事件使用的是请求应答模式。
常见的情景是,你希望能够从运行在 EventBus 配置好的调度器(Dispatcher)
中的任务里获取应答。Reactor 的 EventBus
提供了比简单的发布订阅模型更全面的事件处理模型。除了 Cunsumer
,你也可以同样注册一个函数,EventBus
会自动将 Function
的返回值推送给 replyTo
主键中的主题。在这里,推荐使用 .receive()
和 .send()
方法,而不是 .on()
和 .notify()
方法。
请求应答
EventBus bus;
bus.on($("reply.sink"), ev -> {
System.out.printf("Got %s on thread %s%n", ev, Thread.currentThread())
});
bus.receive($("job.sink"), ev -> {
return doWork(ev);
});
bus.send("job.sink", Event.wrap("Hello World!", "reply.sink"));
- 分配一个处理所有应答的
consumer
,不进行任何分析。 - 分配一个工作在
Dispatcher
线程的Function
,完成工作并返回结果。3.使用给定的replyTo
主键在总线中发布Event
。 如果没有一个发布应答的通用主题,你可以将请求和应答的操作绑定到一个单独的对.sendAndReceive(Object, Event<?>, Consumer<Event<?>>)
方法的调用中。此方法将调用.send()
,并在函数被调用时在Dispatcher
线程调用给定的 replyTo 回调函数。
sendAndReceive()
EventBus bus;
bus.receive($("job.sink"), (Event<String> ev) -> {
return ev.getData().toUpperCase();
});
bus.sendAndReceive(
"job.sink",
Event.wrap("Hello World!"),
s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread())
);
- 分配一个在
Dispatcher
线程完成工作并返回结果的Function
。 - 在总线中发布一个
Event
,并在Dispatcher
中安排给定的 replyToConsumer
,将接收事件的函数的返回值作为输入传递给它。
取消任务
有时候你希望取消一个任务,停止响应事件通知。注册函数 .on()
和 .receive()
将返回一个 Registration
对象,如果持有该对象的引用,你可以用它取消给定 Selector
的 Consumer
或 Function
。
EventBus bus;
Registration reg = bus.on($("topic"),
s -> System.out.printf("Got %s on thread %s%n", s, Thread.currentThread()));
bus.notify("topic", Event.wrap("Hello World!"));
// ...some time later...
reg.cancel();
// ...some time later...
bus.notify("topic", Event.wrap("Hello World!"));
- 对给定主题发布一个事件,应当在控制台中打印
Event.toString()
。 - 取消
Registration
对象的注册,组织消息抵达Consumer
。 - 这个通知不应当有任何结果。
牢记,取消一个
Registration
的注册将对内部注册表进行原子访问。当系统中存在大量流向消费者的时间时,有时在你的.cancel()
调用完成后注册表(Registry)
清理缓存并移除Registration
前,你的Consumer
或Function
依然会接收到一些事件。.cancel()
方法可以被称为:"请求尽快的取消"。
在测试类中你能够察觉这一行为特征,测试类中在.on()
、.notify()
和.cancel()
的调用之间没有任何时间延迟。