A few loose ends
Recall when we were implementing the Delay future, we said there were a few more things to fix. Rust’s asynchronous model allows a single future to migrate across tasks while it executes. Consider the following:
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::time::{Duration, Instant};

// `Delay` is the future implemented earlier in this tutorial.

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let mut delay = Some(Delay { when });

    poll_fn(move |cx| {
        let mut delay = delay.take().unwrap();
        let res = Pin::new(&mut delay).poll(cx);
        assert!(res.is_pending());
        tokio::spawn(async move {
            delay.await;
        });

        Poll::Ready(())
    }).await;
}
The poll_fn function creates a Future instance using a closure. The snippet above creates a Delay instance, polls it once, then sends the Delay instance to a new task where it is awaited. In this example, Delay::poll is called more than once with different Waker instances. Our earlier implementation did not handle this case, and the spawned task would sleep forever because the wrong task is notified.
When implementing a future, it is critical to assume that each call to poll could supply a different Waker instance. The poll function must update any previously recorded waker with the new one.
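As a minimal, standalone sketch of that rule (separate from Delay and using only the standard library), the example below polls one future with two different wakers and then completes it. The names Shared, Flag, CountingWaker, and wake_counts are hypothetical and exist only for this illustration. The future overwrites its recorded waker on every call to poll, so only the most recent waker is notified.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical state shared between the future and whoever completes it.
#[derive(Default)]
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that completes once `Shared::done` is set.
struct Flag(Arc<Mutex<Shared>>);

impl Future for Flag {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.0.lock().unwrap();
        if shared.done {
            return Poll::Ready(());
        }
        // Overwrite whatever an earlier call recorded: the waker in this
        // `Context` may belong to a different task than last time.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Stand-in for a task: counts how many times it was woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls one `Flag` with two different wakers, completes it, and returns
/// how many times the first and the second waker were woken.
fn wake_counts() -> (usize, usize) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let mut flag = Flag(shared.clone());

    let first = Arc::new(CountingWaker::default());
    let second = Arc::new(CountingWaker::default());
    let first_waker = Waker::from(first.clone());
    let second_waker = Waker::from(second.clone());

    // Poll once per "task". Both polls return `Pending`.
    let mut cx = Context::from_waker(&first_waker);
    assert!(Pin::new(&mut flag).poll(&mut cx).is_pending());
    let mut cx = Context::from_waker(&second_waker);
    assert!(Pin::new(&mut flag).poll(&mut cx).is_pending());

    // Complete the operation, then wake whichever waker is recorded.
    let recorded = {
        let mut shared = shared.lock().unwrap();
        shared.done = true;
        shared.waker.take()
    };
    if let Some(waker) = recorded {
        waker.wake();
    }

    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

Calling wake_counts() returns (0, 1): the waker from the first poll is never invoked, and the waker from the second poll is invoked exactly once. A future that kept only the first waker would notify a task that no longer owns it.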
To fix our earlier implementation, we could do something like this:
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // First, if this is the first time the future is called, spawn the
        // timer thread. If the timer thread is already running, ensure the
        // stored `Waker` matches the current task's waker.
        if let Some(waker) = &self.waker {
            let mut waker = waker.lock().unwrap();

            // Check if the stored waker matches the current task's waker.
            // This is necessary as the `Delay` future instance may move to
            // a different task between calls to `poll`. If this happens, the
            // waker contained by the given `Context` will differ and we
            // must update our stored waker to reflect this change.
            if !waker.will_wake(cx.waker()) {
                *waker = cx.waker().clone();
            }
        } else {
            let when = self.when;
            let waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(waker.clone());

            // This is the first time `poll` is called, spawn the timer thread.
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                // The duration has elapsed. Notify the caller by invoking
                // the waker.
                let waker = waker.lock().unwrap();
                waker.wake_by_ref();
            });
        }

        // Once the waker is stored and the timer thread is started, it is
        // time to check if the delay has completed. This is done by
        // checking the current instant. If the duration has elapsed, then
        // the future has completed and `Poll::Ready` is returned.
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            // The duration has not elapsed, the future has not completed so
            // return `Poll::Pending`.
            //
            // The `Future` trait contract requires that when `Pending` is
            // returned, the future ensures that the given waker is signalled
            // once the future should be polled again. In our case, by
            // returning `Pending` here, we are promising that we will
            // invoke the given waker included in the `Context` argument
            // once the requested duration has elapsed. We ensure this by
            // spawning the timer thread above.
            //
            // If we forget to invoke the waker, the task will hang
            // indefinitely.
            Poll::Pending
        }
    }
}
It is a bit involved, but the idea is that, on each call to poll, the future checks whether the supplied waker matches the previously recorded waker. If the two wakers match, there is nothing else to do. If they do not match, the recorded waker must be updated.
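The comparison step can be sketched on its own with the standard library. In the example below, TaskHandle, new_task_waker, and update_waker are hypothetical names introduced for illustration; Waker::will_wake and the Wake trait are the real std::task APIs. Note that will_wake is documented as best effort: it may return false even when both wakers would wake the same task, in which case the sketch simply performs a harmless extra clone.

```rust
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical stand-in for a task handle. Waking it does nothing.
struct TaskHandle;

impl Wake for TaskHandle {
    fn wake(self: Arc<Self>) {}
}

/// Creates a waker backed by a fresh `TaskHandle`, i.e. a distinct "task".
fn new_task_waker() -> Waker {
    Waker::from(Arc::new(TaskHandle))
}

/// Mirrors the update step in `Delay::poll`: replace `stored` only when it
/// is not known to wake the same task as `current`. Returns `true` if the
/// stored waker was replaced.
fn update_waker(stored: &mut Waker, current: &Waker) -> bool {
    if stored.will_wake(current) {
        // Same task as before: nothing to do.
        false
    } else {
        // Different task (or `will_wake` could not tell): record the new one.
        *stored = current.clone();
        true
    }
}
```

For example, if stored starts out as a clone of one task's waker and update_waker is called with the waker of a second task, the function returns true and stored no longer reports that it will wake the first task.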
Notify utility
We demonstrated how a Delay future could be implemented by hand using wakers. Wakers are the foundation of how asynchronous Rust works. Usually, it is not necessary to drop down to that level. For example, in the case of Delay, we could implement it entirely with async/await by using the tokio::sync::Notify utility. This utility provides a basic task notification mechanism. It handles the details of wakers, including making sure that the recorded waker matches the current task.
Using Notify, we can implement a delay function using async/await like this:
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify2.notify_one();
    });

    notify.notified().await;
}