在 Dart 里使用 Stream
Written by Lasse Nielsen April 2013 (updated October 2018)
dart:async 库中有两个类型,它们对许多 Dart API 来说都非常重要:Stream 和 Future。Future 用于表示单个运算的结果,而 Stream 则表示多个结果的序列。你可以监听 Stream 以获取其结果(包括数据和错误)或其关闭事件。也可以在 Stream 完成前对其暂停或停止监听。
但是本篇文章并非阐述 如何使用 Stream,而是向你介绍如何创建 Stream。你可以通过以下几种方式创建 Stream。
转换现有的 Stream。
使用
async*
函数创建 Stream。使用
StreamController
生成 Stream。
本文将向你展示每种方式的代码并且会给你一些有用的提示,这些提示可以帮助你正确创建 Stream。
可以查阅 异步编程:使用 Stream 获取更多关于 Stream 的信息。
转换现有的 Stream
我们在创建 Stream 时常见的情形是根据现有 Stream 的事件创建一个新的 Stream。比如你已经有了一个可以提供字节事件的 Stream,然后你想将该 Stream 变为一个可以提供字符串的 Stream,并且该 Stream 中的字符串还经过 UTF-8 编码。对于这种情况,常用的办法是创建一个新的 Stream 去等待获取原 Stream 的事件,然后再将新 Stream 中的事件输出。例如:
- /// 将连续的字符串 Stream 拆分为行。
- ///
- /// 输入的字符串来自于"源" Stream 并以较小的 chunk 块提供。
- Stream<String> lines(Stream<String> source) async* {
- // 存储从上一个数据块中分离出的字符串行。
- var partial = '';
- // 等到新的数据块可用时开始处理。
- await for (var chunk in source) {
- var lines = chunk.split('\n');
- lines[0] = partial + lines[0]; // 追加拼接行。
- partial = lines.removeLast(); // 删除剩余不完整的行。
- for (var line in lines) {
- yield line; // 将分离的每个字符串行添加至输出 Stream。
- }
- }
- // 最后如果最终的字符串行不为空则将其添加至输出流。
- if (partial.isNotEmpty) yield partial;
- }
你可以使用 Stream
类提供的转换类方法,比如 map()
、where()
、expand()
和 take()
来应对大多数常见的转换需求。
例如,假设你有一个名为 counterStream
的 Stream,用于每秒打印输出一个自增的整数。其实现过程可能如下:
- var counterStream =
- Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
你可以使用下面的代码快速查看事件:
- counterStream.forEach(print); // 每秒打印一个整数,共打印15次。
你可以在监听 Stream 前调用一个类似 map()
的转换方法来转换 Stream 的事件。该方法将返回一个新的 Stream。
- // 将每次事件中的整数乘以2。
- var doubleCounterStream = counterStream.map((int x) => x * 2);
- doubleCounterStream.forEach(print);
你可以使用任意其它的变换方法替代 map()
,比如类似下面的这些:
- .where((int x) => x.isEven) // 只保留整型事件中整数为偶数的事件。
- .expand((var x) => [x, x]) // 复制每一个事件。
- .take(5) // 开始五个事件后停止。
通常而言,使用各种转换方法足以满足你简单的使用需求。但是,如果你需要对转换进行更多的控制,你可以使用 Stream
类的 transform()
方法指定一个 StreamTransformer。Dart 平台库为许多常见的任务需求提供了 Stream 转换器。例如下面的代码使用了由 dart:convert 库提供的utf8.decoder
和 LineSplitter
转换器。
- Stream<List<int>> content = File('someFile.txt').openRead();
- List<String> lines =
- await content.transform(utf8.decoder).transform(LineSplitter()).toList();
从零开始创建 Stream
上一小节中我们使用一个现有的 Stream 经过转换生成新的 Stream。这一小节我们通过异步生成器 (async
) 函数来完完全全地创建一个 Stream。当异步生成器函数被调用时会创建一个 Stream,而函数体则会在该 Stream 被监听时开始运行。当函数返回时,Stream 关闭。在函数返回前,你可以使用 yield
或 yield
语句向该 Stream 提交事件。
下面是一个周期性发送整数的函数例子:
- Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
- int i = 0;
- while (true) {
- await Future.delayed(interval);
- yield i++;
- if (i == maxCount) break;
- }
- }
该函数返回一个 Stream
。而函数体会在该 Stream 被监听时开始运行且以一定的周期间隔在指定的数字范围内不断地生成一个递增数字。如果省略掉 count
参数,那么循环将无休止地执行下去,此时除非取消订阅,否则 Stream 会不停地生成越来越多的数字。
当监听器取消时(调用由 listen()
方法返回的 StreamSubscription
对象中的 cancel()
方法),如果下一次循环体执行到 yield
语句,此时该语句的作用类似于 return
语句。而且任意 finally
语句块在此时执行均会导致函数退出。如果函数尝试在退出前 yield 一个值,那么该尝试将会以失败告终并产生类似于 return 语句的效果。
当函数最终退出时,由 cancel()
方法返回的 Future 完成。如果函数是因为出错导致退出,则 Future 完成时会携带对应的错误,否则其会携带一个 null
。
另外,一个更有用的示例是将一个 Future 序列转换为 Stream 的函数:
- Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
- for (var future in futures) {
- var result = await future;
- yield result;
- }
- }
该函数循环向 Future
序列请求一个 Future 并等待该 Future 完成获取其结果后提交给 Stream。如果某个 Future 因出错完成,则该错误也会提交给 Stream。
在实际应用中,通过 async
函数从零构建 Stream 的情况比较少见。async
函数通常会根据某些数据源来创建 Stream,而这些数据源常常又是另一个 Stream。比如像上述示例中的 Future 序列,其数据往往来自于其它的异步事件源。然而,在许多情况下,异步函数过于简单难以轻松地处理多个数据源的场景。而这就是 StreamController
类的用武之地。
使用 StreamController
如果你 Stream 的事件不仅来自于异步函数可以遍历的 Stream 和 Future,还来自于你程序的不同部分,这种情况使用上述两种方式生成 Stream 就显得比较困难。面对这种情况,我们可以使用一个StreamController来创建和填充 Stream。
StreamController
可以为你生成一个 Stream,并提供在任何时候、任何地方将事件添加到该 Stream 的方法。该 Stream 具有处理监听器和暂停所需的所有逻辑。控制器对象你可以自行处理而只需返回调用者所需的 Stream 即可。
下面的代码将为你展示一个简单的示例(出自 stream_controller_bad.dart),该示例使用 StreamController
来实现上一个示例中的 timedCounter()
函数。尽管该示例有一定的缺陷,但其为你展示了 StreamController
的基本用法。该代码将数据直接添加至 StreamController
而不是从 Future 或 Stream 中获取,并在最后返回 StreamController
中的 Stream。
- // 注意:该实现有缺陷。
- // 它在它拥有订阅者之前开始并且它没有实现暂停逻辑。
- Stream<int> timedCounter(Duration interval, [int maxCount]) {
- var controller = StreamController<int>();
- int counter = 0;
- void tick(Timer timer) {
- counter++;
- controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
- if (maxCount != null && counter >= maxCount) {
- timer.cancel();
- controller.close(); // 请求 Stream 关闭并告知监听器。
- }
- }
- Timer.periodic(interval, tick); // 缺点:在其拥有订阅者之前开始了。
- return controller.stream;
- }
与前面一样,你可以像下面这样使用由 timedCounter()
函数返回的 Stream:
- var counterStream = timedCounter(const Duration(seconds: 1), 15);
- counterStream.listen(print); // 每秒打印输出一个整数,共打印 15 次。
timedCounter()
函数的实现有两个问题:
它在拥有订阅者之前就开始生成事件。
即使订阅者请求暂停,它也会继续生成事件。
如下一节所示,你可以在创建 StreamController
时通过指定回调,比如 onListen
和 onPause
来修复这些问题。
等待订阅
一般来说,Stream 应该在它生成事件前等待订阅者,否则事件的生成毫无意义。对 async
函数而言,它可以自行处理该问题。但是当使用 StreamController
时,因为你可以有比使用 async
函数更多的控制能力,因此你完全可以无视相关规则自行添加并控制事件。如果一个 Stream 没有订阅者,它的 StreamController
会不断缓存事件,这可能会导致内存泄露。
将上面示例中使用 Stream 的代码更改为如下:
- void listenAfterDelay() async {
- var counterStream = timedCounter(const Duration(seconds: 1), 15);
- await Future.delayed(const Duration(seconds: 5));
- // 5 秒后添加一个监听器。
- await for (int n in counterStream) {
- print(n); // 每秒打印输出一个整数,共打印 15 次。
- }
- }
当我们运行上述代码时,尽管 Stream 一开始就工作,但最开始的 5 秒内不会有任何东西打印输出。5 秒后我们向 Stream 添加监听器,此时前面的 5 个事件会被同时输出,因为它们被 StreamController
缓存了。
当你构建 StreamController
时,可以为其指定一个 onListen
参数回调用以接收订阅通知。当 Stream 获取到它的第一个订阅者时会触发调用 onListen
回调。同样地,你也可以指定一个 onCancel
回调,该回调则会在控制器丢失它最后一个订阅者时触发调用。在上述例子中,Timer.periodic()
的调用应该移至 onListen
中进行,如下一节所示。
遵循并实现暂停
当监听器请求暂停时应当避免继续生成事件。当 Stream 订阅暂停时,async* 函数可以自动地在一个 yield
语句执行时暂停。而 StreamController
则会在暂停时缓存事件。如果代码在处理事件生成时不考虑暂停功能,则缓存的大小可以无限制地增长。而且如果在暂停后监听器很快又请求停止,那么在暂停到停止这段时间内所做的缓存工作都是浪费的。
为了可以查看在不支持暂停的时候会发生什么,我们将上面使用 Stream 的代码更改为如下:
- void listenWithPause() {
- var counterStream = timedCounter(const Duration(seconds: 1), 15);
- StreamSubscription<int> subscription;
- subscription = counterStream.listen((int counter) {
- print(counter); // 每秒打印输出一个整数。
- if (counter == 5) {
- // 打印输出 5 次后暂停 5 秒然后恢复。
- subscription.pause(Future.delayed(const Duration(seconds: 5)));
- }
- });
- }
当五秒钟的暂停时间结束时,在此期间生成的事件将同时被输出。出现这种状况的原因是因为生成 Stream 的源没有遵循暂停规则,因此其会持续不断地向向 Stream 中添加事件。进而导致 Stream 缓存事件,然后,当 Stream 从暂停中恢复时,它会清空并输出其缓存。
下面代码所实现的 timedCounter()
版本(出自 stream_controller.dart)通过使用 StreamController
中的onListen
、onPause
、onResume
和 onCancel
回调实现暂停功能。
- Stream<int> timedCounter(Duration interval, [int maxCount]) {
- StreamController<int> controller;
- Timer timer;
- int counter = 0;
- void tick(_) {
- counter++;
- controller.add(counter); // 请求stream将计数器值作为事件发送。
- if (counter == maxCount) {
- timer.cancel();
- controller.close(); // 请求 stream 关闭并告知监听器。
- }
- }
- void startTimer() {
- timer = Timer.periodic(interval, tick);
- }
- void stopTimer() {
- if (timer != null) {
- timer.cancel();
- timer = null;
- }
- }
- controller = StreamController<int>(
- onListen: startTimer,
- onPause: stopTimer,
- onResume: startTimer,
- onCancel: stopTimer);
- return controller.stream;
- }
在 listenWithPause()
函数中使用上面的这个 timedCounter
函数,运行后你就可以看到当订阅暂停时打印输出的计数也会暂停,尔后又可以正确地恢复。
你必须使用全部的回调 onListen
、onCancel
、onPause
和 onResume
来通知暂停状态的变化,否则如果订阅状态与暂停状态在同一时间都改变了,只会有 onListen
或 onCancel
回调会被调用。
最后的提示
当你不通过 async* 函数创建 Stream 时,请务必牢记以下几点:
使用同步控制器时要小心。例如,使用
StreamController(sync: true)
构造方法创建控制器。当你发送一个事件到一个未暂停的同步控制器(例如:使用 EventSink 中定义的add()
、addError()
或close()
方法),事件立即发送给所有 Stream 的监听器。在添加监听器的代码返回之前,决不能调用Stream
监听器,而在错误的事件使用同步控制器会破坏该规则并导致其它正常代码执行失败。因此,你应该避免使用同步控制器。如果你使用
StreamController
,onListen
回调会在listen
方法调用返回StreamSubscription
前返回。不要让onListen
回调依赖于已经存在的订阅。例如,在下面的代码中,onListen
回调有可能会在subscription
变量被初始化为一个有效值之前被触发(同时处理器
被调用)。
- subscription = stream.listen(handler);
当 Stream 的监听器状态改变时,由
StreamController
定义的onListen
、onPause
、onResume
和onCancel
回调会被调用,该调用绝不会发生在事件生成时或在某个状态变化处理回调的调用期间。在这些情况出现时,状态变化的回调会被延迟,直到上一个回调执行完成。不要尝试自己去实现
Stream
接口。否则很容易在事件、回调以及添加和移除监听器这些操作交互时出现一些难以察觉的错误。你应该总是使用一个现有的 Stream(比如由StreamController
生成的)去实现新 Stream 中listen
方法的调用。尽管你可以通过扩展
Stream
类并实现listen
方法来实现更多额外的功能,但一般不建议这么做,因为这样会引入一个调用者必须考虑的新类型。相反,你可以创建一个(或多个)具有Stream
的类而不是一个(或多个)Stream。