Implementing futures
Implementing futures is very common when using Tokio. Let’s start with a very basic future that performs no asynchronous logic and simply returns a message (the venerable “hello world”).
The Future trait
The Future trait is as follows:
trait Future {
    /// The type of the value returned when the future completes.
    type Item;

    /// The type representing errors that occurred while processing the computation.
    type Error;

    /// The function that will be repeatedly called to see if the future
    /// has completed or not. The `Async` enum can either be `Ready` or
    /// `NotReady` and indicates whether the future is ready to produce
    /// a value or not.
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}
Let’s implement it for our “hello world” future:
extern crate futures;

// `Poll` is a type alias for `Result<Async<T>, E>`
use futures::{Future, Async, Poll};

struct HelloWorld;

impl Future for HelloWorld {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        Ok(Async::Ready("hello world".to_string()))
    }
}
The Item and Error associated types define the types returned by the future once it completes. Item is the success value and Error is returned when the future encounters an error while processing. By convention, infallible futures set Error to ().
Futures use a poll-based model. The consumer of a future repeatedly calls the poll function. The future then attempts to complete. If the future is able to complete, it returns Async::Ready(value). If the future is unable to complete due to being blocked on an internal resource (such as a TCP socket), it returns Async::NotReady.
When a future’s poll function is called, the implementation will synchronously do as much work as possible until it is logically blocked on some asynchronous event that has not occurred yet. The future implementation then saves its state internally so that the next time poll is called (after an external event is received), it resumes processing from the point it left off. Work is not repeated.
The hello world future requires no asynchronous processing and is immediately ready, so it returns Ok(Async::Ready(value)).
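To make the polling model concrete, the following sketch calls poll by hand and inspects the result. It is only an illustration: so that it compiles without any external crate, it declares local stand-ins for Async, Poll, and Future that mirror the shapes shown above rather than importing them from the futures crate, and the describe helper is a hypothetical name introduced for this example.

```rust
// Local stand-ins mirroring the shape of the futures 0.1 types, so that
// this sketch compiles without any external crate.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

struct HelloWorld;

impl Future for HelloWorld {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Poll<String, ()> {
        // No asynchronous work is needed, so the value is ready immediately.
        Ok(Async::Ready("hello world".to_string()))
    }
}

// Hypothetical helper: poll the future once and describe the outcome.
fn describe<F>(future: &mut F) -> String
where
    F: Future<Item = String>,
{
    match future.poll() {
        Ok(Async::Ready(value)) => format!("ready: {}", value),
        Ok(Async::NotReady) => "not ready".to_string(),
        Err(_) => "error".to_string(),
    }
}

fn main() {
    let mut future = HelloWorld;
    // Prints "ready: hello world".
    println!("{}", describe(&mut future));
}
```

In real code the consumer calling poll is normally an executor rather than a hand-written helper like this one.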
Running the future
Tokio is responsible for running futures to completion. This is done by passing the future to tokio::run.
The tokio::run function accepts futures where both Item and Error are set to (). This is because Tokio only executes futures; it does not do anything with the values they produce. The user of Tokio is required to fully process all values in the future.
In our case, let’s print the value the future yields to standard output. We will do that by implementing a Display future.
extern crate futures;

use futures::{Future, Async, Poll};
use std::fmt;

struct Display<T>(T);

impl<T> Future for Display<T>
where
    T: Future,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), T::Error> {
        let value = match self.0.poll() {
            Ok(Async::Ready(value)) => value,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => return Err(err),
        };

        println!("{}", value);
        Ok(Async::Ready(()))
    }
}
The Display future takes a future that yields items that can be displayed. When it is polled, it first tries to poll the inner future. If the inner future is not ready then Display cannot complete. In this case, Display also returns NotReady.
A poll implementation must never return NotReady unless it received NotReady by calling an inner future. This will be explained in more detail in a later section.
The Display future will error when the inner future errors. The error is bubbled up.
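The error path can be exercised with a small sketch. As an assumption made to keep it self-contained, it again declares local stand-ins for the futures 0.1 types instead of importing the crate, and AlwaysFails is a hypothetical future invented for this example. Polling Display around a ready future yields Ready(()), while polling it around a failing future returns the inner error unchanged.

```rust
use std::fmt;

// Local stand-ins mirroring the shape of the futures 0.1 types, so that
// this sketch compiles without any external crate.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

struct HelloWorld;

impl Future for HelloWorld {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Poll<String, ()> {
        Ok(Async::Ready("hello world".to_string()))
    }
}

// Hypothetical future that fails every time it is polled.
struct AlwaysFails;

impl Future for AlwaysFails {
    type Item = String;
    type Error = &'static str;

    fn poll(&mut self) -> Poll<String, &'static str> {
        Err("boom")
    }
}

struct Display<T>(T);

impl<T> Future for Display<T>
where
    T: Future,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), T::Error> {
        let value = match self.0.poll() {
            Ok(Async::Ready(value)) => value,
            // The inner future is not ready, so neither is `Display`.
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            // The inner error is passed to the caller unchanged.
            Err(err) => return Err(err),
        };

        println!("{}", value);
        Ok(Async::Ready(()))
    }
}

fn main() {
    // The inner future is ready: the value is printed and `()` is yielded.
    assert_eq!(Display(HelloWorld).poll(), Ok(Async::Ready(())));

    // The inner future fails: nothing is printed and the error bubbles up.
    assert_eq!(Display(AlwaysFails).poll(), Err("boom"));
}
```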
When HelloWorld is combined with Display, both the Item and Error types are () and the future can be executed by Tokio:
extern crate tokio;

// `HelloWorld` and `Display` are the types defined in the examples above.
fn main() {
    let future = Display(HelloWorld);
    tokio::run(future);
}
Running this results in “hello world” being printed to standard output.
Cleaning things up
The pattern of waiting on an inner future is common enough that there is a helper macro: try_ready!.
The poll function can be rewritten using the macro as follows:
#[macro_use]
extern crate futures;

use futures::{Future, Async, Poll};
use std::fmt;

struct Display<T>(T);

impl<T> Future for Display<T>
where
    T: Future,
    T::Item: fmt::Display,
{
    type Item = ();
    type Error = T::Error;

    fn poll(&mut self) -> Poll<(), T::Error> {
        let value = try_ready!(self.0.poll());
        println!("{}", value);
        Ok(Async::Ready(()))
    }
}
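To show the pattern that try_ready! captures, here is a sketch of a macro that performs the same three-way match as the hand-written version of poll. The name my_try_ready! and the double function are hypothetical, the Async and Poll types are local stand-ins so the example needs no external crate, and the real macro in the futures crate may differ in detail (for example in how it converts errors).

```rust
// Local stand-ins mirroring the shape of the futures 0.1 types, so that
// this sketch compiles without any external crate.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Hypothetical macro: unwrap a ready value, or return early from the
// enclosing function on `NotReady` or on an error.
macro_rules! my_try_ready {
    ($e:expr) => {
        match $e {
            Ok(Async::Ready(value)) => value,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => return Err(err),
        }
    };
}

// Hypothetical function standing in for the body of a `poll` method:
// it takes the result of polling an inner future and doubles the value.
fn double(inner: Poll<u32, String>) -> Poll<u32, String> {
    let value = my_try_ready!(inner);
    Ok(Async::Ready(value * 2))
}

fn main() {
    // Ready values flow through to the rest of the function.
    assert_eq!(double(Ok(Async::Ready(21))), Ok(Async::Ready(42)));

    // `NotReady` and errors cause an early return.
    assert_eq!(double(Ok(Async::NotReady)), Ok(Async::NotReady));
    assert_eq!(double(Err("failed".to_string())), Err("failed".to_string()));
}
```

Because the macro expands to an early return, it can only be used inside a function whose return type is compatible with Poll.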