Solutions

Dining Philosophers — Async

  1. use std::sync::Arc;
  2. use tokio::sync::{mpsc, Mutex};
  3. use tokio::time;
  /// A fork on the table. It carries no data; philosophers share each fork
  /// through an `Arc<Mutex<Fork>>` and holding the lock means holding the fork.
  #[derive(Debug)]
  struct Fork;
  5. struct Philosopher {
  6.     name: String,
  7.     left_fork: Arc<Mutex<Fork>>,
  8.     right_fork: Arc<Mutex<Fork>>,
  9.     thoughts: mpsc::Sender<String>,
  10. }
  11. impl Philosopher {
  12.     async fn think(&self) {
  13.         self.thoughts
  14.             .send(format!("Eureka! {} has a new idea!", &self.name))
  15.             .await
  16.             .unwrap();
  17.     }
  18.     async fn eat(&self) {
  19.         // Keep trying until we have both forks
  20.         // Pick up forks...
  21.         let _left_fork = self.left_fork.lock().await;
  22.         let _right_fork = self.right_fork.lock().await;
  23.         println!("{} is eating...", &self.name);
  24.         time::sleep(time::Duration::from_millis(5)).await;
  25.         // The locks are dropped here
  26.     }
  27. }
  /// Names of the philosophers; `main` creates one philosopher and one fork
  /// per entry.
  static PHILOSOPHERS: &[&str] = &[
      "Socrates",
      "Hypatia",
      "Plato",
      "Aristotle",
      "Pythagoras",
  ];
  30. #[tokio::main]
  31. async fn main() {
  32.     // Create forks
  33.     let mut forks = vec![];
  34.     (0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));
  35.     // Create philosophers
  36.     let (philosophers, mut rx) = {
  37.         let mut philosophers = vec![];
  38.         let (tx, rx) = mpsc::channel(10);
  39.         for (i, name) in PHILOSOPHERS.iter().enumerate() {
  40.             let mut left_fork = Arc::clone(&forks[i]);
  41.             let mut right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);
  42.             if i == PHILOSOPHERS.len() - 1 {
  43.                 std::mem::swap(&mut left_fork, &mut right_fork);
  44.             }
  45.             philosophers.push(Philosopher {
  46.                 name: name.to_string(),
  47.                 left_fork,
  48.                 right_fork,
  49.                 thoughts: tx.clone(),
  50.             });
  51.         }
  52.         (philosophers, rx)
  53.         // tx is dropped here, so we don't need to explicitly drop it later
  54.     };
  55.     // Make them think and eat
  56.     for phil in philosophers {
  57.         tokio::spawn(async move {
  58.             for _ in 0..100 {
  59.                 phil.think().await;
  60.                 phil.eat().await;
  61.             }
  62.         });
  63.     }
  64.     // Output their thoughts
  65.     while let Some(thought) = rx.recv().await {
  66.         println!("Here is a thought: {thought}");
  67.     }
  68. }

Broadcast Chat Application

src/bin/server.rs:

  1. use futures_util::sink::SinkExt;
  2. use futures_util::stream::StreamExt;
  3. use std::error::Error;
  4. use std::net::SocketAddr;
  5. use tokio::net::{TcpListener, TcpStream};
  6. use tokio::sync::broadcast::{channel, Sender};
  7. use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
  8. async fn handle_connection(
  9.     addr: SocketAddr,
  10.     mut ws_stream: WebSocketStream<TcpStream>,
  11.     bcast_tx: Sender<String>,
  12. ) -> Result<(), Box<dyn Error + Send + Sync>> {
  13.     ws_stream
  14.         .send(Message::text("Welcome to chat! Type a message".to_string()))
  15.         .await?;
  16.     let mut bcast_rx = bcast_tx.subscribe();
  17.     // A continuous loop for concurrently performing two tasks: (1) receiving
  18.     // messages from `ws_stream` and broadcasting them, and (2) receiving
  19.     // messages on `bcast_rx` and sending them to the client.
  20.     loop {
  21.         tokio::select! {
  22.             incoming = ws_stream.next() => {
  23.                 match incoming {
  24.                     Some(Ok(msg)) => {
  25.                         if let Some(text) = msg.as_text() {
  26.                             println!("From client {addr:?} {text:?}");
  27.                             bcast_tx.send(text.into())?;
  28.                         }
  29.                     }
  30.                     Some(Err(err)) => return Err(err.into()),
  31.                     None => return Ok(()),
  32.                 }
  33.             }
  34.             msg = bcast_rx.recv() => {
  35.                 ws_stream.send(Message::text(msg?)).await?;
  36.             }
  37.         }
  38.     }
  39. }
  40. #[tokio::main]
  41. async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
  42.     let (bcast_tx, _) = channel(16);
  43.     let listener = TcpListener::bind("127.0.0.1:2000").await?;
  44.     println!("listening on port 2000");
  45.     loop {
  46.         let (socket, addr) = listener.accept().await?;
  47.         println!("New connection from {addr:?}");
  48.         let bcast_tx = bcast_tx.clone();
  49.         tokio::spawn(async move {
  50.             // Wrap the raw TCP stream into a websocket.
  51.             let ws_stream = ServerBuilder::new().accept(socket).await?;
  52.             handle_connection(addr, ws_stream, bcast_tx).await
  53.         });
  54.     }
  55. }

src/bin/client.rs:

  1. use futures_util::stream::StreamExt;
  2. use futures_util::SinkExt;
  3. use http::Uri;
  4. use tokio::io::{AsyncBufReadExt, BufReader};
  5. use tokio_websockets::{ClientBuilder, Message};
  6. #[tokio::main]
  7. async fn main() -> Result<(), tokio_websockets::Error> {
  8.     let (mut ws_stream, _) =
  9.         ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
  10.             .connect()
  11.             .await?;
  12.     let stdin = tokio::io::stdin();
  13.     let mut stdin = BufReader::new(stdin).lines();
  14.     // Continuous loop for concurrently sending and receiving messages.
  15.     loop {
  16.         tokio::select! {
  17.             incoming = ws_stream.next() => {
  18.                 match incoming {
  19.                     Some(Ok(msg)) => {
  20.                         if let Some(text) = msg.as_text() {
  21.                             println!("From server: {}", text);
  22.                         }
  23.                     },
  24.                     Some(Err(err)) => return Err(err.into()),
  25.                     None => return Ok(()),
  26.                 }
  27.             }
  28.             res = stdin.next_line() => {
  29.                 match res {
  30.                     Ok(None) => return Ok(()),
  31.                     Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,
  32.                     Err(err) => return Err(err.into()),
  33.                 }
  34.             }
  35.         }
  36.     }
  37. }