Pin

Async blocks and functions return types implementing the Future trait. The type returned is the result of a compiler transformation which turns local variables into data stored inside the future.

Some of those variables can hold pointers to other local variables. Because of that, the future should never be moved to a different memory location, as it would invalidate those pointers.
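The hazard can be sketched with a hand-written struct standing in for the compiler-generated future state. The `SelfRef` type and its `ptr` field are hypothetical and exist only for illustration; the pointer is only compared, never dereferenced, so no `unsafe` is needed.

```rust
// Hypothetical stand-in for compiler-generated future state: `ptr` is meant
// to point at the `value` field of the same instance.
struct SelfRef {
    value: u32,
    ptr: *const u32,
}

impl SelfRef {
    // True while `ptr` still points at this instance's own `value` field.
    fn is_consistent(&self) -> bool {
        std::ptr::eq(self.ptr, &self.value)
    }
}

// Returns whether the internal pointer is consistent before and after a move.
fn consistency_before_and_after_move() -> (bool, bool) {
    let mut on_stack = SelfRef { value: 7, ptr: std::ptr::null() };
    on_stack.ptr = &on_stack.value;
    let before = on_stack.is_consistent();

    // Moving the struct to the heap copies its bytes, including the old
    // stack address stored in `ptr`, which now points at the old location.
    let on_heap = Box::new(on_stack);
    let after = on_heap.is_consistent();

    (before, after)
}

fn main() {
    let (before, after) = consistency_before_and_after_move();
    println!("consistent before move: {before}, after move: {after}");
}
```

After the move, the stored address no longer matches the field it was meant to track. Dereferencing it at that point would be undefined behavior, which is the situation pinning is meant to rule out.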

To prevent the future from being moved in memory, it can only be polled through a pinned pointer. Pin is a wrapper around a pointer (such as &mut T or Box<T>) that disallows all operations that would move the instance it points to into a different memory location.
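As a minimal sketch of what pinning guarantees, the example below pins a value on the heap with `Box::pin` and checks that its address stays the same when the pinned handle itself is moved. The `Pinned` type and the helper functions are hypothetical; `PhantomPinned` is used so the type is not `Unpin`, as is the case for compiler-generated futures.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical type that opts out of `Unpin`, like compiler-generated futures.
struct Pinned {
    value: u32,
    _pin: PhantomPinned,
}

// Address of the value behind a pinned pointer.
fn address_of(p: &Pin<Box<Pinned>>) -> usize {
    &**p as *const Pinned as usize
}

// Moves the pinned handle and reports whether the pointee stayed in place.
fn pointee_address_survives_handle_move() -> bool {
    let first = Box::pin(Pinned { value: 1, _pin: PhantomPinned });
    let before = address_of(&first);

    // Moving the `Pin<Box<_>>` moves only the pointer, not the pointee.
    let second = first;
    let after = address_of(&second);

    // Shared access through the pin still works. Obtaining `&mut Pinned`
    // (for example to call `std::mem::swap`) would not compile, because
    // `Pinned` is not `Unpin`.
    before == after && second.value == 1
}

fn main() {
    assert!(pointee_address_survives_handle_move());
    println!("pointee stayed at the same address");
}
```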

    use tokio::sync::{mpsc, oneshot};
    use tokio::task::spawn;
    use tokio::time::{sleep, Duration};

    // A work item. In this case, just sleep for the given time and respond
    // with a message on the `respond_on` channel.
    #[derive(Debug)]
    struct Work {
        input: u32,
        respond_on: oneshot::Sender<u32>,
    }

    // A worker which listens for work on a queue and performs it.
    async fn worker(mut work_queue: mpsc::Receiver<Work>) {
        let mut iterations = 0;
        loop {
            tokio::select! {
                Some(work) = work_queue.recv() => {
                    sleep(Duration::from_millis(10)).await; // Pretend to work.
                    work.respond_on
                        .send(work.input * 1000)
                        .expect("failed to send response");
                    iterations += 1;
                }
                // TODO: report number of iterations every 100ms
            }
        }
    }

    // A requester which requests work and waits for it to complete.
    async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
        let (tx, rx) = oneshot::channel();
        work_queue
            .send(Work { input, respond_on: tx })
            .await
            .expect("failed to send on work queue");
        rx.await.expect("failed waiting for response")
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(10);
        spawn(worker(rx));
        for i in 0..100 {
            let resp = do_work(&tx, i).await;
            println!("work result for iteration {i}: {resp}");
        }
    }

This slide should take about 20 minutes.

  • You may recognize this as an example of the actor pattern. Actors typically call select! in a loop.

  • This serves as a summation of a few of the previous lessons, so take your time with it.

    • Naively add a _ = sleep(Duration::from_millis(100)) => { println!(..) } to the select!. This will never execute. Why?

    • Instead, add a timeout_fut containing that future outside of the loop:

          let timeout_fut = sleep(Duration::from_millis(100));
          loop {
              select! {
                  ..,
                  _ = timeout_fut => { println!(..); },
              }
          }

    • This still doesn’t work. Follow the compiler errors, adding &mut to the timeout_fut in the select! to work around the move, then using Box::pin:

          let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
          loop {
              select! {
                  ..,
                  _ = &mut timeout_fut => { println!(..); },
              }
          }

    • This compiles, but once the timeout expires it is Poll::Ready on every iteration (a fused future would help with this). Update to reset timeout_fut every time it expires:

          let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
          loop {
              select! {
                  _ = &mut timeout_fut => {
                      println!(..);
                      timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
                  },
              }
          }

  • Box allocates on the heap. In some cases, std::pin::pin! (stabilized in Rust 1.68; older code often uses tokio::pin!) is also an option, but it is difficult to use for a future that is reassigned.

  • Another alternative is to not use pin at all and instead spawn another task that sends on a channel every 100ms. This needs an mpsc channel, since a oneshot channel can carry only a single message.

  • Data that contains pointers to itself is called self-referential. Normally, the Rust borrow checker would prevent self-referential data from being moved, as the references cannot outlive the data they point to. However, the code transformation for async blocks and functions is not verified by the borrow checker.

  • Pin is a wrapper around a pointer, such as a reference or a Box. An object cannot be moved from its place using a pinned pointer. However, it can still be moved through an unpinned pointer.

  • The poll method of the Future trait uses Pin<&mut Self> instead of &mut Self to refer to the instance. That’s why it can only be called on a pinned pointer.
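To make the last point concrete, here is a minimal sketch of a hand-written future polled manually through a pinned pointer, using only the standard library. The `ReadyOnSecondPoll` future, the `NoopWaker` type, and the `poll_twice` helper are hypothetical names introduced for illustration; a real executor such as Tokio does this polling for you.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future that is pending on its first poll and ready on its second.
struct ReadyOnSecondPoll {
    polled_once: bool,
}

impl Future for ReadyOnSecondPoll {
    type Output = u32;

    // The receiver is `Pin<&mut Self>`, not `&mut Self`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled_once {
            Poll::Ready(42)
        } else {
            // Mutating through the pin is allowed here only because this
            // struct is `Unpin` (it holds nothing but a `bool`).
            self.polled_once = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A waker that does nothing, standing in for an executor's wake-up logic.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls the future twice by hand and returns both results.
fn poll_twice() -> (Poll<u32>, Poll<u32>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // `Box::pin` yields `Pin<Box<_>>`; `as_mut` reborrows it as `Pin<&mut _>`,
    // which is the receiver type `poll` requires.
    let mut fut = Box::pin(ReadyOnSecondPoll { polled_once: false });
    let first = fut.as_mut().poll(&mut cx);
    let second = fut.as_mut().poll(&mut cx);
    (first, second)
}

fn main() {
    let (first, second) = poll_twice();
    println!("first poll: {first:?}, second poll: {second:?}");
}
```

Calling `poll` directly on an unpinned `ReadyOnSecondPoll` value would not compile, because no method with a plain `&mut self` receiver exists on the trait.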