Combinators
Often times, Future implementations follow similar patterns. To help reduceboilerplate, the futures
crate provides a number of utilities, called“combinators”, that abstract these patterns. Many of these combinators exist asfunctions on the Future
trait.
Building blocks
Let’s revisit the future implementations from the previous pages and see howthey can be simplified by using combinators.
map
The map
combinator takes a future and returns a new future that applies afunction to the value yielded by the first future.
This was the Display
future previously
implemented:
# #![deny(deprecated)]
# #[macro_use]
# extern crate futures;
# extern crate tokio;
#
# use futures::{Future, Async, Poll};
# use std::fmt;
#
# struct Display<T>(T);
#
impl<T> Future for Display<T>
where
T: Future,
T::Item: fmt::Display,
{
type Item = ();
type Error = T::Error;
fn poll(&mut self) -> Poll<(), T::Error> {
let value = try_ready!(self.0.poll());
println!("{}", value);
Ok(Async::Ready(()))
}
}
fn main() {
# let HelloWorld = futures::future::ok::<_, ()>("hello");
let future = Display(HelloWorld);
tokio::run(future);
}
With the map
combinator, it becomes:
# #![deny(deprecated)]
extern crate tokio;
extern crate futures;
use futures::Future;
fn main() {
# let HelloWorld = futures::future::ok::<_, ()>("hello");
let future = HelloWorld.map(|value| {
println!("{}", value);
});
tokio::run(future);
}
This is how map
is implemented:
# #![deny(deprecated)]
# #[macro_use]
# extern crate futures;
# use futures::{Future, Async, Poll};
pub struct Map<A, F> where A: Future {
future: A,
f: Option<F>,
}
impl<U, A, F> Future for Map<A, F>
where A: Future,
F: FnOnce(A::Item) -> U,
{
type Item = U;
type Error = A::Error;
fn poll(&mut self) -> Poll<U, A::Error> {
let value = try_ready!(self.future.poll());
let f = self.f.take().expect("cannot poll Map twice");
Ok(Async::Ready(f(value)))
}
}
# fn main() {}
Comparing Map
with our Display
implementation, it is clear how they both arevery similar. Where Display
calls println!
, Map
passes the value to thefunction.
and_then
Now, let’s use combinators to rewrite the future that established a TCP streamand wrote “hello world” to the peer using the and_then
combinator.
The and_then
combinator allows sequencing two asynchronous operations. Oncethe first operation completes, the value is passed to a function. The functionuses that value to produce a new future and that future is then executed. Thedifference between and_then
and map
is that and_then
’s function returns afuture where as map
’s function returns a value.
The original implementation is found here. Once updated touse combinators, it becomes:
# #![deny(deprecated)]
extern crate tokio;
extern crate bytes;
extern crate futures;
use tokio::io;
use tokio::net::TcpStream;
use futures::Future;
fn main() {
let addr = "127.0.0.1:1234".parse().unwrap();
let future = TcpStream::connect(&addr)
.and_then(|socket| {
io::write_all(socket, b"hello world")
})
.map(|_| println!("write complete"))
.map_err(|_| println!("failed"));
# let future = futures::future::ok::<(), ()>(());
tokio::run(future);
}
Further computations may be sequenced by chaining calls to and_then
. Forexample:
# #![deny(deprecated)]
# extern crate tokio;
# extern crate bytes;
# extern crate futures;
#
# use tokio::io;
# use tokio::net::TcpStream;
# use futures::Future;
fn main() {
let addr = "127.0.0.1:1234".parse().unwrap();
let future = TcpStream::connect(&addr)
.and_then(|socket| {
io::write_all(socket, b"hello world")
})
.and_then(|(socket, _)| {
// read exactly 11 bytes
io::read_exact(socket, vec![0; 11])
})
.and_then(|(socket, buf)| {
println!("got {:?}", buf);
Ok(())
});
# let future = futures::future::ok::<(), ()>(());
tokio::run(future);
}
The future returned by and_then
executes identically to the future weimplemented by hand on the previous page.
Essential combinators
It is worth spending some time with the Future
trait andmodule documentation to gain familiarity with the full set ofavailable combinators. This guide will provide a very quick overview.
Concrete futures
Any value can be turned into an immediately complete future. There are a fewfunctions in the future
module for creating such a future:
ok
, analogous toResult::Ok
, converts the provided value into aimmediately ready future that yields back the value.err
, analogous toResult::Err
, converts the provided error into animmediately ready future that fails with the error. as an immediately failedfuture.result
lifts a result to an immediately complete future.
In addition, there is also a function,lazy
, which allows constructing afuture given a closure. The closure is not immediately invoked, instead it isinvoked the first time the future is polled.
IntoFuture
A crucial API to know about is the IntoFuture
trait, which is a trait forvalues that can be converted into futures. Most APIs that you think of as takingfutures actually work with this trait instead. The key reason: the trait isimplemented for Result
, allowing you to return Result
values in many placesthat futures are expected.
Most combinator closures that return a future actually return an instance ofIntoFuture
.
Adapters
Like Iterator
, the Future
trait includes a broad range of “adapter”methods. These methods all consume the future, returning a new future providingthe requested behavior. Using these adapter combinators, it is possible to:
- Change the type of a future (
map
,map_err
) - Run another future after one has completed (
then
,and_then
,or_else
) - Figure out which of two futures resolves first (
select
) - Wait for two futures to both complete (
join
) - Convert to a trait object (
Box::new
) - Convert unwinding into errors (
catch_unwind
)
When to use combinators
Using combinators can reduce a lot of boilerplate, but they are not always agood fit. Due to limitations, implementing Future
manually is going to be common.
Functional style
Closures passed to combinators must be 'static
. This means it is not possibleto pass references into the closure. Ownership of all state must be moved intothe closure. The reason for this is that lifetimes are based on the stack. Withasynchronous code, the ability to rely on the stack is lost.
Because of this, code written using combinators end up being very functional instyle. Let’s compare Future combinators with synchronous Result
combinators.
use std::io;
# struct Data;
fn get_data() -> Result<Data, io::Error> {
# unimplemented!();
// ...
}
fn get_ok_data() -> Result<Vec<Data>, io::Error> {
let mut dst = vec![];
for _ in 0..10 {
get_data().and_then(|data| {
dst.push(data);
Ok(())
});
}
Ok(dst)
}
This works because the closure passed to and_then
is able to obtain a mutableborrow to dst
. The Rust compiler is able to guarantee that dst
will outlivethe closure.
However, when using futures, it is no longer possible to borrow dst
. Instead,dst
must be passed around. Something like:
extern crate futures;
use futures::{stream, Future, Stream};
use std::io;
# struct Data;
fn get_data() -> impl Future<Item = Data, Error = io::Error> {
# futures::future::ok(Data)
// ...
}
fn get_ok_data() -> impl Future<Item = Vec<Data>, Error = io::Error> {
let mut dst = vec![];
// Start with an unbounded stream that uses unit values.
stream::repeat(())
// Only take 10. This is how the for loop is simulated using a functional
// style.
.take(10)
// The `fold` combinator is used here because, in order to be
// functional, the state must be moved into the combinator. In this
// case, the state is the `dst` vector.
.fold(dst, move |mut dst, _| {
// Once again, the `dst` vector must be moved into the nested
// closure.
get_data().and_then(move |item| {
dst.push(item);
// The state must be included as part of the return value, so
// `dst` is returned.
Ok(dst)
})
})
}
# fn main() {}
Another strategy, which tends to work best with immutable data, is to store thedata in an Arc
and clone handles into the closures. One case in which thisworks well is sharing configuration values in multiple closures. For example:
extern crate futures;
use futures::{future, Future};
use std::io;
use std::sync::Arc;
fn get_message() -> impl Future<Item = String, Error = io::Error> {
// ....
# futures::future::ok("".to_string())
}
fn print_multi() -> impl Future<Item = (), Error = io::Error> {
let name = Arc::new("carl".to_string());
let futures: Vec<_> = (0..10).map(|_| {
// Clone the `name` handle, this allows multiple concurrent futures
// to access the name to print.
let name = name.clone();
get_message()
.and_then(move |message| {
println!("Hello {}, {}", name, message);
Ok(())
})
})
.collect();
future::join_all(futures)
.map(|_| ())
}
Returning futures
Because combinators often use closures as part of their type signature, it isnot possible to name the future type. This, in turn, means that the future typecannot be used as part of a function’s signature. When passing a future as afunction argument, generics can be used in almost all cases. For example:
extern crate futures;
use futures::Future;
fn get_message() -> impl Future<Item = String> {
// ...
# futures::future::ok::<_, ()>("".to_string())
}
fn with_future<T: Future<Item = String>>(f: T) {
// ...
# drop(f);
}
let my_future = get_message().map(|message| {
format!("MESSAGE = {}", message)
});
with_future(my_future);
However, for returning futures, it isn’t as simple. There are a few options withpros and cons:
Use impl Future
As of Rust version 1.26, the language feature impl Trait
can be used forreturning combinator futures. This allows writing the following:
# extern crate futures;
# use futures::Future;
fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
where F: Future<Item = i32>,
{
f.map(|i| i + 10)
}
The add_10
function has a return type that is “something that implementsFuture
” with the specified associated types. This allows returning a futurewithout explicitly naming the future type.
The pros to this approach are that it is zero overhead and covers a wide varietyof cases. However, there is a problem when returning futures from differentcode branches. For example:
if some_condition {
return get_message()
.map(|message| format!("MESSAGE = {}", message));
} else {
return futures::ok("My MESSAGE".to_string());
}
Returning from multiple branches
This results in rustc
outputting a compilation error of error[E0308]: if and
. Functions returning
else have incompatible typesimpl Future
must still havea single return type. The impl Future
syntax just means that the return typedoes not have to be named. However, each combinator type has a differenttype, so the types being returned in each conditional branch are different.
Given the above scenario, there are two options. The first is to change thefunction to return a trait object. The second is to use theEither
type:
# extern crate futures;
# use futures::Future;
# use futures::future::{self, Either};
# fn get_message() -> impl Future<Item = String> {
# future::ok::<_, ()>("".to_string())
# }
# fn my_op() -> impl Future<Item = String> {
# let some_condition = true;
if some_condition {
return Either::A(get_message()
.map(|message| format!("MESSAGE = {}", message)));
} else {
return Either::B(
future::ok("My MESSAGE".to_string()));
}
# }
# fn main() {}
This ensures that the function has a single return type: Either
.
In situations where there are more than two branches, Either
enums must benested (Either<Either<A, B>, C>
) or a custom, multi variant, enum is defined.
This scenario comes up often when trying to conditional return errors.Consider:
# extern crate futures;
# use futures::{future::{self, Either}, Future};
# fn is_valid(_: &str) -> bool { true }
# fn get_message() -> impl Future<Item = String, Error = &'static str> { future::ok("".to_string()) }
fn my_operation(arg: String) -> impl Future<Item = String> {
if is_valid(&arg) {
return Either::A(get_message().map(|message| {
format!("MESSAGE = {}", message)
}));
}
Either::B(future::err("something went wrong"))
}
# fn main() {}
In order to return early when an error has been encountered, an Either
variantmust be used to contain the error future.
Associated types
Traits with functions that return futures must include an associated type forthat future. For example, consider a simplified version of the Tower Service
trait:
pub trait Service {
/// Requests handled by the service.
type Request;
/// Responses given by the service.
type Response;
/// Errors produced by the service.
type Error;
/// The future response value.
type Future: Future<Item = Self::Response, Error = Self::Error>;
fn call(&mut self, req: Self::Request) -> Self::Future;
}
In order to implement this trait, the future returned by call
must benameable and set to the Future
associated type. In this case, impl Future
does not work and the future must either be boxed as a traitobject or a custom future must be defined.
Trait objects
Another strategy is to return a boxed future as a trait object:
# extern crate futures;
# use std::io;
# use futures::Future;
# fn main() {}
fn foo() -> Box<Future<Item = u32, Error = io::Error> + Send> {
// ...
# loop {}
}
The pro of this strategy is that it is easy to write Box
. It also is able tohandle the “branching” described above with arbitrary number of branches:
# extern crate futures;
# use futures::{future::{self, Either}, Future};
# fn is_valid(_: &str) -> bool { true }
# fn get_message() -> impl Future<Item = String, Error = &'static str> { future::ok("".to_string()) }
fn my_operation(arg: String) -> Box<Future<Item = String, Error = &'static str> + Send> {
if is_valid(&arg) {
if arg == "foo" {
return Box::new(get_message().map(|message| {
format!("FOO = {}", message)
}));
} else {
return Box::new(get_message().map(|message| {
format!("MESSAGE = {}", message)
}));
}
}
Box::new(future::err("something went wrong"))
}
# fn main() {}
The downside is that the boxing approach requires more overhead. An allocationis required to store the returned future value. In addition, whenever the futureis used Rust needs to dynamically unbox it via a runtime lookup (vtable).This can make boxed futures slightly slower in practice, though the differenceis often not noticeable.
There is one caveat that can trip up authors trying to use a Box<Future<…>>
,particularly with tokio::run
. By default, Box<Future<…>>
is not Send
and cannot be sent across threads, even if the future contained in the box isSend
.
To make a boxed future Send
, it must be annotated as such:
fn my_operation() -> Box<Future<Item = String, Error = &'static str> + Send> {
// ...
}
Implement Future by hand
Finally, when the above strategies fail, it is always possible to fall back onimplementing Future
by hand. Doing so provides full control, but comes at acost of additional boilerplate given that no combinator functions can be usedwith this approach.
When to use combinators
Combinators are powerful ways to reduce boilerplate in your Tokio basedapplication, but as discussed in this section, they are not a silver bullet. Itis common to implement custom futures as well as custom combinators. This raisesthe question of when combinators should be used versus implementing Future
byhand.
As per the discussion above, if the future type must be nameable and a Box
isnot acceptable overhead, then combinators may not be used. Besides this, itdepends on the complexity of the state that must be passed around betweencombinators.
Scenarios when the state must be accessed concurrently from multiple combinatorsmay be a good case for implementing a Future
by hand.
TODO: This section needs to be expanded with examples. If you have ideas toimprove this section, visit the doc-push repo and open an issue with yourthoughts.
Next up: Streams