在 Dart 里使用 Stream

Written by Lasse Nielsen April 2013 (updated October 2018)

dart:async 库中有两个类型,它们对许多 Dart API 来说都非常重要:StreamFuture。Future 用于表示单个运算的结果,而 Stream 则表示多个结果的序列。你可以监听 Stream 以获取其结果(包括数据和错误)或其关闭事件。也可以在 Stream 完成前对其暂停或停止监听。

但是本篇文章并非阐述 如何使用 Stream,而是向你介绍如何创建 Stream。你可以通过以下几种方式创建 Stream。

  • 转换现有的 Stream。

  • 使用 async* 函数创建 Stream。

  • 使用 StreamController 生成 Stream。

本文将向你展示每种方式的代码并且会给你一些有用的提示,这些提示可以帮助你正确创建 Stream。

可以查阅 异步编程:使用 Stream 获取更多关于 Stream 的信息。

转换现有的 Stream

我们在创建 Stream 时常见的情形是根据现有 Stream 的事件创建一个新的 Stream。比如你已经有了一个可以提供字节事件的 Stream,然后你想将该 Stream 变为一个可以提供字符串的 Stream,并且该 Stream 中的字符串还经过 UTF-8 编码。对于这种情况,常用的办法是创建一个新的 Stream 去等待获取原 Stream 的事件,然后再将新 Stream 中的事件输出。例如:

  1. /// 将连续的字符串 Stream 拆分为行。
  2. ///
  3. /// 输入的字符串来自于"源" Stream 并以较小的 chunk 块提供。
  4. Stream<String> lines(Stream<String> source) async* {
  5. // 存储从上一个数据块中分离出的字符串行。
  6. var partial = '';
  7. // 等到新的数据块可用时开始处理。
  8. await for (var chunk in source) {
  9. var lines = chunk.split('\n');
  10. lines[0] = partial + lines[0]; // 追加拼接行。
  11. partial = lines.removeLast(); // 删除剩余不完整的行。
  12. for (var line in lines) {
  13. yield line; // 将分离的每个字符串行添加至输出 Stream。
  14. }
  15. }
  16. // 最后如果最终的字符串行不为空则将其添加至输出流。
  17. if (partial.isNotEmpty) yield partial;
  18. }

你可以使用 Stream 类提供的转换类方法,比如 map()where()expand()take() 来应对大多数常见的转换需求。

例如,假设你有一个名为 counterStream 的 Stream,用于每秒打印输出一个自增的整数。其实现过程可能如下:

  1. var counterStream =
  2. Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);

你可以使用下面的代码快速查看事件:

  1. counterStream.forEach(print); // 每秒打印一个整数,共打印15次。

你可以在监听 Stream 前调用一个类似 map() 的转换方法来转换 Stream 的事件。该方法将返回一个新的 Stream。

  1. // 将每次事件中的整数乘以2。
  2. var doubleCounterStream = counterStream.map((int x) => x * 2);
  3. doubleCounterStream.forEach(print);

你可以使用任意其它的变换方法替代 map(),比如类似下面的这些:

  1. .where((int x) => x.isEven) // 只保留整型事件中整数为偶数的事件。
  2. .expand((var x) => [x, x]) // 复制每一个事件。
  3. .take(5) // 开始五个事件后停止。

通常而言,使用各种转换方法足以满足你简单的使用需求。但是,如果你需要对转换进行更多的控制,你可以使用 Stream 类的 transform() 方法指定一个 StreamTransformer。Dart 平台库为许多常见的任务需求提供了 Stream 转换器。例如下面的代码使用了由 dart:convert 库提供的utf8.decoderLineSplitter 转换器。

  1. Stream<List<int>> content = File('someFile.txt').openRead();
  2. List<String> lines =
  3. await content.transform(utf8.decoder).transform(LineSplitter()).toList();

从零开始创建 Stream

上一小节中我们使用一个现有的 Stream 经过转换生成新的 Stream。这一小节我们通过异步生成器 (async) 函数来完完全全地创建一个 Stream。当异步生成器函数被调用时会创建一个 Stream,而函数体则会在该 Stream 被监听时开始运行。当函数返回时,Stream 关闭。在函数返回前,你可以使用 yieldyield 语句向该 Stream 提交事件。

下面是一个周期性发送整数的函数例子:

  1. Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  2. int i = 0;
  3. while (true) {
  4. await Future.delayed(interval);
  5. yield i++;
  6. if (i == maxCount) break;
  7. }
  8. }

该函数返回一个 Stream。而函数体会在该 Stream 被监听时开始运行且以一定的周期间隔在指定的数字范围内不断地生成一个递增数字。如果省略掉 count 参数,那么循环将无休止地执行下去,此时除非取消订阅,否则 Stream 会不停地生成越来越多的数字。

