Async in depth
At this point, we have completed a fairly comprehensive tour of asynchronous Rust and Tokio. Now we will dig deeper into Rust’s asynchronous runtime model. At the very beginning of the tutorial, we hinted that asynchronous Rust takes a unique approach. Now, we explain what that means.
Futures
As a quick review, let’s take a very basic asynchronous function. This is nothing new compared to what the tutorial has covered so far.
use tokio::net::TcpStream;
async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}
We call the function and it returns some value. We call .await on that value.
#[tokio::main]
async fn main() {
let what_is_this = my_async_fn();
// Nothing has been printed yet.
what_is_this.await;
// Text has been printed and socket has been
// established and closed.
}
The value returned by my_async_fn() is a future. A future is a value that implements the std::future::Future trait provided by the standard library. Futures are values that contain the in-progress asynchronous computation.
The std::future::Future trait definition is:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}
The associated type Output is the type that the future produces once it completes. The Pin type is how Rust is able to support borrows in async functions. See the standard library documentation for more details.
Unlike futures in other languages, a Rust future does not represent a computation happening in the background. Rather, the Rust future is the computation itself. The owner of the future is responsible for advancing the computation by polling the future. This is done by calling Future::poll.
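To make the laziness concrete, here is a minimal sketch that uses only the standard library. The NoopWake type and the lazy_demo function are hypothetical names introduced for illustration; the waker does nothing because this future never needs to be woken.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Illustrative waker that ignores wake notifications.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the body had run (before polling, after polling).
fn lazy_demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = ran.clone();

    // Creating the future does not execute any of its body.
    let fut = async move {
        flag.store(true, Ordering::SeqCst);
    };
    let before = ran.load(Ordering::SeqCst);

    // The owner advances the computation by calling `poll`.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Ready(())));

    (before, ran.load(Ordering::SeqCst))
}
```

Calling lazy_demo() should return (false, true): the flag is only set once the future has been polled.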
Implementing Future
Let’s implement a very simple future. This future will:
- Wait until a specific instant in time.
- Output some text to STDOUT.
- Yield a string.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}
Async fn as a Future
In the main function, we instantiate the future and call .await on it. From async functions, we may call .await on any value that implements Future. In turn, calling an async function returns an anonymous type that implements Future. In the case of async fn main(), the generated future is roughly:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}
Rust futures are state machines. Here, MainFuture is represented as an enum of the future’s possible states. The future starts in the State0 state. When poll is invoked, the future attempts to advance its internal state as much as possible. If the future is able to complete, Poll::Ready is returned containing the output of the asynchronous computation.
If the future is not able to complete, usually due to resources it is waiting on not being ready, then Poll::Pending is returned. Receiving Poll::Pending indicates to the caller that the future will complete at a later time and the caller should invoke poll again later.
We also see that futures are composed of other futures. Calling poll on the outer future results in calling the inner future’s poll function.
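The following sketch counts how many times an outer future has to be polled before it completes. It uses only the standard library; CountDown, NoopWake, outer, and count_polls are hypothetical names for illustration, and the driver is a plain busy loop rather than a real executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Illustrative waker that ignores wake notifications.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Leaf future that returns `Pending` a fixed number of times.
struct CountDown(u32);

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            // Ask to be polled again before reporting `Pending`.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Outer future composed of two inner futures.
async fn outer() -> &'static str {
    CountDown(2).await;
    CountDown(1).await
}

/// Polls `outer()` in a loop and reports how many polls were needed.
fn count_polls() -> (u32, &'static str) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(outer());
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (polls, out);
        }
    }
}
```

Each Pending returned by an inner CountDown propagates out of the outer future, and the next poll resumes at the same .await point. With the values above, count_polls() returns (4, "done").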
Executors
Asynchronous Rust functions return futures. Futures must have poll called on them to advance their state. Futures are composed of other futures. So, the question is, what calls poll on the outermost future?
Recall from earlier, to run asynchronous functions, they must either be passed to tokio::spawn or be the main function annotated with #[tokio::main]. This results in submitting the generated outer future to the Tokio executor. The executor is responsible for calling Future::poll on the outer future, driving the asynchronous computation to completion.
Mini Tokio
To better understand how this all fits together, let’s implement our own minimal version of Tokio! The full code can be found here.
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
tasks: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}
/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}
fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}
This runs the async block. A Delay instance is created with the requested delay and is awaited on. However, our implementation so far has a major flaw. Our executor never goes to sleep. The executor continuously loops over all spawned futures and polls them. Most of the time, the futures will not be ready to perform more work and will return Poll::Pending again. The process will burn CPU cycles and generally not be very efficient.
Ideally, we want mini-tokio to only poll futures when the future is able to make progress. This happens when a resource that the task is blocked on becomes ready to perform the requested operation. If the task wants to read data from a TCP socket, then we only want to poll the task when the TCP socket has received data. In our case, the task is blocked on the given Instant being reached. Ideally, mini-tokio would only poll the task once that instant in time has passed.
To achieve this, when a resource is polled, and the resource is not ready, the resource will send a notification once it transitions into a ready state.
Wakers
Wakers are the missing piece. This is the system by which a resource is able to notify the waiting task that the resource has become ready to continue some operation.
Let’s look at the Future::poll definition again:
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
The Context argument to poll has a waker() method. This method returns a Waker bound to the current task. The Waker has a wake() method. Calling this method signals to the executor that the associated task should be scheduled for execution. Resources call wake() when they transition to a ready state to notify the executor that polling the task will be able to make progress.
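As a sketch of the executor side of this handshake, the following single-future block_on parks its thread after a Pending result and is unparked when wake() is called. It uses only the standard library's std::task::Wake trait; ThreadWaker, block_on, and wake_demo are hypothetical names, and this is not how Tokio itself is implemented.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives one future to completion, sleeping between wake-ups.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some resource calls `wake()`.
            Poll::Pending => thread::park(),
        }
    }
}

/// Waits on a flag set by another thread; returns the number of polls.
fn wake_demo() -> u32 {
    let ready = Arc::new(AtomicBool::new(false));
    let mut polls = 0;
    let mut spawned = false;
    block_on(poll_fn(|cx| {
        polls += 1;
        if ready.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        if !spawned {
            spawned = true;
            // Hand a clone of the task's waker to the "resource" thread.
            let waker = cx.waker().clone();
            let ready = ready.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                ready.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }));
    polls
}
```

The future is polled at least twice: once before the flag is set, and again after the helper thread calls wake(). Spurious returns from thread::park only cause extra polls, which the future tolerates.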
Updating Delay
We can update Delay to use wakers:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
Now, once the requested duration has elapsed, the calling task is notified and the executor can ensure the task is scheduled again. The next step is to update mini-tokio to listen for wake notifications.
There are still a few remaining issues with our Delay implementation. We will fix them later.
When a future returns Poll::Pending, it must ensure that the waker is signalled at some point. Forgetting to do this results in the task hanging indefinitely.
Forgetting to wake a task after returning Poll::Pending is a common source of bugs.
Recall the first iteration of Delay. Here was the future implementation:
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
Before returning Poll::Pending, we called cx.waker().wake_by_ref(). This is to satisfy the future contract. By returning Poll::Pending, we are responsible for signalling the waker. Because we didn’t implement the timer thread yet, we signalled the waker inline. Doing so results in the future being immediately re-scheduled and executed again, at which point it will probably still not be ready to complete.
Notice that you are allowed to signal the waker more often than necessary. In this particular case, we signal the waker even though we are not ready to continue the operation at all. There is nothing wrong with this besides some wasted CPU cycles. However, this particular implementation will result in a busy loop.
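Signalling the waker inline is also a legitimate way to yield once and ask to be polled again. The sketch below, using only the standard library, counts wake notifications to show that the waker is signalled before Pending is returned. YieldOnce, CountingWake, and yield_demo are hypothetical names for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that counts how many times it has been signalled.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Future that returns `Pending` exactly once.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Honour the contract: signal the waker before returning `Pending`.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Returns (first poll pending?, wakes after first poll, second poll ready?).
fn yield_demo() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = YieldOnce { yielded: false };
    let first_pending = Pin::new(&mut fut).poll(&mut cx).is_pending();
    let wakes = counter.0.load(Ordering::SeqCst);
    let second_ready = Pin::new(&mut fut).poll(&mut cx).is_ready();
    (first_pending, wakes, second_ready)
}
```

Here yield_demo() returns (true, 1, true): one Pending, exactly one wake notification, then completion on the next poll.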
Updating Mini Tokio
The next step is updating Mini Tokio to receive waker notifications. We want the executor to only run tasks when they are woken, and to do this, Mini Tokio will provide its own waker. When the waker is invoked, its associated task is queued to be executed. Mini-Tokio passes this waker to the future when it polls the future.
The updated Mini Tokio will use a channel to store scheduled tasks. Channels allow tasks to be queued for execution from any thread. Wakers must be Send and Sync, so we use the channel from the crossbeam crate, as the standard library channel is not Sync.
The Send and Sync traits are marker traits related to concurrency provided by Rust. Types that can be sent to a different thread are Send. Most types are Send, but something like Rc is not. Types that can be concurrently accessed through immutable references are Sync. A type can be Send but not Sync; a good example is Cell, which can be modified through an immutable reference, and is thus not safe to access concurrently.
For more details, see the related chapter in the Rust book.
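One way to check these properties is to let the compiler verify them. The following sketch uses two hypothetical helper functions, assert_send and assert_sync, whose trait bounds only compile for types that implement the corresponding marker trait.

```rust
use std::cell::Cell;
use std::sync::{Arc, Mutex};

// These helpers do nothing at runtime; the trait bounds are checked at
// compile time.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn check_marker_traits() -> bool {
    // `Cell<i32>` can be moved to another thread...
    assert_send::<Cell<i32>>();
    // ...but cannot be shared between threads. Uncommenting the next line
    // is a compile error:
    // assert_sync::<Cell<i32>>();

    // Wrapping a `Send` value in a `Mutex` makes it `Sync`.
    assert_sync::<Mutex<Cell<i32>>>();
    assert_send::<Arc<Mutex<Cell<i32>>>>();

    // `Rc` is neither `Send` nor `Sync`. This would also fail to compile:
    // assert_send::<std::rc::Rc<i32>>();
    true
}
```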
Add the following dependency to your Cargo.toml to pull in channels.
crossbeam = "0.8"
Then, update the MiniTokio struct.
use crossbeam::channel;
use std::sync::Arc;
struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}
struct Task {
// This will be filled in soon.
}
Wakers are Sync and can be cloned. When wake is called, the task must be scheduled for execution. To implement this, we have a channel. When wake() is called on the waker, the task is pushed into the send half of the channel. Our Task structure will implement the wake logic. To do this, it needs to contain both the spawned future and the channel send half.
use std::sync::{Arc, Mutex};
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
// Sending only fails if the receiving half has been dropped.
let _ = self.executor.send(self.clone());
}
}
To schedule the task, the Arc is cloned and sent through the channel. Now, we need to hook our schedule function with std::task::Waker. The standard library provides a low-level API to do this using manual vtable construction. This strategy provides maximum flexibility to implementors, but requires a bunch of unsafe boilerplate code. Instead of using RawWakerVTable directly, we will use the ArcWake utility provided by the futures crate. This allows us to implement a simple trait to expose our Task struct as a waker.
Add the following dependency to your Cargo.toml to pull in futures.
futures = "0.3"
Then implement futures::task::ArcWake.
use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
When the timer thread above calls waker.wake(), the task is pushed into the channel. Next, we implement receiving and executing the tasks in the MiniTokio::run() function.
impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();
// Poll the future
let _ = future.as_mut().poll(&mut cx);
}
// Spawns a new task with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}
Multiple things are happening here. First, MiniTokio::run() is implemented. The function runs in a loop receiving scheduled tasks from the channel. As tasks are pushed into the channel when they are woken, these tasks are able to make progress when executed.
Additionally, the MiniTokio::new() and MiniTokio::spawn() functions are adjusted to use a channel rather than a VecDeque. When new tasks are spawned, they are given a clone of the sender-part of the channel, which the task can use to schedule itself on the runtime.
The Task::poll() function creates the waker using the ArcWake utility from the futures crate. The waker is used to create a task::Context. That task::Context is passed to poll.
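For comparison, the same design can be sketched with no external crates. This variant swaps in std::task::Wake for ArcWake and std::sync::mpsc for the crossbeam channel; MiniExecutor, BoxFuture, and run_demo are hypothetical names. Unlike the version above, run drops the executor's own sender so that the loop ends once every task has completed.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

struct Task {
    // `None` once the future has completed, so it is never polled again.
    future: Mutex<Option<BoxFuture>>,
    // Wrapped in a `Mutex` so that `Task` is `Sync` regardless of whether
    // the sender type itself is.
    queue: Mutex<mpsc::Sender<Arc<Task>>>,
}

impl Wake for Task {
    // Waking a task pushes it back onto the executor's queue.
    fn wake(self: Arc<Self>) {
        let sender = self.queue.lock().unwrap().clone();
        // Sending only fails if the executor is gone; ignore that case.
        let _ = sender.send(self);
    }
}

struct MiniExecutor {
    scheduled: mpsc::Receiver<Arc<Task>>,
    sender: mpsc::Sender<Arc<Task>>,
}

impl MiniExecutor {
    fn new() -> Self {
        let (sender, scheduled) = mpsc::channel();
        MiniExecutor { scheduled, sender }
    }

    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let future: BoxFuture = Box::pin(future);
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            queue: Mutex::new(self.sender.clone()),
        });
        let _ = self.sender.send(task);
    }

    /// Runs until every task has completed and no wakers remain.
    fn run(self) {
        let MiniExecutor { scheduled, sender } = self;
        // Drop our own sender so `recv` fails once all tasks are gone.
        drop(sender);
        while let Ok(task) = scheduled.recv() {
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            let mut slot = task.future.lock().unwrap();
            if let Some(fut) = slot.as_mut() {
                if fut.as_mut().poll(&mut cx).is_ready() {
                    *slot = None;
                }
            }
        }
    }
}

/// Spawns three tasks that each yield once, then record their id.
fn run_demo() -> Vec<u32> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let exec = MiniExecutor::new();
    for id in 0..3u32 {
        let log = log.clone();
        exec.spawn(async move {
            let mut yielded = false;
            poll_fn(|cx| {
                if yielded {
                    Poll::Ready(())
                } else {
                    yielded = true;
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            })
            .await;
            log.lock().unwrap().push(id);
        });
    }
    exec.run();
    let out = log.lock().unwrap().clone();
    out
}
```

Because everything runs on one thread and the queue is first-in, first-out, run_demo() returns the ids in spawn order: vec![0, 1, 2].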
Summary
We have now seen an end-to-end example of how asynchronous Rust works. Rust’s async/await feature is backed by traits. This allows third-party crates, like Tokio, to provide the execution details.
- Asynchronous Rust operations are lazy and require a caller to poll them.
- Wakers are passed to futures to link a future to the task calling it.
- When a resource is not ready to complete an operation, Poll::Pending is returned and the task’s waker is recorded.
- When the resource becomes ready, the task’s waker is notified.
- The executor receives the notification and schedules the task to execute.
- The task is polled again, this time the resource is ready and the task makes progress.
A few loose ends
Recall when we were implementing the Delay future, we said there were a few more things to fix. Rust’s asynchronous model allows a single future to migrate across tasks while it executes. Consider the following:
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use std::time::{Duration, Instant};
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});
Poll::Ready(())
}).await;
}
The poll_fn function creates a Future instance using a closure. The snippet above creates a Delay instance, polls it once, then sends the Delay instance to a new task where it is awaited. In this example, Delay::poll is called more than once with different Waker instances. When this happens, you must make sure to call wake on the Waker passed to the most recent call to poll.
When implementing a future, it is critical to assume that each call to poll could supply a different Waker instance. The poll function must update any previously recorded waker with the new one.
Our earlier implementation of Delay spawned a new thread every time it was polled. This is fine, but can be very inefficient if it is polled too often (e.g. if you select! over that future and some other future, both are polled whenever either has an event). One approach to this is to remember whether you have already spawned a thread, and only spawn a new thread if you haven’t already spawned one. However, if you do this, you must ensure that the thread’s Waker is updated on later calls to poll, as you are otherwise not waking the most recent Waker.
To fix our earlier implementation, we could do something like this:
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
// This is `Some` when we have spawned a thread, and `None` otherwise.
waker: Option<Arc<Mutex<Waker>>>,
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// First, if this is the first time the future is called, spawn the
// timer thread. If the timer thread is already running, ensure the
// stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();
// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());
// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}
// Once the waker is stored and the timer thread is started, it is
// time to check if the delay has completed. This is done by
// checking the current instant. If the duration has elapsed, then
// the future has completed and `Poll::Ready` is returned.
if Instant::now() >= self.when {
Poll::Ready(())
} else {
// The duration has not elapsed, the future has not completed so
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}
}
}
It is a bit involved, but the idea is, on each call to poll, the future checks if the supplied waker matches the previously recorded waker. If the two wakers match, then there is nothing else to do. If they do not match, then the recorded waker must be updated.
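The update rule can be isolated into a small helper. The sketch below uses only the standard library; Flag, register, and migrate_demo are hypothetical names, and the two wakers stand in for two different tasks polling the same future.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Waker that records whether it has been signalled.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Stores `current` unless the recorded waker already wakes the same task.
fn register(stored: &mut Option<Waker>, current: &Waker) {
    let up_to_date = stored.as_ref().map_or(false, |w| w.will_wake(current));
    if !up_to_date {
        *stored = Some(current.clone());
    }
}

/// Returns (first task woken?, second task woken?).
fn migrate_demo() -> (bool, bool) {
    let first = Arc::new(Flag(AtomicBool::new(false)));
    let second = Arc::new(Flag(AtomicBool::new(false)));
    let first_waker = Waker::from(first.clone());
    let second_waker = Waker::from(second.clone());

    let mut stored = None;
    // First poll happens on the first task.
    register(&mut stored, &first_waker);
    // The future then moves and is polled by the second task.
    register(&mut stored, &second_waker);

    // The resource becomes ready and signals the recorded waker.
    if let Some(waker) = stored.take() {
        waker.wake();
    }
    (
        first.0.load(Ordering::SeqCst),
        second.0.load(Ordering::SeqCst),
    )
}
```

Only the most recently registered waker is signalled, so migrate_demo() returns (false, true). Note that will_wake is a best-effort comparison: a false result only costs an extra clone.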
Notify utility
We demonstrated how a Delay future could be implemented by hand using wakers. Wakers are the foundation of how asynchronous Rust works. Usually, it is not necessary to drop down to that level. For example, in the case of Delay, we could implement it entirely with async/await by using the tokio::sync::Notify utility. This utility provides a basic task notification mechanism. It handles the details of wakers, including making sure that the recorded waker matches the current task.
Using Notify, we can implement a delay function using async/await like this:
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify2.notify_one();
});
notify.notified().await;
}