Buffered writes
The other half of the framing API is the write_frame(frame) function. This function writes an entire frame to the socket. To minimize write syscalls, writes are buffered: a write buffer is maintained, and frames are encoded into this buffer before being written to the socket. However, unlike read_frame(), the entire frame is not always buffered to a byte array before being written to the socket.
Consider a bulk stream frame. The value being written is Frame::Bulk(Bytes). The wire format of a bulk frame is a frame head, which consists of the $ character followed by the data length in bytes. The majority of the frame is the contents of the Bytes value. If the data is large, copying it to an intermediate buffer would be costly.
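To make the size difference concrete, here is a minimal sketch that builds only the head of a bulk frame. The bulk_head helper is hypothetical and not part of mini-redis; it assumes the RESP layout of the $ character, the payload length in decimal, and a trailing \r\n.

```rust
/// Hypothetical helper (not part of mini-redis): builds the head of a
/// bulk frame, i.e. `$`, the payload length in decimal, then `\r\n`.
fn bulk_head(payload_len: usize) -> Vec<u8> {
    let mut head = Vec::new();
    head.push(b'$');
    head.extend_from_slice(payload_len.to_string().as_bytes());
    head.extend_from_slice(b"\r\n");
    head
}
```

Under these assumptions, the head for a 1 MiB payload is only 10 bytes long, so buffering the head is cheap while copying the payload is not.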
To implement buffered writes, we will use the BufWriter struct. This struct is initialized with a T: AsyncWrite and implements AsyncWrite itself. When write is called on BufWriter, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.
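The buffering behavior described above can be observed with a small experiment. This sketch swaps in the synchronous std::io::BufWriter as a stand-in for tokio::io::BufWriter so that it runs without a runtime; the async type is assumed to behave analogously for small writes. CountingWriter and write_through_buffer are hypothetical names introduced only for this illustration.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical inner writer that records how often it is called.
struct CountingWriter {
    calls: usize,
    data: Vec<u8>,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.calls += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes a simple-string frame in three small pieces through a buffer.
/// Returns (inner calls before flush, inner calls after flush, bytes
/// received by the inner writer).
fn write_through_buffer() -> io::Result<(usize, usize, Vec<u8>)> {
    let inner = CountingWriter { calls: 0, data: Vec::new() };
    let mut writer = BufWriter::new(inner);

    // Each of these lands in the buffer, not in the inner writer.
    writer.write_all(b"+")?;
    writer.write_all(b"OK")?;
    writer.write_all(b"\r\n")?;
    let before = writer.get_ref().calls;

    // Flushing hands the buffered bytes to the inner writer.
    writer.flush()?;
    let after = writer.get_ref().calls;

    Ok((before, after, writer.get_ref().data.clone()))
}
```

With a real socket as the inner writer, each call counted here would correspond to a write syscall, which is the cost the buffer is meant to reduce.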
We will not attempt a full implementation of write_frame() as part of the tutorial. See the full implementation here.
First, the Connection struct is updated:
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    // Writes go through BufWriter; reads still use `buffer`.
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}
Next, write_frame() is implemented.
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

// A method on Connection (inside `impl Connection`).
async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            // Frame head: `$` followed by the payload length.
            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            // Payload, then the terminator.
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    // Push buffered bytes to the socket and propagate any I/O error.
    self.stream.flush().await?;

    Ok(())
}
The functions used here are provided by AsyncWriteExt. They are available on TcpStream as well, but it would not be advisable to issue single-byte writes without the intermediate buffer.
write_u8 writes a single byte to the writer.
write_all writes the entire slice to the writer.
write_decimal is implemented by mini-redis.
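For readers curious what such a helper might look like, here is a synchronous sketch. It is not the mini-redis source: the real helper is an async method on Connection, whereas this version takes any std::io::Write as a stand-in. It assumes the helper emits the decimal digits followed by \r\n, which is what the write_frame() code above relies on for the Integer and Bulk cases.

```rust
use std::io::{self, Write};

/// Synchronous sketch: writes `val` as ASCII decimal digits followed by
/// `\r\n` to `dst`.
fn write_decimal<W: Write>(dst: &mut W, val: u64) -> io::Result<()> {
    // A u64 has at most 20 decimal digits, so the scratch space suffices.
    let mut digits = [0u8; 20];
    let mut cursor = io::Cursor::new(&mut digits[..]);
    write!(cursor, "{}", val)?;
    let len = cursor.position() as usize;

    dst.write_all(&digits[..len])?;
    dst.write_all(b"\r\n")
}
```

Formatting into a fixed-size stack array avoids a heap allocation for every integer written.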
The function ends with a call to self.stream.flush().await. Because BufWriter stores writes in an intermediate buffer, calls to write do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to flush() writes any data pending in the buffer to the socket.
Another alternative would be to not call flush() in write_frame(). Instead, provide a flush() function on Connection. This would allow the caller to queue multiple small frames in the write buffer and then write them all to the socket with one write syscall. Doing this complicates the Connection API. Simplicity is one of Mini-Redis’ goals, so we decided to include the flush().await call in fn write_frame().
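For comparison, the alternative design might look roughly like the sketch below. It is a synchronous stand-in that uses std::io::BufWriter over any std::io::Write instead of Tokio's async types, and the write_simple, flush, and written methods are hypothetical names chosen for illustration, not mini-redis API.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical variant of `Connection` in which flushing is left to
/// the caller. Generic over any synchronous writer.
struct Connection<W: Write> {
    stream: BufWriter<W>,
}

impl<W: Write> Connection<W> {
    fn new(inner: W) -> Self {
        Connection { stream: BufWriter::new(inner) }
    }

    /// Encodes a simple-string frame into the write buffer without flushing.
    fn write_simple(&mut self, val: &str) -> io::Result<()> {
        self.stream.write_all(b"+")?;
        self.stream.write_all(val.as_bytes())?;
        self.stream.write_all(b"\r\n")
    }

    /// Pushes everything queued so far to the underlying writer.
    fn flush(&mut self) -> io::Result<()> {
        self.stream.flush()
    }

    /// Gives read access to the underlying writer, for inspection.
    fn written(&self) -> &W {
        self.stream.get_ref()
    }
}
```

A caller would queue several frames with write_simple and then call flush once. The cost is that forgetting the flush leaves frames sitting in the buffer, which is the extra API burden the tutorial chooses to avoid.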