并发编程:下午练习

Dining Philosophers — Async

(返回练习)

  1. use std::sync::Arc;
  2. use tokio::sync::mpsc::{self, Sender};
  3. use tokio::sync::Mutex;
  4. use tokio::time;
  /// A fork on the table. It holds no data; it only exists to be guarded by a
  /// mutex so that one philosopher at a time can hold it.
  struct Fork;
  6. struct Philosopher {
  7. name: String,
  8. left_fork: Arc<Mutex<Fork>>,
  9. right_fork: Arc<Mutex<Fork>>,
  10. thoughts: Sender<String>,
  11. }
  12. impl Philosopher {
  13. async fn think(&self) {
  14. self.thoughts
  15. .send(format!("Eureka! {} has a new idea!", &self.name))
  16. .await
  17. .unwrap();
  18. }
  19. async fn eat(&self) {
  20. // Keep trying until we have both forks
  21. let (_left_fork, _right_fork) = loop {
  22. // Pick up forks...
  23. let left_fork = self.left_fork.try_lock();
  24. let right_fork = self.right_fork.try_lock();
  25. let Ok(left_fork) = left_fork else {
  26. // If we didn't get the left fork, drop the right fork if we
  27. // have it and let other tasks make progress.
  28. drop(right_fork);
  29. time::sleep(time::Duration::from_millis(1)).await;
  30. continue;
  31. };
  32. let Ok(right_fork) = right_fork else {
  33. // If we didn't get the right fork, drop the left fork and let
  34. // other tasks make progress.
  35. drop(left_fork);
  36. time::sleep(time::Duration::from_millis(1)).await;
  37. continue;
  38. };
  39. break (left_fork, right_fork);
  40. };
  41. println!("{} is eating...", &self.name);
  42. time::sleep(time::Duration::from_millis(5)).await;
  43. // The locks are dropped here
  44. }
  45. }
  /// Names of the philosophers; `main` creates one philosopher and one fork per
  /// entry.
  static PHILOSOPHERS: &[&str] = &[
      "Socrates",
      "Hypatia",
      "Plato",
      "Aristotle",
      "Pythagoras",
  ];
  48. #[tokio::main]
  49. async fn main() {
  50. // Create forks
  51. let mut forks = vec![];
  52. (0..PHILOSOPHERS.len()).for_each(|_| forks.push(Arc::new(Mutex::new(Fork))));
  53. // Create philosophers
  54. let (philosophers, mut rx) = {
  55. let mut philosophers = vec![];
  56. let (tx, rx) = mpsc::channel(10);
  57. for (i, name) in PHILOSOPHERS.iter().enumerate() {
  58. let left_fork = Arc::clone(&forks[i]);
  59. let right_fork = Arc::clone(&forks[(i + 1) % PHILOSOPHERS.len()]);
  60. philosophers.push(Philosopher {
  61. name: name.to_string(),
  62. left_fork,
  63. right_fork,
  64. thoughts: tx.clone(),
  65. });
  66. }
  67. (philosophers, rx)
  68. // tx is dropped here, so we don't need to explicitly drop it later
  69. };
  70. // Make them think and eat
  71. for phil in philosophers {
  72. tokio::spawn(async move {
  73. for _ in 0..100 {
  74. phil.think().await;
  75. phil.eat().await;
  76. }
  77. });
  78. }
  79. // Output their thoughts
  80. while let Some(thought) = rx.recv().await {
  81. println!("Here is a thought: {thought}");
  82. }
  83. }

广播聊天应用

(返回练习)

src/bin/server.rs:

  1. use futures_util::sink::SinkExt;
  2. use futures_util::stream::StreamExt;
  3. use std::error::Error;
  4. use std::net::SocketAddr;
  5. use tokio::net::{TcpListener, TcpStream};
  6. use tokio::sync::broadcast::{channel, Sender};
  7. use tokio_websockets::{Message, ServerBuilder, WebSocketStream};
  8. async fn handle_connection(
  9. addr: SocketAddr,
  10. mut ws_stream: WebSocketStream<TcpStream>,
  11. bcast_tx: Sender<String>,
  12. ) -> Result<(), Box<dyn Error + Send + Sync>> {
  13. ws_stream
  14. .send(Message::text("Welcome to chat! Type a message".to_string()))
  15. .await?;
  16. let mut bcast_rx = bcast_tx.subscribe();
  17. // A continuous loop for concurrently performing two tasks: (1) receiving
  18. // messages from `ws_stream` and broadcasting them, and (2) receiving
  19. // messages on `bcast_rx` and sending them to the client.
  20. loop {
  21. tokio::select! {
  22. incoming = ws_stream.next() => {
  23. match incoming {
  24. Some(Ok(msg)) => {
  25. if let Some(text) = msg.as_text() {
  26. println!("From client {addr:?} {text:?}");
  27. bcast_tx.send(text.into())?;
  28. }
  29. }
  30. Some(Err(err)) => return Err(err.into()),
  31. None => return Ok(()),
  32. }
  33. }
  34. msg = bcast_rx.recv() => {
  35. ws_stream.send(Message::text(msg?)).await?;
  36. }
  37. }
  38. }
  39. }
  40. #[tokio::main]
  41. async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
  42. let (bcast_tx, _) = channel(16);
  43. let listener = TcpListener::bind("127.0.0.1:2000").await?;
  44. println!("listening on port 2000");
  45. loop {
  46. let (socket, addr) = listener.accept().await?;
  47. println!("New connection from {addr:?}");
  48. let bcast_tx = bcast_tx.clone();
  49. tokio::spawn(async move {
  50. // Wrap the raw TCP stream into a websocket.
  51. let ws_stream = ServerBuilder::new().accept(socket).await?;
  52. handle_connection(addr, ws_stream, bcast_tx).await
  53. });
  54. }
  55. }

src/bin/client.rs:

  1. use futures_util::stream::StreamExt;
  2. use futures_util::SinkExt;
  3. use http::Uri;
  4. use tokio::io::{AsyncBufReadExt, BufReader};
  5. use tokio_websockets::{ClientBuilder, Message};
  6. #[tokio::main]
  7. async fn main() -> Result<(), tokio_websockets::Error> {
  8. let (mut ws_stream, _) =
  9. ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
  10. .connect()
  11. .await?;
  12. let stdin = tokio::io::stdin();
  13. let mut stdin = BufReader::new(stdin).lines();
  14. // Continuous loop for concurrently sending and receiving messages.
  15. loop {
  16. tokio::select! {
  17. incoming = ws_stream.next() => {
  18. match incoming {
  19. Some(Ok(msg)) => {
  20. if let Some(text) = msg.as_text() {
  21. println!("From server: {}", text);
  22. }
  23. },
  24. Some(Err(err)) => return Err(err.into()),
  25. None => return Ok(()),
  26. }
  27. }
  28. res = stdin.next_line() => {
  29. match res {
  30. Ok(None) => return Ok(()),
  31. Ok(Some(line)) => ws_stream.send(Message::text(line.to_string())).await?,
  32. Err(err) => return Err(err.into()),
  33. }
  34. }
  35. }
  36. }
  37. }