从自定义 Reactive Publisher 处创建

随着时间的推移,Reactor的用户会愈加熟悉 Reactive Streams。 那就会是开始创建自定义响应式数据源的完美时分! 一般实现者会遵循规范的要求并使用 reactive-streams-tck 依赖来验证它的成果。对于约定的遵行需要一个 Subscription 还有在发送任何数据之前对于 onSubscribe 加一个 request(long) 的一次调用。不过 Reactor 允许在只处理消息传递部分有一些灵活性,并将会明确的提供自动缓存的 Subscription,不同之处在下面的 代码示例中有所展示。

Streams.create 和 Streams.defer 实战

  1. final Stream<String> stream1 = Streams.create(new Publisher<String>() {
  2. @Override
  3. public void subscribe(Subscriber<? super String> sub) {
  4. sub.onSubscribe(new Subscription() { //1
  5. @Override
  6. public void request(long demand) {
  7. if(demand == 2L){
  8. sub.onNext("1");
  9. sub.onNext("2");
  10. sub.onComplete();
  11. }
  12. }
  13. @Override
  14. public void cancel() {
  15. System.out.println("Cancelled!");
  16. }
  17. });
  18. }
  19. });
  20. final Stream<String> stream2 = Streams.create(sub -> {
  21. sub.onNext("3"); //2
  22. sub.onNext("4");
  23. sub.onComplete();
  24. });
  25. final AtomicInteger counterSubscriber = new AtomicInteger();
  26. Stream<String> deferred = Streams.defer(() -> {
  27. if (counterSubscriber.incrementAndGet() == 1) { //3
  28. return stream1;
  29. }
  30. else {
  31. return stream2;
  32. }
  33. });
  34. deferred
  35. .consume(s -> System.out.printf("%s First subscription = %s%n", Thread.currentThread(), s));
  36. deferred
  37. .consume(s -> System.out.printf("%s Second subscription = %s%n", Thread.currentThread(), s));
  • 从一个可用的首先会调用 onSubscribe(Subscription) 的自定义 Publisher 那里创建一个 Stream
  • 从格式难看的跳过了 onSubscribe(Subscription) 并立即调用了 onNext(T) 的自定义 Publisher 那里创建一个 Stream
  • 创建一个 DeferredStream,它将会在每一次 Stream.subscribe 调用上替换原来的 Publisher,计算 Subscriber 的总数。 从这儿到哪儿去呢? 有大量的用例可以从自定义的 Publisher 那里获益:
  • 响应式外观可以将任何IO调用转换成一个配对的需求和组合: HTTP 调用(读取N次), SQL 查询(选择最大的 N),文件读取(读取 N 行)……
  • 异步外观可以将任何热数据调用转换成一个组合式的 API: AMQP Consumer, Spring MessageChannel 端点……

Reactor 提供了一些可重用的组件来避免你不得不去做的样板检查,无需扩展现有的 Stream 或者 PushSubscription

  • 扩展 PushSubscription 而不是直接实现 Subscription ,从而来获得终端状态 (PushSubscription.isComplete()) 的好处。
  • 使用 PublisherFactory.create(args) 或者 Streams.createWith(args) 来为每个生命周期的 step (requested,stopped,started) 使用函数式消耗者。
  • 扩展 Stream 而不是直接实现 Publisher 来获得组合式 API 的好处。

Streams.createWith, 一个去掉了一些样板的 create() 的替代

  1. final Stream<String> stream = Streams.createWith(
  2. (demand, sub) -> { //1
  3. sub.context(); //2
  4. if (demand >= 2L && !sub.isCancelled()) {
  5. sub.onNext("1");
  6. sub.onNext("2");
  7. sub.onComplete();
  8. }
  9. },
  10. sub -> 0, //3
  11. sub -> System.out.println("Cancelled!") //4
  12. );
  13. stream.consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s));
  • Subscriber 的请求之上附上一个请求消耗者响应,并传入要求和请求的订阅者。
  • sub 参数实际上是一个可能带上由所有请求回调共享的 初始状态的 SubscriberWithContext
  • 启动时候执行一次,这里也是我们初始化可选的上下文的地方;每次请求回调都会从 context() 上接收到一个 0。
  • 在任何终端事件上执行一次:cancel()onComplete() 或者 onError(e)。 开始去为响应式数据流方式编写代码的一个好地方就是简单的去看看更加详细,背压可用的文件流