Getting asynchronous
Futures are all about managing asynchronicity. Implementing a future thatcompletes asynchonously requires correctly handling receiving Async::NotReady
from the inner future.
Let’s start by implementing a future that establishes a TCP socket with a remotepeer and extracts the peer socket address, writing it to STDOUT.
# #![deny(deprecated)]
extern crate tokio;
#[macro_use]
extern crate futures;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use futures::{Future, Async, Poll};
struct GetPeerAddr {
connect: ConnectFuture,
}
impl Future for GetPeerAddr {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.connect.poll() {
Ok(Async::Ready(socket)) => {
println!("peer address = {}", socket.peer_addr().unwrap());
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
println!("failed to connect: {}", e);
Ok(Async::Ready(()))
}
}
}
}
fn main() {
let addr = "192.168.0.1:1234".parse().unwrap();
let connect_future = TcpStream::connect(&addr);
let get_peer_addr = GetPeerAddr {
connect: connect_future,
};
# if false {
tokio::run(get_peer_addr);
# }
}
The implementation of GetPeerAddr
is very similar to the Display
future fromthe previous page. The primary difference is, in this case,self.connect.poll()
will (probably) return Async::NotReady
a number of timesbefore returning the connected socket. When this happens, our future returnsNotReady
.
GetPeerAddr
contains ConnectFuture
, a future that completes once a TCPstream has been established. This future is returned by TcpStream::connect
.
When GetPeerAddr
is passed to tokio::run
, Tokio will repeatedly call poll
until Ready
is returned. The exact mechanism by which this happens isdescribed in later chapters.
When implementing Future
, Async::NotReady
must not be returned unlessAsync::NotReady
was obtained when calling poll
on an inner future. One wayto think about it is, when a future is polled, it must do as much work as it canuntil it either completes or becomes blocked on an inner future.
Chaining computations
Now, let’s take the connect future and update it to write “hello world” once theTCP socket has been established.
# #![deny(deprecated)]
extern crate tokio;
extern crate bytes;
#[macro_use]
extern crate futures;
use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use bytes::{Bytes, Buf};
use futures::{Future, Async, Poll};
use std::io::{self, Cursor};
// HelloWorld has two states, namely waiting to connect to the socket
// and already connected to the socket
enum HelloWorld {
Connecting(ConnectFuture),
Connected(TcpStream, Cursor<Bytes>),
}
impl Future for HelloWorld {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
use self::HelloWorld::*;
loop {
match self {
Connecting(ref mut f) => {
let socket = try_ready!(f.poll());
let data = Cursor::new(Bytes::from_static(b"hello world"));
*self = Connected(socket, data);
}
Connected(ref mut socket, ref mut data) => {
// Keep trying to write the buffer to the socket as long as the
// buffer has more bytes available for consumption
while data.has_remaining() {
try_ready!(socket.write_buf(data));
}
return Ok(Async::Ready(()));
}
}
}
}
}
fn main() {
let addr = "127.0.0.1:1234".parse().unwrap();
let connect_future = TcpStream::connect(&addr);
let hello_world = HelloWorld::Connecting(connect_future);
# let hello_world = futures::future::ok::<(), io::Error>(());
// Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
tokio::run(hello_world.map_err(|e| println!("{0}", e)))
}
It is very common to implement futures as an enum
of the possiblestates. This allows the future implementation to track its stateinternally by transitioning between the enum’s variants.
This future is represented as an enumeration of states:
- Connecting
- Writing “hello world” to the socket.
The future starts in the connecting state with an inner future of typeConnectFuture
. It repeatedly polls this future until the socket is returned.The state is then transitioned toConnected
.
From the Connected
state, the future writes data to the socket. This is donewith the write_buf
function. I/O functions are covered in more detail in thenext section. Briefly, write_buf
is a non-blocking function towrite data to the socket. If the socket is not ready to accept the write,NotReady
is returned. If some data (but not necessarily all) was written,Ready(n)
is returned, where n
is the number of written bytes. The cursor isalso advanced.
Once in the Connected
state, the future must loop as long as there is dataleft to write. Because write_buf
is wrapped with try_ready!()
, whenwrite_buf
returns NotReady
, our poll
function returns with NotReady
.
At some point in the future, our poll
function is called again. Because it isin the Connected
state, it jumps directly to writing data.
Note the loops are important. Many future implementations contain loops.These loops are necessary because poll
cannot return until either all the datais written to the socket, or an inner future (ConnectFuture
orwrite_buf
) returns NotReady
.
Next up: Combinators