制作一个快速断路器

在本节练习中,我们将更关注于如何更高效的利用你手中的 Reactor Stream 模块。下面是一个典型用例:使用断路器模式创建一个自愈型数据管道(也许很快就能够在 Stream API中实现,也许哦~)。本用例中,我们希望在哪怕出错的时候,Stream 依然可以存活。当错误积累到一定程度,我们将希望能够关闭来自主断路器(实际上是 Stream)的消费。在一个很短的时间段中,我们将断开断路器,启用一个备用发布者 Stream。任何类型的发布者都可以作为备份使用,它只是用来发布替代信息。关键是在一段时间内阻止对出错 Stream 的新访问,给予它恢复的机会。

快速(简单)断路器测试

  1. final Broadcaster<String> closeCircuit = Broadcaster.create();
  2. final Stream<String> openCircuit = Streams.just("Alternative Message");
  3. final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext();
  4. final AtomicInteger successes = new AtomicInteger();
  5. final AtomicInteger failures = new AtomicInteger();
  6. final int maxErrors = 3;
  7. Promise<List<String>> promise =
  8. circuitSwitcher
  9. .observe(d -> successes.incrementAndGet())
  10. .when(Throwable.class, error -> failures.incrementAndGet())
  11. .observeStart(s -> {
  12. System.out.println("failures: " + failures +
  13. " successes:" + successes);
  14. if (failures.compareAndSet(maxErrors, 0)) {
  15. circuitSwitcher.onNext(openCircuit);
  16. successes.set(0);
  17. Streams
  18. .timer(1)
  19. .consume(ignore -> circuitSwitcher.onNext(closeCircuit));
  20. }
  21. })
  22. .retry()
  23. .toList();
  24. circuitSwitcher.onNext(closeCircuit);
  25. closeCircuit.onNext("test1");
  26. closeCircuit.onNext("test2");
  27. closeCircuit.onNext("test3");
  28. closeCircuit.onError(new Exception("test4"));
  29. closeCircuit.onError(new Exception("test5"));
  30. closeCircuit.onError(new Exception("test6"));
  31. Thread.sleep(1500);
  32. closeCircuit.onNext("test7");
  33. closeCircuit.onNext("test8");
  34. closeCircuit.onComplete();
  35. circuitSwitcher.onComplete();
  36. System.out.println(promise.await());
  37. Assert.assertEquals(promise.get().get(0), "test1");
  38. Assert.assertEquals(promise.get().get(1), "test2");
  39. Assert.assertEquals(promise.get().get(2), "test3");
  40. Assert.assertEquals(promise.get().get(3), "Alternative Message");
  41. Assert.assertEquals(promise.get().get(4), "test7");
  42. Assert.assertEquals(promise.get().get(5), "test8");
  • 创建一个主要的活跃 Broadcaster,用来发送数据。
  • 创建一个简单的回退流,以防意外。
  • 创建一个 SwitchAction,以便 Processor 接受新的数据的 Publisher
  • 准备共享的成功失败计数器。
  • Stream.toList() 返回一个 Promise 对象,将数据流转换为实际意义的 List 列表
  • circuitSwitcher Processor 代理中的数据消费将视失败数量而定。
  • 对每一次成功的 onNext(String) 调用和每一次可抛出(Throwable)异常产生分别计数。
  • 监视 onSubscribe(Subscription) 的调用,它在每次成功启动数据流后将被调用。
  • 如果错误数达到了 maxErrors,将 circuitSwitcher 对象当前的数据源切换到备用的数据源,这也将触发断路器。
  • 一秒钟后,将主数据流传递给 circuitSwitcher 对象,恢复其对主数据流的数据消费。
  • 在任何异常(即取消和重订阅)后都尝试重新运行。这也是我们使用 observeStart() 的原因——任何错误都将触发它。
  • 使用主 Stream 启动 circuitSwitcher 对象。
  • 人为延迟,留出断路器关闭的时间。
  • 同时在主数据流和 circuitSwitcher 对象上调用 onComplete()(否则它们会因为 onComplete() 信号的缺失而在后台挂起。