Broadcast Chat Application

In this exercise, we use our new knowledge to implement a broadcast chat application. Clients connect to a chat server and publish their messages to it. Each client reads user messages from standard input and sends them to the server. The chat server broadcasts each message it receives to all clients.

For this, we use a Tokio broadcast channel on the server, and tokio_websockets for the communication between the client and the server.

Create a new Cargo project and add the following dependencies:

Cargo.toml:

    [package]
    name = "chat-async"
    version = "0.1.0"
    edition = "2021"

    [dependencies]
    futures-util = { version = "0.3.31", features = ["sink"] }
    http = "1.1.0"
    tokio = { version = "1.41.0", features = ["full"] }
    tokio-websockets = { version = "0.10.1", features = ["client", "fastrand", "server", "sha1_smol"] }

The required APIs

You are going to need the following functions from tokio, tokio_websockets, and futures_util. Spend a few minutes familiarizing yourself with the API.

  • StreamExt::next() implemented by WebSocketStream: for asynchronously reading messages from a WebSocket stream.
  • SinkExt::send() implemented by WebSocketStream: for asynchronously sending messages on a WebSocket stream.
  • Lines::next_line(): for asynchronously reading user messages from the standard input.
  • Sender::subscribe(): for subscribing to a broadcast channel.

Two binaries

By default, a Cargo project has a single binary built from src/main.rs. In this project, we need two binaries: one for the client and one for the server. You could make them two separate Cargo projects, but we are going to put them in a single Cargo project with two binaries. For this to work, the client and the server code should go under src/bin (see the documentation).

Copy the following server and client code into src/bin/server.rs and src/bin/client.rs, respectively. Your task is to complete these files as described below.

src/bin/server.rs:

    use futures_util::sink::SinkExt;
    use futures_util::stream::StreamExt;
    use std::error::Error;
    use std::net::SocketAddr;
    use tokio::net::{TcpListener, TcpStream};
    use tokio::sync::broadcast::{channel, Sender};
    use tokio_websockets::{Message, ServerBuilder, WebSocketStream};

    async fn handle_connection(
        addr: SocketAddr,
        mut ws_stream: WebSocketStream<TcpStream>,
        bcast_tx: Sender<String>,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        // TODO: For a hint, see the description of the task below.
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
        let (bcast_tx, _) = channel(16);

        let listener = TcpListener::bind("127.0.0.1:2000").await?;
        println!("listening on port 2000");

        loop {
            let (socket, addr) = listener.accept().await?;
            println!("New connection from {addr:?}");
            // Each connection gets its own handle to the broadcast channel.
            let bcast_tx = bcast_tx.clone();
            tokio::spawn(async move {
                // Wrap the raw TCP stream into a websocket.
                let ws_stream = ServerBuilder::new().accept(socket).await?;

                handle_connection(addr, ws_stream, bcast_tx).await
            });
        }
    }

src/bin/client.rs:

    use futures_util::stream::StreamExt;
    use futures_util::SinkExt;
    use http::Uri;
    use tokio::io::{AsyncBufReadExt, BufReader};
    use tokio_websockets::{ClientBuilder, Message};

    #[tokio::main]
    async fn main() -> Result<(), tokio_websockets::Error> {
        let (mut ws_stream, _) =
            ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
                .connect()
                .await?;

        let stdin = tokio::io::stdin();
        let mut stdin = BufReader::new(stdin).lines();

        // TODO: For a hint, see the description of the task below.
    }

Running the binaries

Run the server with:

    cargo run --bin server

and the client with:

    cargo run --bin client

Tasks

  • Implement the handle_connection function in src/bin/server.rs.
    • Hint: Use tokio::select! for concurrently performing two tasks in a continuous loop. One task receives messages from the client and broadcasts them. The other sends messages received by the server to the client.
  • Complete the main function in src/bin/client.rs.
    • Hint: As before, use tokio::select! in a continuous loop for concurrently performing two tasks: (1) reading user messages from standard input and sending them to the server, and (2) receiving messages from the server, and displaying them for the user.
  • Optional: Once you are done, change the code to broadcast messages to all clients except the sender of the message.