Async Channels

Several crates support asynchronous channels. For instance, tokio:

    use tokio::sync::mpsc;

    async fn ping_handler(mut input: mpsc::Receiver<()>) {
        let mut count: usize = 0;
        while let Some(_) = input.recv().await {
            count += 1;
            println!("Received {count} pings so far.");
        }
        println!("ping_handler complete");
    }

    #[tokio::main]
    async fn main() {
        let (sender, receiver) = mpsc::channel(32);
        let ping_handler_task = tokio::spawn(ping_handler(receiver));
        for i in 0..10 {
            sender.send(()).await.expect("Failed to send ping.");
            println!("Sent {} pings so far.", i + 1);
        }
        drop(sender);
        ping_handler_task.await.expect("Something went wrong in ping handler task.");
    }

This slide should take about 8 minutes.

  • Change the channel size to 3 and see how it affects the execution.

  • Overall, the interface is similar to the sync channels as seen in the morning class.

  • Try removing the drop(sender) call (drop here is std::mem::drop). What happens? Why?

  • The Flume crate has channels that implement both sync and async send and recv. This can be convenient for complex applications with both IO and heavy CPU processing tasks.

  • What makes async channels preferable is the ability to combine them with other futures to create complex control flow.