tokio::select!
The tokio::select! macro allows waiting on multiple async computations and returns when a single computation completes.
For example:
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}
Two oneshot channels are used. Either channel could complete first. The select! statement awaits both channels and binds val to the result of whichever receiver completes first. When either tx1 or tx2 sends its value, the associated block is executed.
The branch that does not complete is dropped. In the example, the computation is awaiting the oneshot::Receiver for each channel. The oneshot::Receiver for the channel that has not yet completed is dropped.
Cancellation
With asynchronous Rust, cancellation is performed by dropping a future. Recall from “Async in depth” that async Rust operations are implemented using futures, and futures are lazy. An operation only proceeds when its future is polled. If the future is dropped, the operation cannot proceed because all associated state has been dropped.
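Both claims (futures are lazy, and dropping a future stops it) can be checked with the standard library alone. The following is a minimal sketch, not Tokio code: NoopWake, poll_once, steps_when_never_polled and steps_after_cancel are hypothetical names introduced for illustration, and the future is polled by hand instead of by a runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: poll a future exactly once, then drop it.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let result = fut.as_mut().poll(&mut cx);
    result
    // `fut` goes out of scope here, which drops (cancels) the future.
}

// A future that is created and dropped without being polled never runs.
fn steps_when_never_polled() -> usize {
    let steps = Arc::new(AtomicUsize::new(0));
    let counter = steps.clone();
    let fut = async move {
        counter.fetch_add(1, Ordering::SeqCst);
    };
    drop(fut);
    steps.load(Ordering::SeqCst)
}

// A future that is polled once and then dropped stops at its await point.
fn steps_after_cancel() -> usize {
    let steps = Arc::new(AtomicUsize::new(0));
    let counter = steps.clone();
    let fut = async move {
        counter.fetch_add(1, Ordering::SeqCst); // runs on the first poll
        std::future::pending::<()>().await; // never completes
        counter.fetch_add(1, Ordering::SeqCst); // not reached before the drop
    };
    let result = poll_once(fut);
    assert!(result.is_pending());
    steps.load(Ordering::SeqCst)
}
```

Calling steps_when_never_polled() from a main function yields 0, and steps_after_cancel() yields 1: the body ran up to its first await point and went no further once the future was dropped.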
That said, sometimes an asynchronous operation will spawn background tasks or start other operations that run in the background. In the example above, for instance, a task is spawned to send a message back. Usually, such a task performs some computation to generate the value.
Futures or other types can implement Drop to clean up background resources. Tokio’s oneshot::Receiver implements Drop by sending a closed notification to the Sender half. The sender half can receive this notification and abort the in-progress operation by dropping it.
use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Placeholder: compute the value here.
    String::from("one")
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Select on the operation and the oneshot's
        // `closed()` notification.
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                // `some_operation()` is canceled, the
                // task completes and `tx1` is dropped.
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}
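The Drop-based notification described above can be reduced to a shared flag. This is a minimal sketch using only the standard library and is not how Tokio implements its oneshot channel: MySender, MyReceiver, my_channel and is_closed are hypothetical names, and unlike Tokio's async closed(), this version is a synchronous check that does not wake a waiting task.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-ins for the two halves of a channel.
// Only the "closed" signal is modeled; no value is ever sent.
struct MySender {
    closed: Arc<AtomicBool>,
}

struct MyReceiver {
    closed: Arc<AtomicBool>,
}

// Create a connected sender/receiver pair sharing one flag.
fn my_channel() -> (MySender, MyReceiver) {
    let closed = Arc::new(AtomicBool::new(false));
    (
        MySender { closed: closed.clone() },
        MyReceiver { closed },
    )
}

impl Drop for MyReceiver {
    // Dropping the receiver tells the sender nobody is listening.
    fn drop(&mut self) {
        self.closed.store(true, Ordering::SeqCst);
    }
}

impl MySender {
    // The sender can check the flag and abandon its work early.
    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }
}
```

With this sketch, a producer would check is_closed() between units of work and return early once the receiver is gone. The Tokio example above achieves the same effect without checking a flag, by selecting on closed().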
The Future implementation
To better understand how select! works, let's look at what a hypothetical Future implementation would look like. This is a simplified version. In practice, select! includes additional functionality like randomly selecting the branch to poll first.
use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // use tx1 and tx2

    MySelect {
        rx1,
        rx2,
    }.await;
}
The MySelect future contains the futures from each branch. When MySelect is polled, the first branch is polled. If it is ready, the value is used and MySelect completes. Otherwise the second branch is polled in the same way, and if neither branch is ready, MySelect returns Poll::Pending. After .await receives the output from a future, the future is dropped. This results in the futures for both branches being dropped. As one branch did not complete, its operation is effectively cancelled.
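The same polling order can be written generically and checked without a runtime. The sketch below is an illustration under stated assumptions, not the code select! expands to: Select2, Either, NeverReady, NoopWake and run_select are hypothetical names, the inner futures are required to be Unpin so that Pin::new can be used as in MySelect, and NeverReady is a test-only stub that deliberately ignores the waker.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Which branch finished first, carrying that branch's output.
enum Either<A, B> {
    First(A),
    Second(B),
}

// Generic two-branch version of `MySelect`.
struct Select2<A, B> {
    a: A,
    b: B,
}

impl<A, B> Future for Select2<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    type Output = Either<A::Output, B::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Always polls `a` first; the real macro picks the order randomly.
        if let Poll::Ready(v) = Pin::new(&mut self.a).poll(cx) {
            return Poll::Ready(Either::First(v));
        }
        if let Poll::Ready(v) = Pin::new(&mut self.b).poll(cx) {
            return Poll::Ready(Either::Second(v));
        }
        Poll::Pending
    }
}

// Test-only stub: never completes and records when it is dropped.
struct NeverReady {
    dropped: Arc<AtomicBool>,
}

impl Future for NeverReady {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

impl Drop for NeverReady {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns (winning value, loser dropped before, loser dropped after).
fn run_select() -> (i32, bool, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    let mut select = Select2 {
        a: std::future::ready(7),
        b: NeverReady { dropped: dropped.clone() },
    };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let out = Pin::new(&mut select).poll(&mut cx);
    let before = dropped.load(Ordering::SeqCst);
    drop(select); // what `.await` does once it has the output
    let after = dropped.load(Ordering::SeqCst);

    let value = match out {
        Poll::Ready(Either::First(v)) => v,
        _ => -1,
    };
    (value, before, after)
}
```

Here run_select() returns (7, false, true): the first branch wins, and the losing branch is only dropped when the whole Select2 value is dropped.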
Remember from the previous section:
When a future returns Poll::Pending, it must ensure the waker is signalled at some point in the future. Forgetting to do this results in the task hanging indefinitely.
There is no explicit usage of the Context argument in the MySelect implementation. Instead, the waker requirement is met by passing cx to the inner futures. Because each inner future must itself meet the waker requirement, and MySelect only returns Poll::Pending after receiving Poll::Pending from an inner future, MySelect also meets the waker requirement.
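Waker forwarding can be observed directly by counting wake-ups. The sketch below uses only the standard library and hypothetical names (CountingWake, YieldOnce, Forward, wakes_seen_by_task): the outer future never touches the waker itself, yet the wake-up issued by the inner future still reaches the waker that was supplied to the outer poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that counts how many times it was signalled.
struct CountingWake {
    wakes: AtomicUsize,
}

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

// Inner future: pending once, but signals the waker before returning
// `Poll::Pending`, so it meets the waker requirement.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("done")
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Outer future: only forwards `cx`, like `MySelect` does.
struct Forward<F> {
    inner: F,
}

impl<F: Future + Unpin> Future for Forward<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx)
    }
}

// Returns (wake-ups seen after the first poll, whether the polls
// went Pending and then Ready("done")).
fn wakes_seen_by_task() -> (usize, bool) {
    let counter = Arc::new(CountingWake { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = Forward { inner: YieldOnce { yielded: false } };
    let first = Pin::new(&mut fut).poll(&mut cx);
    let wakes = counter.wakes.load(Ordering::SeqCst);
    let second = Pin::new(&mut fut).poll(&mut cx);

    (wakes, first.is_pending() && second == Poll::Ready("done"))
}
```

In this sketch wakes_seen_by_task() returns (1, true). If Forward returned Poll::Pending on its own, without an inner future having been handed cx, no wake-up would be recorded and a real task would hang.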