消除

丢弃 Future 意味着无法再对其进行轮询。这称为 取消,在任何 await 点都可能发生。请务必小心谨慎,确保即使 Future 任务被取消,系统也能正常运行。例如,系统不应死锁或丢失数据。

  1. use std::io::{self, ErrorKind};
  2. use std::time::Duration;
  3. use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
  4. struct LinesReader {
  5. stream: DuplexStream,
  6. }
  7. impl LinesReader {
  8. fn new(stream: DuplexStream) -> Self {
  9. Self { stream }
  10. }
  11. async fn next(&mut self) -> io::Result<Option<String>> {
  12. let mut bytes = Vec::new();
  13. let mut buf = [0];
  14. while self.stream.read(&mut buf[..]).await? != 0 {
  15. bytes.push(buf[0]);
  16. if buf[0] == b'\n' {
  17. break;
  18. }
  19. }
  20. if bytes.is_empty() {
  21. return Ok(None);
  22. }
  23. let s = String::from_utf8(bytes)
  24. .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
  25. Ok(Some(s))
  26. }
  27. }
  28. async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
  29. for b in source.bytes() {
  30. dest.write_u8(b).await?;
  31. tokio::time::sleep(Duration::from_millis(10)).await
  32. }
  33. Ok(())
  34. }
  35. #[tokio::main]
  36. async fn main() -> std::io::Result<()> {
  37. let (client, server) = tokio::io::duplex(5);
  38. let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));
  39. let mut lines = LinesReader::new(server);
  40. let mut interval = tokio::time::interval(Duration::from_millis(60));
  41. loop {
  42. tokio::select! {
  43. _ = interval.tick() => println!("tick!"),
  44. line = lines.next() => if let Some(l) = line? {
  45. print!("{}", l)
  46. } else {
  47. break
  48. },
  49. }
  50. }
  51. handle.await.unwrap()?;
  52. Ok(())
  53. }
  • 编译器无法确保取消操作的安全性。您需要阅读 API 文档,并考虑 async fn 所持状态。

  • panic? 不同,取消属于正常控制流的一部分(而非错误处理)。

  • 该示例丢失了字符串的某些部分。

    • 每当 tick() 分支先完成操作时,next() 及其 buf` 均会被丢弃。

    • 通过将 buf 整合到结构体中,`LinesReader 可以确保取消操作的安全性:

      1. #![allow(unused)]
      2. fn main() {
      3. struct LinesReader {
      4. stream: DuplexStream,
      5. bytes: Vec<u8>,
      6. buf: [u8; 1],
      7. }
      8. impl LinesReader {
      9. fn new(stream: DuplexStream) -> Self {
      10. Self { stream, bytes: Vec::new(), buf: [0] }
      11. }
      12. async fn next(&mut self) -> io::Result<Option<String>> {
      13. // prefix buf and bytes with self.
      14. // ...
      15. let raw = std::mem::take(&mut self.bytes);
      16. let s = String::from_utf8(raw)
      17. // ...
      18. }
      19. }
      20. }
  • Interval::tick is cancellation-safe because it keeps track of whether a tick has been ‘delivered’.

  • AsyncReadExt::read is cancellation-safe because it either returns or doesn’t read data.

  • AsyncBufReadExt::read_line is similar to the example and isn’t cancellation-safe. See its documentation for details and alternatives.