当监听器取消时(调用由 listen() 方法返回的 StreamSubscription 对象中的 cancel() 方法),如果下一次循环体执行到 yield 语句,此时该语句的作用类似于 return 语句。而且任意 finally语句块在此时执行均会导致函数退出。如果函数尝试在退出前 yield 一个值,那么该尝试将会以失败告终并产生类似于 return 语句的效果。

当函数最终退出时,由 cancel() 方法返回的 Future 完成。如果函数是因为出错导致退出,则 Future 完成时会携带对应的错误,否则其会携带一个 null

另外,一个更有用的示例是将一个 Future 序列转换为 Stream 的函数:

  1. Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  2. for (var future in futures) {
  3. var result = await future;
  4. yield result;
  5. }
  6. }

该函数循环向 Future 序列请求一个 Future 并等待该 Future 完成获取其结果后提交给 Stream。如果某个 Future 因出错完成,则该错误也会提交给 Stream。

在实际应用中,通过 async 函数从零构建 Stream 的情况比较少见。async 函数通常会根据某些数据源来创建 Stream,而这些数据源常常又是另一个 Stream。比如像上述示例中的 Future 序列,其数据往往来自于其它的异步事件源。然而,在许多情况下,异步函数过于简单难以轻松地处理多个数据源的场景。而这就是 StreamController 类的用武之地。

使用 StreamController

如果你 Stream 的事件不仅来自于异步函数可以遍历的 Stream 和 Future,还来自于你程序的不同部分,这种情况使用上述两种方式生成 Stream 就显得比较困难。面对这种情况,我们可以使用一个StreamController来创建和填充 Stream。

StreamController 可以为你生成一个 Stream,并提供在任何时候、任何地方将事件添加到该 Stream 的方法。该 Stream 具有处理监听器和暂停所需的所有逻辑。控制器对象你可以自行处理而只需返回调用者所需的 Stream 即可。

下面的代码将为你展示一个简单的示例(出自 stream_controller_bad.dart),该示例使用 StreamController 来实现上一个示例中的 timedCounter() 函数。尽管该示例有一定的缺陷,但其为你展示了 StreamController 的基本用法。该代码将数据直接添加至 StreamController 而不是从 Future 或 Stream 中获取,并在最后返回 StreamController 中的 Stream。

  1. // 注意:该实现有缺陷。
  2. // 它在它拥有订阅者之前开始并且它没有实现暂停逻辑。
  3. Stream<int> timedCounter(Duration interval, [int maxCount]) {
  4. var controller = StreamController<int>();
  5. int counter = 0;
  6. void tick(Timer timer) {
  7. counter++;
  8. controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
  9. if (maxCount != null && counter >= maxCount) {
  10. timer.cancel();
  11. controller.close(); // 请求 Stream 关闭并告知监听器。
  12. }
  13. }
  14.  
  15. Timer.periodic(interval, tick); // 缺点:在其拥有订阅者之前开始了。
  16. return controller.stream;
  17. }

与前面一样,你可以像下面这样使用由 timedCounter() 函数返回的 Stream:

  1. var counterStream = timedCounter(const Duration(seconds: 1), 15);
  2. counterStream.listen(print); // 每秒打印输出一个整数,共打印 15 次。

timedCounter() 函数的实现有两个问题:

  • 它在拥有订阅者之前就开始生成事件。

  • 即使订阅者请求暂停,它也会继续生成事件。

如下一节所示,你可以在创建 StreamController 时通过指定回调,比如 onListenonPause 来修复这些问题。

等待订阅

一般来说,Stream 应该在它生成事件前等待订阅者,否则事件的生成毫无意义。对 async 函数而言,它可以自行处理该问题。但是当使用 StreamController 时,因为你可以有比使用 async 函数更多的控制能力,因此你完全可以无视相关规则自行添加并控制事件。如果一个 Stream 没有订阅者,它的 StreamController 会不断缓存事件,这可能会导致内存泄露。

将上面示例中使用 Stream 的代码更改为如下:

  1. void listenAfterDelay() async {
  2. var counterStream = timedCounter(const Duration(seconds: 1), 15);
  3. await Future.delayed(const Duration(seconds: 5));
  4.  
  5. // 5 秒后添加一个监听器。
  6. await for (int n in counterStream) {
  7. print(n); // 每秒打印输出一个整数,共打印 15 次。
  8. }
  9. }

当我们运行上述代码时,尽管 Stream 一开始就工作,但最开始的 5 秒内不会有任何东西打印输出。5 秒后我们向 Stream 添加监听器,此时前面的 5 个事件会被同时输出,因为它们被 StreamController 缓存了。

当你构建 StreamController 时,可以为其指定一个 onListen 参数回调用以接收订阅通知。当 Stream 获取到它的第一个订阅者时会触发调用 onListen 回调。同样地,你也可以指定一个 onCancel 回调,该回调则会在控制器丢失它最后一个订阅者时触发调用。在上述例子中,Timer.periodic() 的调用应该移至 onListen 中进行,如下一节所示。

