Pin
Async blocks and functions return types implementing the Future
trait. The type returned is the result of a compiler transformation which turns local variables into data stored inside the future.
Some of those variables can hold pointers to other local variables. Because of that, the future should never be moved to a different memory location, as it would invalidate those pointers.
To prevent moving the future type in memory, it can only be polled through a pinned pointer. Pin
is a wrapper around a reference that disallows all operations that would move the instance it points to into a different memory location.
use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};
// A work item. In this case, just sleep for the given time and respond
// with a message on the `respond_on` channel.
#[derive(Debug)]
struct Work {
input: u32,
respond_on: oneshot::Sender<u32>,
}
// A worker which listens for work on a queue and performs it.
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
let mut iterations = 0;
loop {
tokio::select! {
Some(work) = work_queue.recv() => {
sleep(Duration::from_millis(10)).await; // Pretend to work.
work.respond_on
.send(work.input * 1000)
.expect("failed to send response");
iterations += 1;
}
// TODO: report number of iterations every 100ms
}
}
}
// A requester which requests work and waits for it to complete.
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
let (tx, rx) = oneshot::channel();
work_queue
.send(Work { input, respond_on: tx })
.await
.expect("failed to send on work queue");
rx.await.expect("failed waiting for response")
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(10);
spawn(worker(rx));
for i in 0..100 {
let resp = do_work(&tx, i).await;
println!("work result for iteration {i}: {resp}");
}
}
您可能认为这是执行器模式的一个示例。执行器通常会循环调用
select!
。本部分是对前面几节课的总结,因此请多花时间用心学习。
只是单纯地在
select!
中添加_ = sleep(Duration::from_millis(100)) => { println!(..) }
,该行代码将不会执行任何操作。这是为什么?请改为在
loop
外部添加包含该 Future 的timeout_fut
:#![allow(unused)]
fn main() {
let mut timeout_fut = sleep(Duration::from_millis(100));
loop {
select! {
..,
_ = timeout_fut => { println!(..); },
}
}
}
这仍然不起作用。根据编译器提示的错误,通过向
select!
中的timeout_fut
添加&mut
解决移动问题,然后使用Box::pin
:#![allow(unused)]
fn main() {
let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
loop {
select! {
..,
_ = &mut timeout_fut => { println!(..); },
}
}
}
可以编译这段代码了,但超时过期后,每次迭代都会变为
Poll::Ready
(使用混合 Future 有助于解决此问题)。每次超时过期后,通过更新重置timeout_fut
。
Box 在堆上进行分配。在某些情况下,也可以选择使用
std::pin::pin!
(最近才正式发布,较旧的代码通常使用tokio::pin!
),但对于重新分配的 Future,使用此功能较为困难。另一种替代方案是完全不使用
pin
,而是生成另一个任务,该任务每隔 100 毫秒就会发送到oneshot
通道。Data that contains pointers to itself is called self-referential. Normally, the Rust borrow checker would prevent self-referential data from being moved, as the references cannot outlive the data they point to. However, the code transformation for async blocks and functions is not verified by the borrow checker.
Pin
is a wrapper around a reference. An object cannot be moved from its place using a pinned pointer. However, it can still be moved through an unpinned pointer.The
poll
method of theFuture
trait usesPin<&mut Self>
instead of&mut Self
to refer to the instance. That’s why it can only be called on a pinned pointer.