消除
丢弃 Future 意味着无法再对其进行轮询。这称为 取消,在任何 await
点都可能发生。请务必小心谨慎,确保即使 Future 任务被取消,系统也能正常运行。例如,系统不应死锁或丢失数据。
use std::io::{self, ErrorKind};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
struct LinesReader {
stream: DuplexStream,
}
impl LinesReader {
fn new(stream: DuplexStream) -> Self {
Self { stream }
}
async fn next(&mut self) -> io::Result<Option<String>> {
let mut bytes = Vec::new();
let mut buf = [0];
while self.stream.read(&mut buf[..]).await? != 0 {
bytes.push(buf[0]);
if buf[0] == b'\n' {
break;
}
}
if bytes.is_empty() {
return Ok(None);
}
let s = String::from_utf8(bytes)
.map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
Ok(Some(s))
}
}
async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
for b in source.bytes() {
dest.write_u8(b).await?;
tokio::time::sleep(Duration::from_millis(10)).await
}
Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let (client, server) = tokio::io::duplex(5);
let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));
let mut lines = LinesReader::new(server);
let mut interval = tokio::time::interval(Duration::from_millis(60));
loop {
tokio::select! {
_ = interval.tick() => println!("tick!"),
line = lines.next() => if let Some(l) = line? {
print!("{}", l)
} else {
break
},
}
}
handle.await.unwrap()?;
Ok(())
}
编译器无法确保取消操作的安全性。您需要阅读 API 文档,并考虑
async fn
所持状态。与
panic
和?
不同,取消属于正常控制流的一部分(而非错误处理)。该示例丢失了字符串的某些部分。
每当
tick()
分支先完成操作时,next() 及其
buf` 均会被丢弃。通过将
buf
整合到结构体中,`LinesReader 可以确保取消操作的安全性:#![allow(unused)]
fn main() {
struct LinesReader {
stream: DuplexStream,
bytes: Vec<u8>,
buf: [u8; 1],
}
impl LinesReader {
fn new(stream: DuplexStream) -> Self {
Self { stream, bytes: Vec::new(), buf: [0] }
}
async fn next(&mut self) -> io::Result<Option<String>> {
// prefix buf and bytes with self.
// ...
let raw = std::mem::take(&mut self.bytes);
let s = String::from_utf8(raw)
// ...
}
}
}
Interval::tick is cancellation-safe because it keeps track of whether a tick has been ‘delivered’.
AsyncReadExt::read is cancellation-safe because it either returns or doesn’t read data.
AsyncBufReadExt::read_line is similar to the example and isn’t cancellation-safe. See its documentation for details and alternatives.