Pin

Async blocks and functions return types implementing the Future trait. The type returned is the result of a compiler transformation which turns local variables into data stored inside the future.

Some of those variables can hold pointers to other local variables. Because of that, the future should never be moved to a different memory location, as it would invalidate those pointers.

To prevent moving the future type in memory, it can only be polled through a pinned pointer. Pin is a wrapper around a reference that disallows all operations that would move the instance it points to into a different memory location.

  1. use tokio::sync::{mpsc, oneshot};
  2. use tokio::task::spawn;
  3. use tokio::time::{sleep, Duration};
  4. // A work item. In this case, just sleep for the given time and respond
  5. // with a message on the `respond_on` channel.
  6. #[derive(Debug)]
  7. struct Work {
  8. input: u32,
  9. respond_on: oneshot::Sender<u32>,
  10. }
  11. // A worker which listens for work on a queue and performs it.
  12. async fn worker(mut work_queue: mpsc::Receiver<Work>) {
  13. let mut iterations = 0;
  14. loop {
  15. tokio::select! {
  16. Some(work) = work_queue.recv() => {
  17. sleep(Duration::from_millis(10)).await; // Pretend to work.
  18. work.respond_on
  19. .send(work.input * 1000)
  20. .expect("failed to send response");
  21. iterations += 1;
  22. }
  23. // TODO: report number of iterations every 100ms
  24. }
  25. }
  26. }
  27. // A requester which requests work and waits for it to complete.
  28. async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
  29. let (tx, rx) = oneshot::channel();
  30. work_queue
  31. .send(Work { input, respond_on: tx })
  32. .await
  33. .expect("failed to send on work queue");
  34. rx.await.expect("failed waiting for response")
  35. }
  36. #[tokio::main]
  37. async fn main() {
  38. let (tx, rx) = mpsc::channel(10);
  39. spawn(worker(rx));
  40. for i in 0..100 {
  41. let resp = do_work(&tx, i).await;
  42. println!("work result for iteration {i}: {resp}");
  43. }
  44. }
  • 您可能认为这是执行器模式的一个示例。执行器通常会循环调用 select!

  • 本部分是对前面几节课的总结,因此请多花时间用心学习。

    • 只是单纯地在 select! 中添加 _ = sleep(Duration::from_millis(100)) => { println!(..) },该行代码将不会执行任何操作。这是为什么?

    • 请改为在 loop 外部添加包含该 Future 的 timeout_fut

      1. #![allow(unused)]
      2. fn main() {
      3. let mut timeout_fut = sleep(Duration::from_millis(100));
      4. loop {
      5. select! {
      6. ..,
      7. _ = timeout_fut => { println!(..); },
      8. }
      9. }
      10. }
    • 这仍然不起作用。根据编译器提示的错误,通过向 select! 中的 timeout_fut 添加 &mut 解决移动问题,然后使用 Box::pin

      1. #![allow(unused)]
      2. fn main() {
      3. let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      4. loop {
      5. select! {
      6. ..,
      7. _ = &mut timeout_fut => { println!(..); },
      8. }
      9. }
      10. }
    • 可以编译这段代码了,但超时过期后,每次迭代都会变为 Poll::Ready(使用混合 Future 有助于解决此问题)。每次超时过期后,通过更新重置 timeout_fut

  • Box 在堆上进行分配。在某些情况下,也可以选择使用 std::pin::pin!(最近才正式发布,较旧的代码通常使用 tokio::pin!),但对于重新分配的 Future,使用此功能较为困难。

  • 另一种替代方案是完全不使用 pin,而是生成另一个任务,该任务每隔 100 毫秒就会发送到 oneshot 通道。

  • Data that contains pointers to itself is called self-referential. Normally, the Rust borrow checker would prevent self-referential data from being moved, as the references cannot outlive the data they point to. However, the code transformation for async blocks and functions is not verified by the borrow checker.

  • Pin is a wrapper around a reference. An object cannot be moved from its place using a pinned pointer. However, it can still be moved through an unpinned pointer.

  • The poll method of the Future trait uses Pin<&mut Self> instead of &mut Self to refer to the instance. That’s why it can only be called on a pinned pointer.