制作一个简单的文件流

让我们从实现一个简单的 publisher 开始,我们将使用 Reactor API 来简化后面的示例代码。作为一个 Publisher 你不得不顾及很多细节,因为这些细节将在 Reactive Streams 中的 TCK 模块中被检测。这样做的目的是让你能够更好的理解 Reactor 在特定条件下能够完成的工作,从而避免重造轮子。

理论上说,在单线程、简单循环阻塞的文件读取消费中,Reactor Streams 并不能为你提供什么帮助。如果接收端点阻塞,它将发送多少读取多少,这已经是某种形式上的背压。Reactor 文件流 的优势在于,当流与消费者之间有一个或多个边界需要跨越时,可以通过采用队列或环形缓冲器来解耦合。你不妨想象一下:当你在向一个消费者发送数据时,同时进行数据的读取,那当它下次请求数据的时候(在前一次数据发送完成后),被请求的数据已经保存在内存中了。换句话说,就是预读取。

创建一个匹配订阅者请求的文件惰性读取发布者

  1. Publisher<String> fileStream = new Publisher<String>() {
  2. @Override
  3. public void subscribe(final Subscriber<? super String> subscriber) {
  4. final File file = new File("settings.gradle");
  5. try {
  6. final BufferedReader is = new BufferedReader(new FileReader(file));
  7. subscriber.onSubscribe(new Subscription() {
  8. final AtomicBoolean terminated = new AtomicBoolean(false);
  9. @Override
  10. public void request(long n) {
  11. long requestCursor = 0l;
  12. try {
  13. String line;
  14. while ((requestCursor++ < n || n == Long.MAX_VALUE)
  15. && !terminated.get()) {
  16. line = is.readLine();
  17. if (line != null) {
  18. subscriber.onNext(line);
  19. } else {
  20. if (terminate()) {
  21. subscriber.onComplete();
  22. }
  23. return;
  24. }
  25. }
  26. } catch (IOException e) {
  27. if (terminate()) {
  28. subscriber.onError(e);
  29. }
  30. }
  31. }
  32. @Override
  33. public void cancel() {
  34. terminate();
  35. }
  36. private boolean terminate() {
  37. if (terminated.compareAndSet(false, true)) {
  38. try {
  39. is.close();
  40. } catch (Exception t) {
  41. subscriber.onError(t);
  42. }
  43. return true;
  44. }
  45. return false;
  46. }
  47. });
  48. } catch (FileNotFoundException e) {
  49. Streams.<String, FileNotFoundException> fail(e)
  50. .subscribe(subscriber);
  51. }
  52. }
  53. };
  54. Streams.wrap(fileStream)
  55. .capacity(4L)
  56. .consumeOn(
  57. Environment.sharedDispatcher(),
  58. System.out::println,
  59. Throwable::printStackTrace,
  60. nothing -> System.out.println("## EOF ##")
  61. );
  • 实现一个 Publisher。下一个列子中你将看到依靠核心和数据流,发布者能够多么灵巧。
  • 创建供一个订阅者读取的 File 指针,来展示如何玩这个:这是一个 Cold Stream
  • 根据传参匹配要读取的行数,若传参为 Long.MAX_VALUE 则忽略行数限制。
  • 在每次调用 onNext() 之前检查一下数据流是否已取消
  • 调用 onComplete(),它将把订阅状态标记为已取消,并忽略以后所有可能出现的终端信号。
  • 调用 onError(e),它将把订阅状态标记为已取消,并忽略以后所有可能出现的终端信号。
  • 在订阅者不再关注的时候关闭文件(因为出现错误、读取完成或被取消时)。
  • 当失败时创建一个流,将订阅者传递给 onSubscribe() 并调用 onError(e)
  • capacity 将会提示下游操作(这里是 consumeOn) 把请求按照 4 字节的大小分块。
  • consumeOn 要在分配器中执行请求,另需要3个额外的参数,以便其它 3 种可能存在的 Consumer 对其信号作出反应。

使用核心的发布者类工厂(2.0.2 后支持)创建一个文件惰性读取发布者,并与 Stream API 组合

  1. final String filename = "settings.gradle";
  2. Publisher<String> fileStream = PublisherFactory.create(
  3. (n, sub) -> {
  4. String line;
  5. final BufferedReader inputStream = sub.context()
  6. long requestCursor = 0l;
  7. while ((requestCursor++ < n || n == Long.MAX_VALUE) && !sub.isCancelled()) {
  8. try {
  9. line = inputStream.readLine();
  10. if (line != null) {
  11. sub.onNext(line);
  12. } else {
  13. sub.onComplete();
  14. return;
  15. }
  16. }
  17. catch (IOException exc) {
  18. sub.onError(exc);
  19. }
  20. }
  21. },
  22. sub -> new BufferedReader(new FileReader(filename)),
  23. inputStream -> inputStream.close()
  24. );
  25. Streams
  26. .wrap(fileStream)
  27. .process(RingBufferProcessor.create())
  28. .capacity(4L)
  29. .consume(
  30. System.out::println,
  31. Throwable::printStackTrace,
  32. nothing -> System.out.println("## EOF ##")
  33. );
  • 实现一个 BIConsumer,以响应每个 Subscriber发出请求,请求的长度为 Long 型 n。任何未检查的意外都将触发终止回调函数并调用 Subscriber.onError(e)
  • 回调函数中传递的 Subscriber 是一个 SubscriberWithContext 装饰器,用它可以访问在开始时填充好的 context()
  • 根据传参匹配要读取的行数,若传参为 Long.MAX_VALUE 则忽略行数限制。同时在每次读取前使用 SubscriberWithContext.isCancelled() 检查 Subscriber是否异步取消了请求。
  • 调用 onComplete(),它将把 Subscriber 状态标记为取消,并忽略以后所有可能出现的终端信号。
  • 为之后新 Subscriber 的每次 SubscriberWithContext.context() 请求 创建一个上下文环境。
  • 当拦截到 cancel()onComplete()onError(e) 信号时,定义一个终止回调函数。 我们可以利用 PublisherFactory 工厂,或 Streams 工厂(例如 Streams.createWith())来完成常见的任务:

  • 打开一次 IO 操作。

  • 响应请求。
  • 更优雅的处理关闭操作。