Framing
We will now apply what we just learned about I/O and implement the Mini-Redis framing layer. Framing is the process of taking a byte stream and converting it to a stream of frames. A frame is a unit of data transmitted between two peers. The Redis protocol frame is defined as follows:
```rust
use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}
```
Note how the frame only consists of data without any semantics. The command parsing and implementation happen at a higher level.
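To make that split concrete, here is a std-only sketch. It mirrors the `Frame` enum locally (with `Vec<u8>` standing in for `bytes::Bytes`) and adds a hypothetical `Command` type and `parse_command` helper, neither of which is part of mini-redis's API. The frame layer only knows "an array of bulk strings"; the higher layer decides that `["SET", "k", "v"]` means a `SET`.

```rust
/// Std-only mirror of `Frame`; `Vec<u8>` replaces `bytes::Bytes` (assumption for this sketch).
#[derive(Debug, PartialEq)]
pub enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Vec<u8>),
    Null,
    Array(Vec<Frame>),
}

/// Hypothetical higher-level command: this is where meaning is assigned.
#[derive(Debug, PartialEq)]
pub enum Command {
    Get { key: String },
    Set { key: String, value: Vec<u8> },
}

/// Convert raw bytes to a UTF-8 key, reporting invalid data as an error string.
fn utf8(bytes: &[u8]) -> Result<String, String> {
    String::from_utf8(bytes.to_vec()).map_err(|e| e.to_string())
}

/// Hypothetical helper: interpret an already-parsed frame as a command.
pub fn parse_command(frame: &Frame) -> Result<Command, String> {
    // Redis clients send commands as arrays of bulk strings.
    let parts = match frame {
        Frame::Array(parts) => parts,
        other => return Err(format!("expected array, got {:?}", other)),
    };

    // Every element must be a bulk string; collect them as byte slices.
    let mut args: Vec<&[u8]> = Vec::with_capacity(parts.len());
    for part in parts {
        match part {
            Frame::Bulk(bytes) => args.push(bytes.as_slice()),
            other => return Err(format!("expected bulk string, got {:?}", other)),
        }
    }

    // Command names are matched case-insensitively.
    match args.as_slice() {
        [name, key] if name.eq_ignore_ascii_case(b"get") => Ok(Command::Get { key: utf8(key)? }),
        [name, key, value] if name.eq_ignore_ascii_case(b"set") => Ok(Command::Set {
            key: utf8(key)?,
            value: value.to_vec(),
        }),
        _ => Err("unknown or malformed command".to_string()),
    }
}
```

Keeping `Frame` free of semantics means the same parser can carry any command; only `parse_command` changes when new commands are added.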
For HTTP, a frame might look like:
```rust
use bytes::Bytes;
use http::{HeaderMap, Method, StatusCode, Uri, Version};

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}
```
To implement framing for Mini-Redis, we will implement a `Connection` struct that wraps a `TcpStream` and reads/writes `mini_redis::Frame` values.
```rust
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... other fields here
}

impl Connection {
    /// Read a frame from the connection.
    ///
    /// Returns `None` if EOF is reached
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // implementation here
        todo!()
    }

    /// Write a frame to the connection.
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // implementation here
        todo!()
    }
}
```
You can find the details of the Redis wire protocol here. The full `Connection` code is found here.
Buffered reads
The `read_frame` method waits for an entire frame to be received before returning. A single call to `TcpStream::read()` may return an arbitrary amount of data. It could contain an entire frame, a partial frame, or multiple frames. If a partial frame is received, the data is buffered and more data is read from the socket. If multiple frames are received, the first frame is returned and the rest of the data is buffered until the next call to `read_frame`.
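The buffering rule can be illustrated without Tokio. In this std-only sketch, `frames_from_chunks` (a hypothetical helper) simulates reads that return arbitrary chunks of the byte stream, and `take_frame` removes one complete frame from the front of the buffer, leaving any extra bytes buffered. To stay short it only understands simple-string frames (`+<text>\r\n`); it is not the mini-redis parser.

```rust
/// Take one complete "+<text>\r\n" frame off the front of `buffer`.
/// Returns `None` (and leaves the buffer untouched) if the frame is still partial.
pub fn take_frame(buffer: &mut Vec<u8>) -> Option<String> {
    // Search for "\r\n" after the leading '+' type byte.
    let end = buffer.get(1..)?.windows(2).position(|w| w == b"\r\n")? + 1;
    // The frame spans 0..end + 2; drain it so later bytes stay buffered.
    let frame: Vec<u8> = buffer.drain(..end + 2).collect();
    // Drop the '+' prefix and the "\r\n" terminator.
    Some(String::from_utf8_lossy(&frame[1..end]).into_owned())
}

/// Simulate a socket whose reads return the given chunks, one per call.
pub fn frames_from_chunks(chunks: &[&[u8]]) -> Vec<String> {
    let mut buffer = Vec::new();
    let mut frames = Vec::new();
    for chunk in chunks {
        // Each "read" appends whatever arrived to the read buffer...
        buffer.extend_from_slice(chunk);
        // ...then every frame that is now complete is handed out.
        while let Some(frame) = take_frame(&mut buffer) {
            frames.push(frame);
        }
    }
    frames
}
```

For example, the chunks `"+PI"`, `"NG\r\n+OK\r"` and `"\n"` yield `PING` and then `OK`: the first read is a partial frame, and the second completes it while also carrying the start of the next one.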
To implement this, `Connection` needs a read buffer field. Data is read from the socket into the read buffer. When a frame is parsed, the corresponding data is removed from the buffer.

We will use `BytesMut` as the buffer type. This is a mutable version of `Bytes`.
```rust
use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4 KB of capacity.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}
```
Next, we implement the `read_frame()` method.
```rust
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // Attempt to parse a frame from the buffered data. If
        // enough data has been buffered, the frame is
        // returned.
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // There is not enough buffered data to read a frame.
        // Attempt to read more data from the socket.
        //
        // On success, the number of bytes is returned. `0`
        // indicates "end of stream".
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // The remote closed the connection. For this to be
            // a clean shutdown, there should be no data in the
            // read buffer. If there is, this means that the
            // peer closed the socket while sending a frame.
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}
```
Let's break this down. The `read_frame` method operates in a loop. First, `self.parse_frame()` is called. This attempts to parse a Redis frame from `self.buffer`. If there is enough data to parse a frame, the frame is returned to the caller of `read_frame()`. Otherwise, we attempt to read more data from the socket into the buffer. After reading more data, `parse_frame()` is called again. This time, if enough data has been received, parsing may succeed.
When reading from the stream, a return value of `0` indicates that no more data will be received from the peer. If the read buffer still has data in it, this indicates a partial frame has been received and the connection is being terminated abruptly. This is an error condition and `Err` is returned.
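The same loop, including the clean-versus-abrupt EOF check, can be sketched synchronously over any `std::io::Read`. This is a blocking, std-only analogue, not Tokio code: `parse_frame` here is a hypothetical stand-in that only handles `+<text>\r\n` frames, and a 4-byte scratch buffer forces several reads per frame.

```rust
use std::io::{self, Read};

/// Hypothetical parser: take one "+<text>\r\n" frame off the front of `buffer`,
/// or return `None` if it has not fully arrived yet.
fn parse_frame(buffer: &mut Vec<u8>) -> Option<String> {
    let end = buffer.get(1..)?.windows(2).position(|w| w == b"\r\n")? + 1;
    let frame: Vec<u8> = buffer.drain(..end + 2).collect();
    Some(String::from_utf8_lossy(&frame[1..end]).into_owned())
}

/// Blocking analogue of `read_frame`: parse first, read more only if needed.
pub fn read_frame<R: Read>(reader: &mut R, buffer: &mut Vec<u8>) -> io::Result<Option<String>> {
    loop {
        // Enough data buffered? Hand the frame to the caller.
        if let Some(frame) = parse_frame(buffer) {
            return Ok(Some(frame));
        }

        // Not enough data: read another (deliberately small) chunk.
        let mut chunk = [0u8; 4];
        let n = reader.read(&mut chunk)?;
        if n == 0 {
            // End of stream: clean only if no partial frame is left over.
            return if buffer.is_empty() {
                Ok(None)
            } else {
                Err(io::Error::new(io::ErrorKind::ConnectionReset, "connection reset by peer"))
            };
        }
        buffer.extend_from_slice(&chunk[..n]);
    }
}
```

Reading `+OK\r\n+PONG\r\n` yields `OK`, `PONG`, then `None`; reading the truncated input `+PAR` yields an error because bytes remain in the buffer at EOF.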
The `Buf` trait
When reading from the stream, `read_buf` is called. This version of the read function takes a value implementing `BufMut` from the `bytes` crate.
First, consider how we would implement the same read loop using `read()`. A `Vec<u8>` could be used instead of `BytesMut`.
```rust
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4 KB of capacity.
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}
```
And the `read_frame()` function on `Connection`:
```rust
use tokio::io::AsyncReadExt;
use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Ensure the buffer has capacity
        if self.buffer.len() == self.cursor {
            // Grow the buffer
            self.buffer.resize(self.cursor * 2, 0);
        }

        // Read into the buffer, tracking the number
        // of bytes read
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // Update our cursor
            self.cursor += n;
        }
    }
}
```
When working with byte arrays and `read`, we must also maintain a cursor tracking how much data has been buffered. We must make sure to pass the empty portion of the buffer to `read()`. Otherwise, we would overwrite buffered data. If our buffer fills up, we must grow the buffer in order to keep reading. In `parse_frame()` (not included), we would need to parse the data contained in `self.buffer[..self.cursor]`.
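The cursor bookkeeping can be isolated in a blocking, std-only sketch. The hypothetical `read_all_with_cursor` reads until EOF instead of parsing frames, but it follows the same rules: pass only the unfilled tail `buffer[cursor..]` to `read`, advance `cursor` by the bytes read, and double (and zero-fill) the buffer when it is full.

```rust
use std::io::{self, Read};

/// Read everything from `reader`, tracking the filled length with `cursor`.
pub fn read_all_with_cursor<R: Read>(reader: &mut R, initial: usize) -> io::Result<Vec<u8>> {
    // `vec![0; n]` allocates *and* zero-fills the buffer. At least 1 byte,
    // so that doubling below always makes room.
    let mut buffer = vec![0u8; initial.max(1)];
    let mut cursor = 0;
    loop {
        // Every byte is in use: grow, paying again for zero-filling.
        if cursor == buffer.len() {
            buffer.resize(cursor * 2, 0);
        }
        // Only the unfilled tail is handed to `read`, so buffered bytes are kept.
        let n = reader.read(&mut buffer[cursor..])?;
        if n == 0 {
            break;
        }
        cursor += n;
    }
    // Bytes past `cursor` are zero padding, not data.
    buffer.truncate(cursor);
    Ok(buffer)
}
```

Note the `initial.max(1)` guard: with a zero-length starting buffer, `cursor * 2` would never grow it. The tutorial's version avoids this because it starts at 4096 bytes.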
Because pairing a byte array with a cursor is very common, the `bytes` crate provides an abstraction representing a byte array and cursor. The `Buf` trait is implemented by types from which data can be read. The `BufMut` trait is implemented by types into which data can be written. When passing a `T: BufMut` to `read_buf()`, the buffer's internal cursor is automatically updated by `read_buf`. Because of this, in our version of `read_frame`, we do not need to manage our own cursor.
Additionally, when using `Vec<u8>`, the buffer must be initialized. `vec![0; 4096]` allocates an array of 4096 bytes and writes zero to every entry. When resizing the buffer, the new capacity must also be initialized with zeros. The initialization process is not free. When working with `BytesMut` and `BufMut`, capacity is uninitialized. The `BytesMut` abstraction prevents us from reading the uninitialized memory. This lets us avoid the initialization step.
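The standard `Vec` shows the same distinction, and serves here only as an analogy (the internals of `BytesMut` differ). `vec![0; n]` produces `n` initialized bytes, while `Vec::with_capacity(n)` reserves space but keeps `len()` at 0, so safe code cannot read the uninitialized capacity; appending writes into that space directly without a zeroing pass first. The function name `compare_buffers` is illustrative.

```rust
/// Returns (zero-filled len, reserved len, reserved capacity, len after appending).
pub fn compare_buffers() -> (usize, usize, usize, usize) {
    // 4096 bytes are allocated and each one is set to zero.
    let zeroed = vec![0u8; 4096];

    // At least 4096 bytes are allocated, but none are initialized;
    // `len()` is 0, so there is nothing readable yet.
    let mut reserved: Vec<u8> = Vec::with_capacity(4096);
    let (reserved_len, reserved_cap) = (reserved.len(), reserved.capacity());

    // Appending writes straight into the spare capacity.
    reserved.extend_from_slice(b"+OK\r\n");

    (zeroed.len(), reserved_len, reserved_cap, reserved.len())
}
```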
Parsing
Now, let's look at the `parse_frame()` function. Parsing is done in two steps.
- Ensure a full frame is buffered and find the end index of the frame.
- Parse the frame.
The `mini-redis` crate provides us with a function for each of these steps:

- `Frame::check`
- `Frame::parse`
We will also reuse the `Buf` abstraction to help. A `Buf` is passed into `Frame::check`. As the `check` function iterates over the passed-in buffer, the internal cursor is advanced. When `check` returns, the buffer's internal cursor points to the end of the frame.
For the `Buf` type, we will use `std::io::Cursor<&[u8]>`.
```rust
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Create the `T: Buf` type.
    let mut buf = Cursor::new(&self.buffer[..]);

    // Check whether a full frame is available
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Get the byte length of the frame
            let len = buf.position() as usize;

            // Reset the internal cursor for the
            // call to `parse`.
            buf.set_position(0);

            // Parse the frame
            let frame = Frame::parse(&mut buf)?;

            // Discard the frame from the buffer
            self.buffer.advance(len);

            // Return the frame to the caller.
            Ok(Some(frame))
        }
        // Not enough data has been buffered
        Err(Incomplete) => Ok(None),
        // An error was encountered
        Err(e) => Err(e.into()),
    }
}
```
The full `Frame::check` function can be found here. We will not cover it in its entirety.
The relevant thing to note is that `Buf`'s "byte iterator" style APIs are used. These fetch data and advance the internal cursor. For example, to parse a frame, the first byte is checked to determine the type of the frame. The function used is `Buf::get_u8`. This fetches the byte at the current cursor's position and advances the cursor by one.
There are more useful methods on the `Buf` trait. Check the API docs for more details.
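The "byte iterator" style can be sketched with std alone. This is not mini-redis's `Frame::check`: it is a simplified stand-in that only accepts the single-line frame types (`+`, `-`, `:`), and `get_u8`, `skip_line` and `CheckError` are hypothetical names. `get_u8` mimics `Buf::get_u8` on a `std::io::Cursor<&[u8]>`: fetch the byte at the cursor and advance by one. When `check` succeeds, `position()` is the frame's length, which is exactly what `parse_frame` reads.

```rust
use std::io::Cursor;

/// `Incomplete` plays the role of `mini_redis::frame::Error::Incomplete`.
#[derive(Debug, PartialEq)]
pub enum CheckError {
    Incomplete,
    Invalid(u8),
}

/// `Buf::get_u8`-style helper: read the byte at the cursor, then advance by one.
fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, CheckError> {
    let pos = src.position() as usize;
    let byte = *src.get_ref().get(pos).ok_or(CheckError::Incomplete)?;
    src.set_position(pos as u64 + 1);
    Ok(byte)
}

/// Advance past the next "\r\n", or report that it has not arrived yet.
fn skip_line(src: &mut Cursor<&[u8]>) -> Result<(), CheckError> {
    let mut prev = 0u8;
    loop {
        let byte = get_u8(src)?;
        if prev == b'\r' && byte == b'\n' {
            return Ok(());
        }
        prev = byte;
    }
}

/// Simplified check: the first byte selects the frame type.
pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), CheckError> {
    match get_u8(src)? {
        // Simple string, error and integer frames are one line each.
        b'+' | b'-' | b':' => skip_line(src),
        other => Err(CheckError::Invalid(other)),
    }
}
```

Checking `+OK\r\n:42\r\n` leaves the cursor at 5 after the first call and at 10 after the second, while `+OK\r` reports `Incomplete` so the caller knows to read more data.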
Buffered writes
The other half of the framing API is the `write_frame(frame)` function. This function writes an entire frame to the socket. In order to minimize `write` syscalls, writes will be buffered. A write buffer is maintained and frames are encoded to this buffer before being written to the socket. However, unlike `read_frame()`, the entire frame is not always buffered to a byte array before writing to the socket.
Consider a bulk stream frame. The value being written is `Frame::Bulk(Bytes)`. The wire format of a bulk frame is a frame head, which consists of the `$` character followed by the data length in bytes and `\r\n`, then the data itself, then a trailing `\r\n`. The majority of the frame is the contents of the `Bytes` value. If the data is large, copying it to an intermediate buffer would be costly.
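A blocking, std-only sketch of that encoding (the name `write_bulk` is hypothetical): only the short head is formatted separately, and the payload is handed to the writer as-is rather than first being copied into a combined frame buffer.

```rust
use std::io::{self, Write};

/// Write a bulk frame: "$", the payload length, "\r\n", the payload, "\r\n".
pub fn write_bulk<W: Write>(dst: &mut W, data: &[u8]) -> io::Result<()> {
    // Frame head: a few bytes, cheap to format.
    write!(dst, "${}\r\n", data.len())?;
    // The (possibly large) payload goes straight to the writer.
    dst.write_all(data)?;
    // Bulk frames end with another "\r\n".
    dst.write_all(b"\r\n")
}
```

Writing `hello` produces the bytes `$5\r\nhello\r\n`.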
To implement buffered writes, we will use the `BufWriter` struct. This struct is initialized with a `T: AsyncWrite` and implements `AsyncWrite` itself. When `write` is called on `BufWriter`, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.
We will not attempt a full implementation of `write_frame()` as part of the tutorial. See the full implementation here.
First, the `Connection` struct is updated:
```rust
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}
```
Next, `write_frame()` is implemented.
```rust
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    // Push the buffered bytes to the socket, propagating any I/O error.
    self.stream.flush().await?;

    Ok(())
}
```
The functions used here are provided by `AsyncWriteExt`. They are available on `TcpStream` as well, but it would not be advisable to issue single-byte writes without the intermediate buffer.

- `write_u8` writes a single byte to the writer.
- `write_all` writes the entire slice to the writer.
- `write_decimal` is implemented by mini-redis.
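The body of `write_decimal` is not shown here. Judging from `write_frame`, where neither the `Integer` arm nor the bulk length is followed by an explicit `\r\n`, it must write the number's digits followed by `\r\n`. Under that assumption, a blocking, std-only sketch could format the number into a small stack buffer and then write it out:

```rust
use std::io::{self, Cursor, Write};

/// Sketch: write `val` as ASCII decimal digits followed by "\r\n".
pub fn write_decimal<W: Write>(dst: &mut W, val: u64) -> io::Result<()> {
    // u64::MAX has 20 decimal digits, so 20 bytes always suffice.
    let mut digits = [0u8; 20];
    let mut cursor = Cursor::new(&mut digits[..]);
    write!(cursor, "{}", val)?;
    // The cursor position is the number of digits written.
    let len = cursor.position() as usize;

    dst.write_all(&digits[..len])?;
    dst.write_all(b"\r\n")
}
```

Formatting into a fixed array avoids allocating a `String` for every integer or bulk length.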
The function ends with a call to `self.stream.flush().await`. Because `BufWriter` stores writes in an intermediate buffer, calls to `write` do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to `flush()` writes any data pending in the buffer to the socket.
Another alternative would be to not call `flush()` in `write_frame()` and instead provide a `flush()` function on `Connection`. This would allow the caller to queue multiple small frames in the write buffer, then write them all to the socket with one `write` syscall. Doing this complicates the `Connection` API. Simplicity is one of Mini-Redis' goals, so we decided to include the `flush().await` call in `fn write_frame()`.
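For comparison, the rejected alternative might look like this blocking, std-only sketch. `Socket` and this `Connection<W>` are hypothetical: `write_frame` only encodes into a write buffer, and the caller decides when `flush` sends everything with one `write_all`.

```rust
use std::io::{self, Write};

/// Stand-in socket that records each write call it receives.
#[derive(Default)]
pub struct Socket {
    pub writes: Vec<Vec<u8>>,
}

impl Write for Socket {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes.push(buf.to_vec());
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical connection whose `write_frame` does no I/O on its own.
pub struct Connection<W: Write> {
    stream: W,
    write_buf: Vec<u8>,
}

impl<W: Write> Connection<W> {
    pub fn new(stream: W) -> Self {
        Connection { stream, write_buf: Vec::new() }
    }

    /// Queue a simple-string frame in the write buffer.
    pub fn write_frame(&mut self, text: &str) {
        self.write_buf.push(b'+');
        self.write_buf.extend_from_slice(text.as_bytes());
        self.write_buf.extend_from_slice(b"\r\n");
    }

    /// Send every queued frame at once, then clear the buffer.
    pub fn flush(&mut self) -> io::Result<()> {
        self.stream.write_all(&self.write_buf)?;
        self.write_buf.clear();
        self.stream.flush()
    }

    pub fn stream(&self) -> &W {
        &self.stream
    }
}
```

Two `write_frame` calls followed by one `flush` reach the socket as a single write of `+OK\r\n+PONG\r\n`. The cost is that a caller who forgets `flush` never sends anything, which is the extra API burden that Mini-Redis avoids by flushing inside `write_frame`.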