遵循并实现暂停

当监听器请求暂停时应当避免继续生成事件。当 Stream 订阅暂停时,async* 函数可以自动地在一个 yield 语句执行时暂停。而 StreamController 则会在暂停时缓存事件。如果代码在处理事件生成时不考虑暂停功能,则缓存的大小可以无限制地增长。而且如果在暂停后监听器很快又请求停止,那么在暂停到停止这段时间内所做的缓存工作都是浪费的。

为了可以查看在不支持暂停的时候会发生什么,我们将上面使用 Stream 的代码更改为如下:

  1. void listenWithPause() {
  2. var counterStream = timedCounter(const Duration(seconds: 1), 15);
  3. StreamSubscription<int> subscription;
  4.  
  5. subscription = counterStream.listen((int counter) {
  6. print(counter); // 每秒打印输出一个整数。
  7. if (counter == 5) {
  8. // 打印输出 5 次后暂停 5 秒然后恢复。
  9. subscription.pause(Future.delayed(const Duration(seconds: 5)));
  10. }
  11. });
  12. }

当五秒钟的暂停时间结束时,在此期间生成的事件将同时被输出。出现这种状况的原因是因为生成 Stream 的源没有遵循暂停规则,因此其会持续不断地向向 Stream 中添加事件。进而导致 Stream 缓存事件,然后,当 Stream 从暂停中恢复时,它会清空并输出其缓存。

下面代码所实现的 timedCounter() 版本(出自 stream_controller.dart)通过使用 StreamController 中的onListenonPauseonResumeonCancel 回调实现暂停功能。

  1. Stream<int> timedCounter(Duration interval, [int maxCount]) {
  2. StreamController<int> controller;
  3. Timer timer;
  4. int counter = 0;
  5.  
  6. void tick(_) {
  7. counter++;
  8. controller.add(counter); // 请求stream将计数器值作为事件发送。
  9. if (counter == maxCount) {
  10. timer.cancel();
  11. controller.close(); // 请求 stream 关闭并告知监听器。
  12. }
  13. }
  14.  
  15. void startTimer() {
  16. timer = Timer.periodic(interval, tick);
  17. }
  18.  
  19. void stopTimer() {
  20. if (timer != null) {
  21. timer.cancel();
  22. timer = null;
  23. }
  24. }
  25.  
  26. controller = StreamController<int>(
  27. onListen: startTimer,
  28. onPause: stopTimer,
  29. onResume: startTimer,
  30. onCancel: stopTimer);
  31.  
  32. return controller.stream;
  33. }

listenWithPause() 函数中使用上面的这个 timedCounter 函数,运行后你就可以看到当订阅暂停时打印输出的计数也会暂停,尔后又可以正确地恢复。

你必须使用全部的回调 onListenonCancelonPauseonResume来通知暂停状态的变化,否则如果订阅状态与暂停状态在同一时间都改变了,只会有 onListenonCancel 回调会被调用。

最后的提示

当你不通过 async* 函数创建 Stream 时,请务必牢记以下几点:

  • 使用同步控制器时要小心。例如,使用 StreamController(sync: true) 构造方法创建控制器。当你发送一个事件到一个未暂停的同步控制器(例如:使用 EventSink 中定义的 add()addError()close() 方法),事件立即发送给所有 Stream 的监听器。在添加监听器的代码返回之前,决不能调用 Stream 监听器,而在错误的事件使用同步控制器会破坏该规则并导致其它正常代码执行失败。因此,你应该避免使用同步控制器。

  • 如果你使用 StreamControlleronListen 回调会在 listen 方法调用返回 StreamSubscription 前返回。不要让 onListen 回调依赖于已经存在的订阅。例如,在下面的代码中,onListen 回调有可能会在 subscription变量被初始化为一个有效值之前被触发(同时 处理器 被调用)。

  1. subscription = stream.listen(handler);
  • 当 Stream 的监听器状态改变时,由 StreamController 定义的onListenonPauseonResumeonCancel 回调会被调用,该调用绝不会发生在事件生成时或在某个状态变化处理回调的调用期间。在这些情况出现时,状态变化的回调会被延迟,直到上一个回调执行完成。

  • 不要尝试自己去实现 Stream 接口。否则很容易在事件、回调以及添加和移除监听器这些操作交互时出现一些难以察觉的错误。你应该总是使用一个现有的 Stream(比如由 StreamController 生成的)去实现新 Stream 中 listen 方法的调用。

  • 尽管你可以通过扩展 Stream 类并实现 listen 方法来实现更多额外的功能,但一般不建议这么做,因为这样会引入一个调用者必须考虑的新类型。相反,你可以创建一个(或多个)具有 Stream 的类而不是一个(或多个)Stream。