制作一个简单的文件流
让我们从实现一个简单的 publisher
开始,我们将使用 Reactor API 来简化后面的示例代码。作为一个 Publisher
你不得不顾及很多细节,因为这些细节将在 Reactive Streams 中的 TCK 模块中被检测。这样做的目的是让你能够更好的理解 Reactor 在特定条件下能够完成的工作,从而避免重造轮子。
理论上说,在单线程、简单循环阻塞的文件读取消费中,Reactor Streams 并不能为你提供什么帮助。如果接收端点阻塞,它将发送多少读取多少,这已经是某种形式上的背压。Reactor 文件流 的优势在于,当流与消费者之间有一个或多个边界需要跨越时,可以通过采用队列或环形缓冲器来解耦合。你不妨想象一下:当你在向一个消费者发送数据时,同时进行数据的读取,那当它下次请求数据的时候(在前一次数据发送完成后),被请求的数据已经保存在内存中了。换句话说,就是预读取。
创建一个匹配订阅者请求的文件惰性读取发布者
Publisher<String> fileStream = new Publisher<String>() {
@Override
public void subscribe(final Subscriber<? super String> subscriber) {
final File file = new File("settings.gradle");
try {
final BufferedReader is = new BufferedReader(new FileReader(file));
subscriber.onSubscribe(new Subscription() {
final AtomicBoolean terminated = new AtomicBoolean(false);
@Override
public void request(long n) {
long requestCursor = 0l;
try {
String line;
while ((requestCursor++ < n || n == Long.MAX_VALUE)
&& !terminated.get()) {
line = is.readLine();
if (line != null) {
subscriber.onNext(line);
} else {
if (terminate()) {
subscriber.onComplete();
}
return;
}
}
} catch (IOException e) {
if (terminate()) {
subscriber.onError(e);
}
}
}
@Override
public void cancel() {
terminate();
}
private boolean terminate() {
if (terminated.compareAndSet(false, true)) {
try {
is.close();
} catch (Exception t) {
subscriber.onError(t);
}
return true;
}
return false;
}
});
} catch (FileNotFoundException e) {
Streams.<String, FileNotFoundException> fail(e)
.subscribe(subscriber);
}
}
};
Streams.wrap(fileStream)
.capacity(4L)
.consumeOn(
Environment.sharedDispatcher(),
System.out::println,
Throwable::printStackTrace,
nothing -> System.out.println("## EOF ##")
);
- 实现一个
Publisher
。下一个列子中你将看到依靠核心和数据流,发布者能够多么灵巧。 - 创建供一个订阅者读取的
File
指针,来展示如何玩这个:这是一个Cold Stream
。 - 根据传参匹配要读取的行数,若传参为 Long.MAX_VALUE 则忽略行数限制。
- 在每次调用
onNext()
之前检查一下数据流是否已取消。 - 调用 onComplete(),它将把订阅状态标记为已取消,并忽略以后所有可能出现的终端信号。
- 调用 onError(e),它将把订阅状态标记为已取消,并忽略以后所有可能出现的终端信号。
- 在订阅者不再关注的时候关闭文件(因为出现错误、读取完成或被取消时)。
- 当失败时创建一个流,将订阅者传递给
onSubscribe()
并调用onError(e)
。 capacity
将会提示下游操作(这里是consumeOn
) 把请求按照 4 字节的大小分块。consumeOn
要在分配器中执行请求,另需要3个额外的参数,以便其它 3 种可能存在的Consumer
对其信号作出反应。
使用核心的发布者类工厂(2.0.2 后支持)创建一个文件惰性读取发布者,并与 Stream API 组合
final String filename = "settings.gradle";
Publisher<String> fileStream = PublisherFactory.create(
(n, sub) -> {
String line;
final BufferedReader inputStream = sub.context()
long requestCursor = 0l;
while ((requestCursor++ < n || n == Long.MAX_VALUE) && !sub.isCancelled()) {
try {
line = inputStream.readLine();
if (line != null) {
sub.onNext(line);
} else {
sub.onComplete();
return;
}
}
catch (IOException exc) {
sub.onError(exc);
}
}
},
sub -> new BufferedReader(new FileReader(filename)),
inputStream -> inputStream.close()
);
Streams
.wrap(fileStream)
.process(RingBufferProcessor.create())
.capacity(4L)
.consume(
System.out::println,
Throwable::printStackTrace,
nothing -> System.out.println("## EOF ##")
);
- 实现一个
BIConsumer
,以响应每个Subscriber
发出请求,请求的长度为Long 型
n。任何未检查的意外都将触发终止回调函数并调用Subscriber.onError(e)
。 - 回调函数中传递的
Subscriber
是一个SubscriberWithContext
装饰器,用它可以访问在开始时填充好的context()
- 根据传参匹配要读取的行数,若传参为 Long.MAX_VALUE 则忽略行数限制。同时在每次读取前使用
SubscriberWithContext.isCancelled()
检查Subscriber
是否异步取消了请求。 - 调用
onComplete()
,它将把Subscriber
状态标记为取消,并忽略以后所有可能出现的终端信号。 - 为之后新
Subscriber
的每次SubscriberWithContext.context()
请求 创建一个上下文环境。 当拦截到
cancel()
、onComplete()
或onError(e)
信号时,定义一个终止回调函数。 我们可以利用PublisherFactory 工厂
,或 Streams 工厂(例如Streams.createWith()
)来完成常见的任务:打开一次 IO 操作。
- 响应请求。
- 更优雅的处理关闭操作。