Cancellation
Dropping a future implies it can never be polled again. This is called cancellation and it can occur at any await
point. Care is needed to ensure the system works correctly even when futures are cancelled. For example, it shouldn’t deadlock or lose data.
use std::io;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
struct LinesReader {
stream: DuplexStream,
}
impl LinesReader {
fn new(stream: DuplexStream) -> Self {
Self { stream }
}
async fn next(&mut self) -> io::Result<Option<String>> {
let mut bytes = Vec::new();
let mut buf = [0];
while self.stream.read(&mut buf[..]).await? != 0 {
bytes.push(buf[0]);
if buf[0] == b'\n' {
break;
}
}
if bytes.is_empty() {
return Ok(None);
}
let s = String::from_utf8(bytes)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?;
Ok(Some(s))
}
}
async fn slow_copy(source: String, mut dest: DuplexStream) -> io::Result<()> {
for b in source.bytes() {
dest.write_u8(b).await?;
tokio::time::sleep(Duration::from_millis(10)).await
}
Ok(())
}
#[tokio::main]
async fn main() -> io::Result<()> {
let (client, server) = tokio::io::duplex(5);
let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));
let mut lines = LinesReader::new(server);
let mut interval = tokio::time::interval(Duration::from_millis(60));
loop {
tokio::select! {
_ = interval.tick() => println!("tick!"),
line = lines.next() => if let Some(l) = line? {
print!("{}", l)
} else {
break
},
}
}
handle.await.unwrap()?;
Ok(())
}
This slide should take about 18 minutes.
The compiler doesn’t help with cancellation-safety. You need to read API documentation and consider what state your
async fn
holds.Unlike
panic
and?
, cancellation is part of normal control flow (vs error-handling).The example loses parts of the string.
Whenever the
tick()
branch finishes first,next()
and itsbuf
are dropped.LinesReader
can be made cancellation-safe by makingbuf
part of the struct:#![allow(unused)]
fn main() {
struct LinesReader {
stream: DuplexStream,
bytes: Vec<u8>,
buf: [u8; 1],
}
impl LinesReader {
fn new(stream: DuplexStream) -> Self {
Self { stream, bytes: Vec::new(), buf: [0] }
}
async fn next(&mut self) -> io::Result<Option<String>> {
// prefix buf and bytes with self.
// ...
let raw = std::mem::take(&mut self.bytes);
let s = String::from_utf8(raw)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?;
// ...
}
}
}
Interval::tick is cancellation-safe because it keeps track of whether a tick has been ‘delivered’.
AsyncReadExt::read is cancellation-safe because it either returns or doesn’t read data.
AsyncBufReadExt::read_line is similar to the example and isn’t cancellation-safe. See its documentation for details and alternatives.