Example: A Chat Server
We’re going to use what has been covered so far to build a chat server. This is a non-trivial Tokio server application.
The server is going to use a line-based protocol. Lines are terminated by `\r\n`. This is compatible with telnet, so we will just use telnet for the client. When a client connects, it must identify itself by sending a line containing its “nick” (i.e., some name used to identify the client amongst its peers).
Once a client is identified, all sent lines are prefixed with `[nick]: ` and broadcast to all other connected clients.
The full code can be found here. Note that Tokio provides some additional abstractions that have not yet been covered that would enable the chat server to be written with less code.
Setup
First, generate a new crate.
$ cargo new --bin line-chat
$ cd line-chat
Next, add the necessary dependencies:
[dependencies]
tokio = "0.1"
tokio-io = "0.1"
futures = "0.1"
bytes = "0.4"
and bring the crates and types into scope in `main.rs`:
# #![deny(deprecated)]
extern crate tokio;
#[macro_use]
extern crate futures;
extern crate bytes;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use futures::sync::mpsc;
use futures::future::{self, Either};
use bytes::{BytesMut, Bytes, BufMut};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<Bytes>;
/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<Bytes>;
# fn main() {}
Now, we set up the necessary structure for a server. These are the same steps that were used as part of the Hello World! example:
- Bind a `TcpListener` to a local port.
- Define a task that accepts inbound connections and processes them.
- Start the Tokio runtime.
- Spawn the server task.
Again, no work actually happens until the server task is spawned on the executor.
# #![deny(deprecated)]
# extern crate tokio;
# extern crate futures;
#
# use tokio::prelude::*;
# use tokio::net::TcpListener;
fn main() {
let addr = "127.0.0.1:6142".parse().unwrap();
let listener = TcpListener::bind(&addr).unwrap();
let server = listener.incoming().for_each(move |socket| {
// TODO: Process socket
Ok(())
})
.map_err(|err| {
// Handle error by printing to STDOUT.
println!("accept error = {:?}", err);
});
println!("server running on localhost:6142");
# let server = server.select(futures::future::ok(())).then(|_| Ok(()));
// Start the server
//
// This does a few things:
//
// * Start the Tokio runtime (reactor, threadpool, etc...)
// * Spawns the `server` task onto the runtime.
// * Blocks the current thread until the runtime becomes idle, i.e. all
// spawned tasks have completed.
tokio::run(server);
}
Chat State
A chat server requires that messages received from one client are broadcast to all other connected clients. This will be done using message passing over `mpsc` channels.
Each client socket will be managed by a task. Each task will have an associated `mpsc` channel that is used to receive messages from other clients. The send halves of all these channels are stored in a map behind an `Arc<Mutex<_>>` so that every task can reach them, whichever worker thread it runs on.
In this example, we are going to be using unbounded channels. Ideally, channels should never be unbounded, but handling backpressure in this kind of situation is a bit tricky. We will leave bounding the channels to a later section dedicated to handling backpressure.
Here is how the shared state is defined (the `Tx` type alias was defined above):
# #![deny(deprecated)]
# use std::collections::HashMap;
# use std::net::SocketAddr;
# struct Tx;
struct Shared {
peers: HashMap<SocketAddr, Tx>,
}
Then, at the very top of the `main` function, the state instance is created. This state instance will be moved into the task that accepts incoming connections.
# #![deny(deprecated)]
# use std::sync::{Arc, Mutex};
# type Shared = String;
let state = Arc::new(Mutex::new(Shared::new()));
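The snippets above call `Shared::new()` without showing it. As a rough illustration of how such shared state could be built and used for broadcasting, here is a minimal sketch that relies only on the standard library. It swaps in `std::sync::mpsc` for `futures::sync::mpsc`, and the `register` and `broadcast` helpers are hypothetical names introduced for this sketch, not part of the tutorial's code.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

// Stand-ins for the tutorial's `Tx`/`Rx` aliases. These use std channels
// (an assumption made so the sketch needs no external crates).
type Tx = Sender<Vec<u8>>;
type Rx = Receiver<Vec<u8>>;

struct Shared {
    peers: HashMap<SocketAddr, Tx>,
}

impl Shared {
    /// Create an empty peer map (assumed shape of `Shared::new`).
    fn new() -> Self {
        Shared { peers: HashMap::new() }
    }
}

/// Hypothetical helper: register a peer and return the receive half
/// of its freshly created channel.
fn register(state: &Arc<Mutex<Shared>>, addr: SocketAddr) -> Rx {
    let (tx, rx) = channel();
    state.lock().unwrap().peers.insert(addr, tx);
    rx
}

/// Hypothetical helper: send `line` to every peer except `from`.
/// Returns the number of peers the line was delivered to.
fn broadcast(state: &Arc<Mutex<Shared>>, from: SocketAddr, line: &[u8]) -> usize {
    let state = state.lock().unwrap();
    let mut sent = 0;
    for (addr, tx) in &state.peers {
        // Skip the sender; count only sends whose receiver is still alive.
        if *addr != from && tx.send(line.to_vec()).is_ok() {
            sent += 1;
        }
    }
    sent
}
```

The `Arc` lets every task hold a handle to the same map, and the `Mutex` serializes access to it, which is the same arrangement the tutorial uses with its own channel types.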
Now we can process incoming connections. The server task is updated to this:
# #![deny(deprecated)]
# extern crate tokio;
# extern crate futures;
# use tokio::net::{TcpListener, TcpStream};
# use futures::prelude::*;
# fn dox() {
# let addr = "127.0.0.1:6142".parse().unwrap();
# let listener = TcpListener::bind(&addr).unwrap();
# fn process(_: TcpStream, _: String) {}
# let state = String::new();
listener.incoming().for_each(move |socket| {
process(socket, state.clone());
Ok(())
})
# ;
# }
# fn main() {}
The server task passes all sockets along with a clone of the server state to a `process` function. Let’s define that function. It will have a structure like this:
# #![deny(deprecated)]
# extern crate tokio;
# extern crate futures;
# use futures::future;
# use tokio::net::TcpStream;
# use std::sync::{Arc, Mutex};
# type Shared = String;
fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
// Define the task that processes the connection.
# /*
let task = unimplemented!();
# */ let task = future::ok(());
// Spawn the task
tokio::spawn(task);
}
# fn main() {}
The call to `tokio::spawn` will spawn a new task onto the current Tokio runtime. All the worker threads keep a reference to the current runtime stored in a thread-local variable. Note that attempting to call `tokio::spawn` from outside of the Tokio runtime will result in a panic.
All the connection processing logic has to be able to do is understand the protocol. The protocol is line-based, terminated by `\r\n`. Instead of working at the byte stream level, it is much easier to work at the frame level, i.e. working with values that represent atomic messages.
We implement a codec that holds the socket and exposes an API that yields and accepts lines.
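To make the difference between bytes and frames concrete, here is a minimal standard-library sketch of pulling one frame out of a byte buffer. The `take_line` function is a hypothetical helper written for this illustration, and it uses `Vec<u8>` where the tutorial uses `BytesMut`.

```rust
/// Remove and return the first complete line from `buf`, without its
/// trailing `\r\n`. Returns `None` if no full line has been buffered yet.
fn take_line(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    // Find the first occurrence of the two-byte terminator.
    let pos = buf.windows(2).position(|w| w == b"\r\n")?;
    // Take the line plus its terminator, leaving the remainder in `buf`.
    let mut line: Vec<u8> = buf.drain(..pos + 2).collect();
    // Drop the trailing `\r\n`.
    line.truncate(pos);
    Some(line)
}
```

Bytes that arrive after the terminator stay in the buffer, so a partially received line is simply kept until the rest of it shows up.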
Line Codec
A codec is a loose term for a type that takes a byte stream type (`AsyncRead + AsyncWrite`) and exposes a read and write API at the frame level. The `tokio-io` crate provides additional helpers for writing codecs; in this example, we are going to do it by hand.
The `Lines` codec is defined as such:
# #![deny(deprecated)]
# extern crate tokio;
# extern crate bytes;
# use tokio::net::TcpStream;
# use bytes::BytesMut;
struct Lines {
socket: TcpStream,
rd: BytesMut,
wr: BytesMut,
}
impl Lines {
/// Create a new `Lines` codec backed by the socket
fn new(socket: TcpStream) -> Self {
Lines {
socket,
rd: BytesMut::new(),
wr: BytesMut::new(),
}
}
}
# fn main() {}
Data read from the socket is buffered into `rd`. When a full line is read, it is returned to the caller. Lines submitted by the caller to write to the socket are buffered into `wr`, then flushed.
This is how the read half is implemented:
# #![deny(deprecated)]
# extern crate bytes;
# extern crate tokio;
# #[macro_use]
# extern crate futures;
# use bytes::BytesMut;
# use tokio::io;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# struct Lines {
# socket: TcpStream,
# rd: BytesMut,
# wr: BytesMut,
# }
impl Stream for Lines {
type Item = BytesMut;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
// First, read any new data that might have been received
// off the socket
//
// We track whether the socket is closed here; this is used
// to inform the return value below.
let sock_closed = self.fill_read_buf()?.is_ready();
// Now, try finding lines
let pos = self.rd.windows(2)
.position(|bytes| bytes == b"\r\n");
if let Some(pos) = pos {
// Remove the line from the read buffer and set it
// to `line`.
let mut line = self.rd.split_to(pos + 2);
// Drop the trailing \r\n
line.split_off(pos);
// Return the line
return Ok(Async::Ready(Some(line)));
}
if sock_closed {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
}
}
}
impl Lines {
fn fill_read_buf(&mut self) -> Result<Async<()>, io::Error> {
loop {
// Ensure the read buffer has capacity.
//
// This might result in an internal allocation.
self.rd.reserve(1024);
// Read data into the buffer.
//
// The `read_buf` fn is provided by `AsyncRead`.
let n = try_ready!(self.socket.read_buf(&mut self.rd));
if n == 0 {
return Ok(Async::Ready(()));
}
}
}
}
# fn main() {}
The example uses `BytesMut` from the `bytes` crate. This provides some nice utilities for working with byte sequences in a networking context. The `Stream` implementation yields `BytesMut` values which contain exactly one line.
As always, the key to implementing a function that returns `Async` is to never return `Async::NotReady` unless the function implementation received an `Async::NotReady` itself. In this example, `NotReady` is only returned if `fill_read_buf` returns `NotReady`, and `fill_read_buf` only returns `NotReady` if `TcpStream::read_buf` returns `NotReady`.
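The rule about `NotReady` can be illustrated without any external crates. The sketch below defines its own `Async` enum as a stand-in for the one in `futures` 0.1, plus a hypothetical `Scripted` source that replays a fixed list of poll results. The `Sum` type wraps that source and returns `NotReady` only when the inner poll did.

```rust
use std::collections::VecDeque;

/// Minimal stand-in for `futures::Async` (an assumption of this sketch).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical inner source that replays scripted poll results and
/// reports end of stream once the script runs out.
struct Scripted {
    results: VecDeque<Async<Option<u32>>>,
}

impl Scripted {
    fn poll(&mut self) -> Async<Option<u32>> {
        self.results.pop_front().unwrap_or(Async::Ready(None))
    }
}

/// Adds up every value produced by the inner source.
struct Sum {
    inner: Scripted,
    total: u32,
}

impl Sum {
    fn poll(&mut self) -> Async<u32> {
        loop {
            match self.inner.poll() {
                // Keep draining while the inner source has values.
                Async::Ready(Some(n)) => self.total += n,
                // End of stream: the sum is complete.
                Async::Ready(None) => return Async::Ready(self.total),
                // `NotReady` is passed on only because the inner source
                // returned it, never invented by this function.
                Async::NotReady => return Async::NotReady,
            }
        }
    }
}
```

In `futures` 0.1 the inner resource is what arranges for the task to be notified when it returns `NotReady`, which is why a wrapper that returned `NotReady` on its own initiative would risk never being polled again.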
Now, for the write half.
# #![deny(deprecated)]
# extern crate tokio;
# extern crate bytes;
# #[macro_use]
# extern crate futures;
# use tokio::io;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# use bytes::{BytesMut, BufMut};
struct Lines {
socket: TcpStream,
rd: BytesMut,
wr: BytesMut,
}
impl Lines {
fn buffer(&mut self, line: &[u8]) {
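// Ensure the write buffer has room for the line. With `bytes` 0.4,
// `put` does not grow the buffer and panics if capacity is short.
self.wr.reserve(line.len());
// Push the line onto the end of the write buffer.
//
// The `put` function is from the `BufMut` trait.
self.wr.put(line);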
}
fn poll_flush(&mut self) -> Poll<(), io::Error> {
// As long as there is buffered data to write, try to write it.
while !self.wr.is_empty() {
// Try to write some bytes to the socket
let n = try_ready!(self.socket.poll_write(&self.wr));
// As long as the wr is not empty, a successful write should
// never write 0 bytes.
assert!(n > 0);
// This discards the first `n` bytes of the buffer.
let _ = self.wr.split_to(n);
}
Ok(Async::Ready(()))
}
}
# fn main() {}
The caller queues up all lines by calling `buffer`. This appends the line to the internal `wr` buffer. Then, once all data is queued up, the caller calls `poll_flush`, which does the actual writing to the socket. `poll_flush` only returns `Ready` once all the queued data has been successfully written to the socket.
Similar to the read half, `NotReady` is only returned when the function implementation received `NotReady` itself.
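The flush loop has to cope with writes that accept only part of the buffer. Here is a minimal blocking sketch of that loop using only `std::io::Write`. The `Trickle` writer and the `flush_all` function are hypothetical names introduced for this illustration, and `Vec::drain` plays the role that `split_to` plays in the tutorial's code.

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most `limit` bytes per call,
/// imitating a socket that performs partial writes.
struct Trickle {
    out: Vec<u8>,
    limit: usize,
}

impl Write for Trickle {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.limit);
        self.out.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Write everything in `wr` to `sink`, discarding bytes as they are
/// accepted. Returns the number of `write` calls that were needed.
fn flush_all<W: Write>(wr: &mut Vec<u8>, sink: &mut W) -> io::Result<usize> {
    let mut calls = 0;
    while !wr.is_empty() {
        let n = sink.write(&wr[..])?;
        calls += 1;
        if n == 0 {
            // A zero-length write with data pending means no progress.
            return Err(io::Error::new(io::ErrorKind::WriteZero, "wrote 0 bytes"));
        }
        // Discard the first `n` bytes, which the sink has accepted.
        wr.drain(..n);
    }
    Ok(calls)
}
```

The asynchronous version differs mainly in that a write which cannot proceed returns `NotReady` instead of blocking, leaving the unwritten bytes in the buffer for the next poll.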
And the `Lines` codec is used in the `process` function as such:
# #![deny(deprecated)]
# extern crate tokio;
# extern crate bytes;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# use bytes::BytesMut;
# use std::io;
# use std::sync::{Arc, Mutex};
# type Shared = String;
# struct Lines;
# impl Lines {
# fn new(_: TcpStream) -> Self { unimplemented!() }
# }
# impl Stream for Lines {
# type Item = BytesMut;
# type Error = io::Error;
# fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { unimplemented!() }
# }
fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
// Wrap the socket with the `Lines` codec that we wrote above.
let lines = Lines::new(socket);
// The first line is treated as the client's name. The client
// is not added to the set of connected peers until this line
// is received.
//
// We use the `into_future` combinator to extract the first
// item from the lines stream. `into_future` takes a `Stream`
// and converts it to a future of `(first, rest)` where `rest`
// is the original stream instance.
let connection = lines.into_future()
// `into_future` doesn't have the right error type, so map
// the error to make it work.
.map_err(|(e, _)| e)
// Process the first received line as the client's name.
.and_then(|(name, lines)| {
let name = match name {
Some(name) => name,
None => {
// TODO: Handle a client that disconnects
// early.
unimplemented!();
}
};
// TODO: Rest of the process function
# Ok(())
});
}
# fn main() {}
Broadcasting Messages
The next step is to implement the connection processing logic that handles the actual chat functionality, i.e. broadcasting messages from one client to all the others.
To implement this, we will explicitly implement a `Future` that takes the `Lines` codec instance and handles the broadcasting logic. This logic does two things:
- Receive messages on its message channel and write them to the socket.
- Receive messages from the socket and broadcast them to all peers.
Implementing this logic entirely with combinators is also possible, but requires using `split`, which hasn’t been covered yet. Also, this provides an opportunity to see how to implement a non-trivial `Future` by hand.
Here is the definition of the future that processes the broadcast logic for a connection:
# use std::net::SocketAddr;
# use std::sync::{Arc, Mutex};
# type BytesMut = ();
# type Lines = ();
# type Shared = ();
# type Rx = ();
struct Peer {
/// Name of the peer. This is the first line received from the client.
name: BytesMut,
/// The TCP socket wrapped with the `Lines` codec.
lines: Lines,
/// Handle to the shared chat state.
state: Arc<Mutex<Shared>>,
/// Receive half of the message channel.
///
/// This is used to receive messages from peers. When a message is received
/// off of this `Rx`, it will be written to the socket.
rx: Rx,
/// Client socket address.
///
/// The socket address is used as the key in the `peers` HashMap. The
/// address is saved so that the `Peer` drop implementation can clean up its
/// entry.
addr: SocketAddr,
}
# fn main() {}
And a `Peer` instance is created as such:
# extern crate bytes;
# extern crate futures;
# extern crate tokio;
# use bytes::{BytesMut, Bytes};
# use futures::sync::mpsc;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# use std::net::SocketAddr;
# use std::collections::HashMap;
# use std::sync::{Arc, Mutex};
# struct Peer {
# name: BytesMut,
# lines: Lines,
# state: Arc<Mutex<Shared>>,
# rx: Rx,
# addr: SocketAddr,
# }
# struct Shared {
# peers: HashMap<SocketAddr, Tx>,
# }
# struct Lines {
# socket: TcpStream,
# }
# type Tx = mpsc::UnboundedSender<Bytes>;
# type Rx = mpsc::UnboundedReceiver<Bytes>;
impl Peer {
fn new(name: BytesMut,
state: Arc<Mutex<Shared>>,
lines: Lines) -> Peer
{
// Get the client socket address
let addr = lines.socket.peer_addr().unwrap();
// Create a channel for this peer
let (tx, rx) = mpsc::unbounded();
// Add an entry for this `Peer` in the shared state map.
state.lock().unwrap()
.peers.insert(addr, tx);
Peer {
name,
lines,
state,
rx,
addr,
}
}
}
# fn main() {}
An `mpsc` channel is created for other peers to send their messages to this newly created peer. After creating the channel, the transmit half is inserted into the peers map. This entry is removed in the drop implementation for `Peer`.
# use std::net::SocketAddr;
# use std::collections::HashMap;
# use std::sync::{Arc, Mutex};
# struct Peer {
# state: Arc<Mutex<Shared>>,
# addr: SocketAddr,
# }
# struct Shared {
# peers: HashMap<SocketAddr, ()>,
# }
impl Drop for Peer {
fn drop(&mut self) {
self.state.lock().unwrap().peers
.remove(&self.addr);
}
}
# fn main() {}
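The insert-on-create, remove-on-drop pattern used by `Peer` can be seen in isolation in the following standard-library sketch. The `Registry` alias and the `Registration` guard are hypothetical names introduced here, with a plain `u32` key standing in for the socket address.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical shared map, keyed by an id instead of a socket address.
type Registry = Arc<Mutex<HashMap<u32, String>>>;

/// Hypothetical guard: inserts an entry on creation, removes it on drop.
struct Registration {
    registry: Registry,
    id: u32,
}

impl Registration {
    fn new(registry: Registry, id: u32, name: &str) -> Self {
        registry.lock().unwrap().insert(id, name.to_string());
        Registration { registry, id }
    }
}

impl Drop for Registration {
    fn drop(&mut self) {
        // Runs however the guard goes away: normal completion,
        // early return, or an error path.
        self.registry.lock().unwrap().remove(&self.id);
    }
}
```

Tying cleanup to `Drop` means the map entry disappears whenever the owning value is dropped, so no exit path has to remember to remove it explicitly.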
And here is the implementation.
# extern crate tokio;
# extern crate futures;
# extern crate bytes;
# use tokio::io;
# use tokio::prelude::*;
# use futures::sync::mpsc;
# use bytes::{Bytes, BytesMut, BufMut};
# use std::net::SocketAddr;
# use std::collections::HashMap;
# use std::sync::{Arc, Mutex};
# struct Peer {
# name: BytesMut,
# lines: Lines,
# state: Arc<Mutex<Shared>>,
# rx: Rx,
# addr: SocketAddr,
# }
# struct Shared {
# peers: HashMap<SocketAddr, Tx>,
# }
# struct Lines;
# type Tx = mpsc::UnboundedSender<Bytes>;
# type Rx = mpsc::UnboundedReceiver<Bytes>;
# impl Lines {
# fn buffer(&mut self, _: &[u8]) { unimplemented!() }
# fn poll_flush(&mut self) -> Poll<(), io::Error> { unimplemented!() }
# }
# impl Stream for Lines {
# type Item = BytesMut;
# type Error = io::Error;
# fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
# unimplemented!();
# }
# }
impl Future for Peer {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
// Receive all messages from peers.
loop {
// Polling an `UnboundedReceiver` cannot fail, so `unwrap`
// here is safe.
match self.rx.poll().unwrap() {
Async::Ready(Some(v)) => {
// Buffer the line. Once all lines are buffered,
// they will be flushed to the socket (right
// below).
self.lines.buffer(&v);
}
_ => break,
}
}
// Flush the write buffer to the socket
let _ = self.lines.poll_flush()?;
// Read new lines from the socket
while let Async::Ready(line) = self.lines.poll()? {
println!("Received line ({:?}) : {:?}", self.name, line);
if let Some(message) = line {
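// Prefix the line with the peer's name:
let mut line = self.name.clone();
// Make room for ": ", the message, and "\r\n". With `bytes` 0.4,
// `put` does not grow the buffer on its own.
line.reserve(message.len() + 4);
line.put(": ");
line.put(&message);
line.put("\r\n");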
// We're using `Bytes`, which allows zero-copy clones
// (by storing the data in an Arc internally).
//
// However, before cloning, we must freeze the data.
// This converts it from mutable -> immutable,
// allowing zero copy cloning.
let line = line.freeze();
// Now, send the line to all other peers
for (addr, tx) in &self.state.lock().unwrap().peers {
// Don't send the message to ourselves
if *addr != self.addr {
// The send only fails if the rx half has been
// dropped, however this is impossible as the
// `tx` half will be removed from the map
// before the `rx` is dropped.
tx.unbounded_send(line.clone()).unwrap();
}
}
} else {
// EOF was reached. The remote client has disconnected.
// There is nothing more to do.
return Ok(Async::Ready(()));
}
}
// As always, it is important to not just return `NotReady`
// without ensuring an inner future also returned `NotReady`.
//
// We know we got a `NotReady` from either `self.rx` or
// `self.lines`, so the contract is respected.
Ok(Async::NotReady)
}
}
# fn main() {}
Final Touches
All that remains is wiring up the `Peer` future that was just implemented. To do this, the client connection task (defined in the `process` function) is extended to use `Peer`.
# extern crate tokio;
# extern crate futures;
# use tokio::io;
# use tokio::prelude::*;
# use futures::future::{self, Either, empty};
# type Lines = Box<Stream<Item = (), Error = io::Error>>;
# struct Peer;
# impl Peer {
# fn new(_: (), state: (), lines: Lines) -> impl Future<Item = (), Error = io::Error> {
# empty()
# }
# }
# fn dox(lines: Lines) {
# let state = ();
let connection = lines.into_future()
.map_err(|(e, _)| e)
.and_then(|(name, lines)| {
// If `name` is `None`, then the client disconnected without
// actually sending a line of data.
//
// Since the connection is closed, there is no further work
// that we need to do. So, we just terminate processing by
// returning `future::ok()`.
//
// The problem is that only a single future type can be
// returned from a combinator closure, but we want to
// return both `future::ok()` and `Peer` (below).
//
// This is a common problem, so the `futures` crate solves
// this by providing the `Either` helper enum that allows
// creating a single return type that covers two concrete
// future types.
let name = match name {
Some(name) => name,
None => {
// The remote client closed the connection without
// sending any data.
return Either::A(future::ok(()));
}
};
println!("`{:?}` is joining the chat", name);
// Create the peer.
//
// This is also a future that processes the connection, only
// completing when the socket closes.
let peer = Peer::new(
name,
state,
lines);
// Wrap `peer` with `Either::B` to make the return type fit.
Either::B(peer)
})
// Task futures have an error of type `()`, this ensures we handle
// the error. We do this by printing the error to STDOUT.
.map_err(|e| {
println!("connection error = {:?}", e);
});
# }
# fn main() {}
Besides just adding `Peer`, `name == None` is also handled. In this case, the remote client terminated before identifying itself.
Returning multiple futures (the `name == None` handler and `Peer`) is handled by wrapping the returned futures in `Either`. `Either` is an enum that accepts a different future type for each variant. This allows returning multiple future types without reaching for trait objects.
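The idea behind `Either` is not specific to futures. The sketch below defines its own two-variant enum as a stand-in for `futures::future::Either` and applies it to iterators, so that it compiles with the standard library alone. The `numbers` function is a hypothetical example of two branches with different concrete types sharing one return type.

```rust
/// Minimal stand-in for `futures::future::Either`, shown for iterators
/// rather than futures (an assumption made to avoid external crates).
enum Either<A, B> {
    A(A),
    B(B),
}

impl<A, B> Iterator for Either<A, B>
where
    A: Iterator,
    B: Iterator<Item = A::Item>,
{
    type Item = A::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Delegate to whichever variant is present.
        match self {
            Either::A(a) => a.next(),
            Either::B(b) => b.next(),
        }
    }
}

/// Both branches return `Either<_, _>`, even though the wrapped
/// iterators have different concrete types.
fn numbers(empty: bool) -> Either<std::iter::Empty<u32>, std::ops::Range<u32>> {
    if empty {
        Either::A(std::iter::empty())
    } else {
        Either::B(0..3)
    }
}
```

Because the enum is a concrete type, the compiler can dispatch statically and no boxing is needed, which is the trade-off the text describes relative to trait objects.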
The full code can be found here.