Cancellation

Dropping a future implies it can never be polled again. This is called cancellation and it can occur at any await point. Care is needed to ensure the system works correctly even when futures are cancelled. For example, it shouldn’t deadlock or lose data.

  1. use std::io;
  2. use std::time::Duration;
  3. use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
  4. struct LinesReader {
  5.     stream: DuplexStream,
  6. }
  7. impl LinesReader {
  8.     fn new(stream: DuplexStream) -> Self {
  9.         Self { stream }
  10.     }
  11.     async fn next(&mut self) -> io::Result<Option<String>> {
  12.         let mut bytes = Vec::new();
  13.         let mut buf = [0];
  14.         while self.stream.read(&mut buf[..]).await? != 0 {
  15.             bytes.push(buf[0]);
  16.             if buf[0] == b'\n' {
  17.                 break;
  18.             }
  19.         }
  20.         if bytes.is_empty() {
  21.             return Ok(None);
  22.         }
  23.         let s = String::from_utf8(bytes)
  24.             .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?;
  25.         Ok(Some(s))
  26.     }
  27. }
  28. async fn slow_copy(source: String, mut dest: DuplexStream) -> io::Result<()> {
  29.     for b in source.bytes() {
  30.         dest.write_u8(b).await?;
  31.         tokio::time::sleep(Duration::from_millis(10)).await
  32.     }
  33.     Ok(())
  34. }
  35. #[tokio::main]
  36. async fn main() -> io::Result<()> {
  37.     let (client, server) = tokio::io::duplex(5);
  38.     let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));
  39.     let mut lines = LinesReader::new(server);
  40.     let mut interval = tokio::time::interval(Duration::from_millis(60));
  41.     loop {
  42.         tokio::select! {
  43.             _ = interval.tick() => println!("tick!"),
  44.             line = lines.next() => if let Some(l) = line? {
  45.                 print!("{}", l)
  46.             } else {
  47.                 break
  48.             },
  49.         }
  50.     }
  51.     handle.await.unwrap()?;
  52.     Ok(())
  53. }

This slide should take about 18 minutes.

  • The compiler doesn’t help with cancellation-safety. You need to read API documentation and consider what state your async fn holds.

  • Unlike panic and ?, cancellation is part of normal control flow (vs error-handling).

  • The example loses parts of the string.

    • Whenever the tick() branch finishes first, next() and its buf are dropped.

    • LinesReader can be made cancellation-safe by making buf part of the struct:

      1. #![allow(unused)]
      2. fn main() {
      3. struct LinesReader {
      4.     stream: DuplexStream,
      5.     bytes: Vec<u8>,
      6.     buf: [u8; 1],
      7. }
      8. impl LinesReader {
      9.     fn new(stream: DuplexStream) -> Self {
      10.         Self { stream, bytes: Vec::new(), buf: [0] }
      11.     }
      12.     async fn next(&mut self) -> io::Result<Option<String>> {
      13.         // prefix buf and bytes with self.
      14.         // ...
      15.         let raw = std::mem::take(&mut self.bytes);
      16.         let s = String::from_utf8(raw)
      17.             .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?;
      18.         // ...
      19.     }
      20. }
      21. }
  • Interval::tick is cancellation-safe because it keeps track of whether a tick has been ‘delivered’.

  • AsyncReadExt::read is cancellation-safe because it either returns or doesn’t read data.

  • AsyncBufReadExt::read_line is similar to the example and isn’t cancellation-safe. See its documentation for details and alternatives.