Using AsyncRead and AsyncWrite directly
So far, we have primarily talked about AsyncRead and AsyncWrite in the context of I/O combinators provided by Tokio. While these are often enough, sometimes you need to implement your own combinators that perform asynchronous reads and writes directly.
Reading data with AsyncRead
The heart of AsyncRead is the poll_read method. It maps the WouldBlock error, which indicates that an I/O read operation would have blocked, into NotReady, which in turn lets us interoperate with the world of futures. When you write a Future (or something like it, such as Stream) that internally contains an AsyncRead, poll_read is likely the method you will be interacting with.
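To make the WouldBlock-to-NotReady mapping concrete, here is a minimal std-only sketch. The Async enum, poll_read_sketch, and BlocksOnce are hypothetical names introduced for illustration (Async merely mirrors the shape of the futures type), and unlike a real poll_read this sketch makes no attempt to arrange a task notification.

```rust
use std::io::{self, Read};

/// Hypothetical stand-in for the `Async` type from the futures crate.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Attempts a read and turns a `WouldBlock` error into `NotReady`.
/// NOTE: a real `poll_read` must also arrange for the current task to be
/// notified before returning `NotReady`; this sketch skips that part.
fn poll_read_sketch<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<Async<usize>> {
    match reader.read(buf) {
        Ok(n) => Ok(Async::Ready(n)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}

/// Hypothetical reader that would block on its first call and then
/// yields its data.
struct BlocksOnce {
    blocked: bool,
    data: &'static [u8],
}

impl Read for BlocksOnce {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.blocked {
            self.blocked = true;
            return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready yet"));
        }
        // `&[u8]` implements `Read` and advances past the bytes it hands out.
        self.data.read(buf)
    }
}
```

Polling a BlocksOnce through poll_read_sketch first yields NotReady, then Ready with the number of bytes copied, and finally Ready(0) once the data is exhausted.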
The important thing to keep in mind with poll_read is that it follows the same contract as Future::poll. Specifically, it cannot return NotReady unless it has arranged for the current task to be notified when it can make progress again. This fact is what lets us call poll_read inside of poll in our own Futures; we know that we are upholding the contract of poll when we forward a NotReady from poll_read, because poll_read follows that same contract!
The exact mechanism Tokio uses to ensure that poll_read later notifies the current task is out of scope for this section, but you can read more about it in the non-blocking I/O section of Tokio internals if you’re interested.
With that all said, let’s look at how we might implement the read_exact method ourselves!
extern crate tokio;
#[macro_use]
extern crate futures;
fn main() {}

use std::io;
use tokio::prelude::*;

// This is going to be our Future.
// In the common case, this is set to Some(Reading),
// but we'll set it to None when we return Async::Ready
// so that we can return the reader and the buffer.
struct ReadExact<R, T>(Option<Reading<R, T>>);

struct Reading<R, T> {
    // This is the stream we're reading from.
    reader: R,
    // This is the buffer we're reading into.
    buffer: T,
    // And this is how far into the buffer we've written.
    pos: usize,
}

// We want to be able to construct a ReadExact over anything
// that implements AsyncRead, and any buffer that can be
// thought of as a &mut [u8].
fn read_exact<R, T>(reader: R, buffer: T) -> ReadExact<R, T>
where
    R: AsyncRead,
    T: AsMut<[u8]>,
{
    ReadExact(Some(Reading {
        reader,
        buffer,
        // Initially, we've read no bytes into buffer.
        pos: 0,
    }))
}

impl<R, T> Future for ReadExact<R, T>
where
    R: AsyncRead,
    T: AsMut<[u8]>,
{
    // When we've filled up the buffer, we want to return both the buffer
    // with the data that we read and the reader itself.
    type Item = (R, T);
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0 {
            Some(Reading {
                ref mut reader,
                ref mut buffer,
                ref mut pos,
            }) => {
                let buffer = buffer.as_mut();
                // Check that we haven't finished
                while *pos < buffer.len() {
                    // Try to read data into the remainder of the buffer.
                    // Just like read in std::io::Read, poll_read *can* read
                    // fewer bytes than the length of the buffer it is given,
                    // and we need to handle that by looking at its return
                    // value, which is the number of bytes actually read.
                    //
                    // Notice that we are using try_ready! here, so if poll_read
                    // returns NotReady (or an error), we will do the same!
                    // We uphold the contract that we have arranged to be
                    // notified later because poll_read follows that same
                    // contract, and _it_ returned NotReady.
                    let n = try_ready!(reader.poll_read(&mut buffer[*pos..]));
                    *pos += n;

                    // If no bytes were read, but there was no error, this
                    // generally implies that the reader will provide no more
                    // data (for example, because the TCP connection was closed
                    // by the other side).
                    if n == 0 {
                        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
                    }
                }
            }
            None => panic!("poll a ReadExact after it's done"),
        }

        // We need to return the reader and the buffer, which we can only
        // do by moving them out of self. We do this by taking our state
        // and leaving `None`. This _should_ be fine, because poll()
        // requires callers to not call poll() again after Ready has been
        // returned, so we should only ever see Some(Reading) when poll()
        // is called.
        let reading = self.0.take().expect("must have seen Some above");
        Ok(Async::Ready((reading.reader, reading.buffer)))
    }
}
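The early-return behavior of try_ready! and the reason ReadExact keeps pos in its state can be seen in isolation with a std-only sketch. Everything here is an assumption made for illustration: Async is a stand-in for the futures type, try_ready_sketch is a local macro modelled on try_ready!, and Trickle is a hypothetical source that returns NotReady on every other call and never hands out more than two bytes at a time. Because Trickle arranges no notification, the sketch is meant to be polled by hand.

```rust
use std::io;

/// Hypothetical stand-in for the `Async` type from the futures crate.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Local macro modelled on `try_ready!`: unwrap a Ready value, and
/// return early from the enclosing function on NotReady or an error.
macro_rules! try_ready_sketch {
    ($e:expr) => {
        match $e {
            Ok(Async::Ready(value)) => value,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(e) => return Err(From::from(e)),
        }
    };
}

/// Hypothetical source: NotReady on odd-numbered calls, and at most
/// two bytes per successful read.
struct Trickle {
    data: Vec<u8>,
    calls: usize,
}

impl Trickle {
    fn poll_read(&mut self, buf: &mut [u8]) -> io::Result<Async<usize>> {
        self.calls += 1;
        if self.calls % 2 == 1 {
            return Ok(Async::NotReady);
        }
        let n = self.data.len().min(buf.len()).min(2);
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data.drain(..n);
        Ok(Async::Ready(n))
    }
}

/// Same loop shape as ReadExact::poll. `pos` lives outside the function
/// so that progress survives an early return.
fn poll_fill(src: &mut Trickle, buf: &mut [u8], pos: &mut usize) -> io::Result<Async<()>> {
    while *pos < buf.len() {
        let n = try_ready_sketch!(src.poll_read(&mut buf[*pos..]));
        *pos += n;
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
        }
    }
    Ok(Async::Ready(()))
}
```

Each call to poll_fill picks up at the stored pos rather than starting over, which is why ReadExact keeps pos in its Reading state instead of in a local variable.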
Writing data with AsyncWrite
Just like poll_read is the core piece of AsyncRead, poll_write is the core of AsyncWrite. Like poll_read, it maps the WouldBlock error, which indicates that an I/O write operation would have blocked, into NotReady, which again lets us interoperate with the world of futures. AsyncWrite also has a poll_flush, which provides an asynchronous analogue to Write’s flush method. The role of poll_flush is to make sure that any bytes previously written by poll_write are, well, flushed onto the underlying I/O resource (written out in network packets, for example). Similar to poll_write, it wraps around Write::flush, and maps a WouldBlock error into NotReady to indicate that the flushing is still ongoing.
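The difference between "written" and "flushed" can be sketched with std alone. SlowSink, poll_flush_sketch, and the Async enum below are hypothetical names introduced for illustration; SlowSink buffers everything it is given and pretends that its first flush attempt would block. As with any sketch that omits task notification, this is not how a real poll_flush is implemented, only the error mapping it performs.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for the `Async` type from the futures crate.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical writer that buffers writes in `pending` and only moves
/// them to `sent` on a successful flush. Its first flush would block.
struct SlowSink {
    pending: Vec<u8>,
    sent: Vec<u8>,
    flush_attempts: usize,
}

impl Write for SlowSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.pending.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.flush_attempts += 1;
        if self.flush_attempts == 1 {
            return Err(io::Error::new(io::ErrorKind::WouldBlock, "still flushing"));
        }
        self.sent.append(&mut self.pending);
        Ok(())
    }
}

/// Wraps `Write::flush`, turning `WouldBlock` into `NotReady`.
/// NOTE: no task notification is arranged here, unlike a real poll_flush.
fn poll_flush_sketch<W: Write>(writer: &mut W) -> io::Result<Async<()>> {
    match writer.flush() {
        Ok(()) => Ok(Async::Ready(())),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}
```

After a successful write, the bytes sit in pending; they only show up in sent once poll_flush_sketch has returned Ready, which takes two attempts with this sink.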
AsyncWrite’s poll_write and poll_flush follow the same contract as Future::poll and AsyncRead::poll_read, namely that if they return NotReady, they have arranged for the current task to be notified when they can make progress again. As with poll_read, this means that we can safely call these methods in our own futures and know that we are also following the contract.
Tokio uses the same mechanism to manage notifications for poll_write and poll_flush as it does for poll_read, and you can read more about it in the non-blocking I/O section of Tokio internals.
Shutdown
AsyncWrite also adds one method that is not part of Write: shutdown. From its documentation:
Initiates or attempts to shut down this writer, returning success when the I/O connection has completely shut down.
This method is intended to be used for asynchronous shutdown of I/O connections. For example this is suitable for implementing shutdown of a TLS connection or calling TcpStream::shutdown on a proxied connection. Protocols sometimes need to flush out final pieces of data or otherwise perform a graceful shutdown handshake, reading/writing more data as appropriate. This method is the hook for such protocols to implement the graceful shutdown logic.
This sums shutdown up pretty nicely: it is a way to tell the writer that no more data is coming, and that it should indicate that in whatever way the underlying I/O protocol requires. For TCP connections, for example, this usually entails closing the writing side of the TCP channel so that the other end receives an end-of-file in the form of a read that returns 0 bytes. You can often think of shutdown as what you would have done synchronously in the implementation of Drop; it’s just that in the asynchronous world, you can’t easily do something in Drop because you need to have an executor that keeps polling your writer!
Note that calling shutdown on a write “half” of a type that implements AsyncWrite and AsyncRead does not shut down the read “half”. You can usually still continue reading data as you please until the other side shuts down their corresponding write “half”.
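The half-close behavior described above can be observed with the blocking std::net types, which serve here only as a synchronous analogue of AsyncWrite::shutdown. In this sketch, half_close_demo is a hypothetical helper that assumes a loopback connection on 127.0.0.1 is available: the client sends a request, shuts down its write half, and can still read the reply.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Returns (bytes the server received, bytes the client received).
fn half_close_demo() -> io::Result<(Vec<u8>, Vec<u8>)> {
    // Port 0 asks the OS for any free port on the loopback interface.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let server = thread::spawn(move || -> io::Result<Vec<u8>> {
        let (mut peer, _) = listener.accept()?;
        let mut request = Vec::new();
        // read_to_end only returns once a read yields 0 bytes, that is,
        // once the client has shut down its write half.
        peer.read_to_end(&mut request)?;
        // The server's own write half is still open, so it can reply.
        peer.write_all(b"pong")?;
        Ok(request)
        // `peer` is dropped here, which closes the connection.
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"ping")?;
    // Close only the write half; the read half stays usable.
    client.shutdown(Shutdown::Write)?;

    let mut reply = Vec::new();
    client.read_to_end(&mut reply)?;

    let request = server.join().expect("server thread panicked")?;
    Ok((request, reply))
}
```

The server sees end-of-file as soon as the client shuts down its write half, while the client keeps reading until the server closes its side.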
An example of using AsyncWrite
Without further ado, let’s take a look at how we might implement the write_all method ourselves:
extern crate tokio;
#[macro_use]
extern crate futures;
fn main() {}

use std::io;
use tokio::prelude::*;

// This is going to be our Future.
// It'll look awfully similar to ReadExact above!
// In the common case, this is set to Some(Writing),
// but we'll set it to None when we return Async::Ready
// so that we can return the writer and the buffer.
struct WriteAll<W, T>(Option<Writing<W, T>>);

struct Writing<W, T> {
    // This is the stream we're writing into.
    writer: W,
    // This is the buffer we're writing from.
    buffer: T,
    // And this is how much of the buffer we've written.
    pos: usize,
}

// We want to be able to construct a WriteAll over anything
// that implements AsyncWrite, and any buffer that can be
// thought of as a &[u8].
fn write_all<W, T>(writer: W, buffer: T) -> WriteAll<W, T>
where
    W: AsyncWrite,
    T: AsRef<[u8]>,
{
    WriteAll(Some(Writing {
        writer,
        buffer,
        // Initially, we've written none of the bytes from buffer.
        pos: 0,
    }))
}

impl<W, T> Future for WriteAll<W, T>
where
    W: AsyncWrite,
    T: AsRef<[u8]>,
{
    // When we've written out the entire buffer, we want to return
    // both the buffer and the writer so that the user can re-use them.
    type Item = (W, T);
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0 {
            Some(Writing {
                ref mut writer,
                ref buffer,
                ref mut pos,
            }) => {
                let buffer = buffer.as_ref();
                // Check that we haven't finished
                while *pos < buffer.len() {
                    // Try to write the remainder of the buffer into the writer.
                    // Just like write in std::io::Write, poll_write *can* write
                    // fewer bytes than the length of the buffer it is given,
                    // and we need to handle that by looking at its return
                    // value, which is the number of bytes actually written.
                    //
                    // We are using try_ready! here, just like in poll_read in
                    // ReadExact, so that if poll_write returns NotReady (or an
                    // error), we will do the same! We uphold the contract that
                    // we have arranged to be notified later because poll_write
                    // follows that same contract, and _it_ returned NotReady.
                    let n = try_ready!(writer.poll_write(&buffer[*pos..]));
                    *pos += n;

                    // If no bytes were written, but there was no error, this
                    // generally implies that something weird happened under us.
                    // We make sure to turn this into an error for the caller to
                    // deal with.
                    if n == 0 {
                        return Err(io::Error::new(
                            io::ErrorKind::WriteZero,
                            "zero-length write",
                        ));
                    }
                }
            }
            None => panic!("poll a WriteAll after it's done"),
        }

        // We use the same trick as in ReadExact to ensure that we can return
        // the buffer and the writer once the entire buffer has been written out.
        let writing = self.0.take().expect("must have seen Some above");
        Ok(Async::Ready((writing.writer, writing.buffer)))
    }
}