Rx之外的其它 API

除了 Reactive Stream 直接实现的方法之外,还有一些 Stream 方法并没有被涉及,或是没有录入 Reactive 扩展文档之中。

表22,在前面的用例中未涉及的一些方法
Stream<T> API输入类型输出类型角色
after() T Void 只消费 onComplete()onError() 信号
log(String) T T 使用 SLF4J 和 给定类别记录每个信号。
split Iterable<T> T Iterable<T> 阻塞转化成尽可能多的 onNext(T)
sort(int, Comparator<T>) T T 将给定尺寸的数据存入存于内存的 PriorityQueue中,使用 Comparator<T>进行排序,并将所有挂起的 onNext(T) 信号发出。
combine() T O 自右向左扫描最上层的对象或 Action。它将创建一个新的处理器(Processor)作为返回结果,处理器使用分发给旧 action 的 onXXXX 信号作为输入值,并出处代理当前 action 的订阅者(Subscriber)。示例
  1. Action<Integer, String> processor = stream
  2. .filter( i -> i<2 )
  3. .map(Object::toString)
  4. .combine();
  5. processor.consume(System.out::println);
  6. processor.onNext(1);
  7. processor.onNext(3);
keepAlive() T T 阻止来自于订阅者(Subscriber)Subscription.cancel() 信号的传播。