制作一个快速断路器
在本节练习中,我们将更关注于如何更高效的利用你手中的 Reactor Stream 模块。下面是一个典型用例:使用断路器模式创建一个自愈型数据管道(也许很快就能够在 Stream API中实现,也许哦~)。本用例中,我们希望在哪怕出错的时候,Stream
依然可以存活。当错误积累到一定程度,我们将希望能够关闭来自主断路器(实际上是 Stream
)的消费。在一个很短的时间段中,我们将断开断路器,启用一个备用发布者 Stream
。任何类型的发布者都可以作为备份使用,它只是用来发布替代信息。关键是在一段时间内阻止对出错 Stream
的新访问,给予它恢复的机会。
快速(简单)断路器测试
final Broadcaster<String> closeCircuit = Broadcaster.create();
final Stream<String> openCircuit = Streams.just("Alternative Message");
final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext();
final AtomicInteger successes = new AtomicInteger();
final AtomicInteger failures = new AtomicInteger();
final int maxErrors = 3;
Promise<List<String>> promise =
circuitSwitcher
.observe(d -> successes.incrementAndGet())
.when(Throwable.class, error -> failures.incrementAndGet())
.observeStart(s -> {
System.out.println("failures: " + failures +
" successes:" + successes);
if (failures.compareAndSet(maxErrors, 0)) {
circuitSwitcher.onNext(openCircuit);
successes.set(0);
Streams
.timer(1)
.consume(ignore -> circuitSwitcher.onNext(closeCircuit));
}
})
.retry()
.toList();
circuitSwitcher.onNext(closeCircuit);
closeCircuit.onNext("test1");
closeCircuit.onNext("test2");
closeCircuit.onNext("test3");
closeCircuit.onError(new Exception("test4"));
closeCircuit.onError(new Exception("test5"));
closeCircuit.onError(new Exception("test6"));
Thread.sleep(1500);
closeCircuit.onNext("test7");
closeCircuit.onNext("test8");
closeCircuit.onComplete();
circuitSwitcher.onComplete();
System.out.println(promise.await());
Assert.assertEquals(promise.get().get(0), "test1");
Assert.assertEquals(promise.get().get(1), "test2");
Assert.assertEquals(promise.get().get(2), "test3");
Assert.assertEquals(promise.get().get(3), "Alternative Message");
Assert.assertEquals(promise.get().get(4), "test7");
Assert.assertEquals(promise.get().get(5), "test8");
- 创建一个主要的活跃
Broadcaster
,用来发送数据。 - 创建一个简单的回退流,以防意外。
- 创建一个
SwitchAction
,以便Processor
接受新的数据的Publisher
。 - 准备共享的成功失败计数器。
Stream.toList()
返回一个Promise
对象,将数据流转换为实际意义的List
列表- circuitSwitcher
Processor
代理中的数据消费将视失败数量而定。 - 对每一次成功的 onNext(String) 调用和每一次
可抛出(Throwable)
异常产生分别计数。 - 监视
onSubscribe(Subscription)
的调用,它在每次成功启动数据流后将被调用。 - 如果错误数达到了 maxErrors,将 circuitSwitcher 对象当前的数据源切换到备用的数据源,这也将触发断路器。
- 一秒钟后,将主数据流传递给 circuitSwitcher 对象,恢复其对主数据流的数据消费。
- 在任何异常(即取消和重订阅)后都尝试重新运行。这也是我们使用
observeStart()
的原因——任何错误都将触发它。 - 使用主
Stream
启动 circuitSwitcher 对象。 - 人为延迟,留出断路器关闭的时间。
- 同时在主数据流和 circuitSwitcher 对象上调用
onComplete()
(否则它们会因为onComplete()
信号的缺失而在后台挂起。