Streams
Streams are similar to futures, but instead of yielding a single value, theyasynchronously yield one or more values. They can be thought of as asynchronousiterators.
Just like futures, streams are able to represent a wide range of things as longas those things produce discrete values at different points sometime in thefuture. For instance:
- UI Events caused by the user interacting with a GUI in different ways. When anevent happens the stream yields a different message to your app over time.
- Push Notifications from a server. Sometimes a request/response model is notwhat you need. A client can establish a notification stream with a server to beable to receive messages from the server without specifically being requested.
- Incoming socket connections. As different clients connect to a server, theconnections stream will yield socket connections.
The Stream trait
Just like Future
, implementing Stream
is common when using Tokio. TheStream
trait is as follows:
trait Stream {
/// The type of the value yielded by the stream.
type Item;
/// The type representing errors that occurred while processing the computation.
type Error;
/// The function that will be repeatedly called to see if the stream has
/// another value it can yield
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
The Item
associated type is the type yielded by the stream. The Error
associated type is the type of the error yielded when something unexpectedhappens. The poll
function is very similar to Future
’s poll
function. Theonly difference is that, this time, Option<Self::Item>
is returned.
Stream implementations have the poll
function called many times. When the nextvalue is ready, Ok(Async::Ready(Some(value)))
is returned. When the stream isnot ready to yield a value, Ok(Async::NotReady)
is returned. When thestream is exhausted and will yield no further values, Ok(Async::Ready(None))
is returned. Just like with futures, streams must not returnAsync::NotReady
unless Async::NotReady
was obtained by an inner stream orfuture.
When the stream encounters an error, Err(error)
is returned. Returning anerror does not signify that the stream is exhausted. The error may betransient and the caller may try calling poll
again in the future and valuesmay be produced again. If the error is fatal, then the next call to poll
should return Ok(Async::Ready(None))
.
Fibonacci
The following example shows how to implement the fibonacci sequence as a stream.
extern crate futures;
use futures::{Stream, Poll, Async};
pub struct Fibonacci {
curr: u64,
next: u64,
}
impl Fibonacci {
fn new() -> Fibonacci {
Fibonacci {
curr: 1,
next: 1,
}
}
}
impl Stream for Fibonacci {
type Item = u64;
// The stream will never yield an error
type Error = ();
fn poll(&mut self) -> Poll<Option<u64>, ()> {
let curr = self.curr;
let next = curr + self.next;
self.curr = self.next;
self.next = next;
Ok(Async::Ready(Some(curr)))
}
}
To use the stream, a future must be built that consumes it. The following futurewill take a stream and display 10 items from it.
#[macro_use]
extern crate futures;
use futures::{Future, Stream, Poll, Async};
use std::fmt;
pub struct Display10<T> {
stream: T,
curr: usize,
}
impl<T> Display10<T> {
fn new(stream: T) -> Display10<T> {
Display10 {
stream,
curr: 0,
}
}
}
impl<T> Future for Display10<T>
where
T: Stream,
T::Item: fmt::Display,
{
type Item = ();
type Error = T::Error;
fn poll(&mut self) -> Poll<(), Self::Error> {
while self.curr < 10 {
let value = match try_ready!(self.stream.poll()) {
Some(value) => value,
// There were less than 10 values to display, terminate the
// future.
None => break,
};
println!("value #{} = {}", self.curr, value);
self.curr += 1;
}
Ok(Async::Ready(()))
}
}
# fn main() {}
Now, the fibonacci sequence can be displayed:
extern crate tokio;
# extern crate futures;
# struct Fibonacci;
# impl Fibonacci { fn new() { } }
# struct Display10<T> { v: T };
# impl<T> Display10<T> {
# fn new(_: T) -> futures::future::FutureResult<(), ()> {
# futures::future::ok(())
# }
# }
let fib = Fibonacci::new();
let display = Display10::new(fib);
tokio::run(display);
Getting asynchronous
So far, the fibonacci stream is synchronous. Lets make it asynchronous bywaiting a second between values. To do this,tokio::Interval
is used. Interval
is, itself, a streamthat yields ()
values at the requested time interval. Calling Interval::poll
between intervals results in Async::NotReady
being returned.
The Fibonacci
stream is updated as such:
#[macro_use]
extern crate futures;
extern crate tokio;
use tokio::timer::Interval;
use futures::{Stream, Poll, Async};
use std::time::Duration;
pub struct Fibonacci {
interval: Interval,
curr: u64,
next: u64,
}
impl Fibonacci {
fn new(duration: Duration) -> Fibonacci {
Fibonacci {
interval: Interval::new_interval(duration),
curr: 1,
next: 1,
}
}
}
impl Stream for Fibonacci {
type Item = u64;
// The stream will never yield an error
type Error = ();
fn poll(&mut self) -> Poll<Option<u64>, ()> {
// Wait until the next interval
try_ready!(
self.interval.poll()
// The interval can fail if the Tokio runtime is unavailable.
// In this example, the error is ignored.
.map_err(|_| ())
);
let curr = self.curr;
let next = curr + self.next;
self.curr = self.next;
self.next = next;
Ok(Async::Ready(Some(curr)))
}
}
# fn main() {}
The Display10
future already supports asynchronicity so it does not need to beupdated.
To run the throttled fibonacci sequence, include an interval:
extern crate tokio;
# extern crate futures;
# struct Fibonacci;
# impl Fibonacci { fn new(dur: Duration) { } }
# struct Display10<T> { v: T };
# impl<T> Display10<T> {
# fn new(_: T) -> futures::future::FutureResult<(), ()> {
# futures::future::ok(())
# }
# }
use std::time::Duration;
let fib = Fibonacci::new(Duration::from_secs(1));
let display = Display10::new(fib);
tokio::run(display);
Combinators
Just like futures, streams come with a number of combinators for reducingboilerplate. Many of these combinators exist as functions on theStream
trait.
Updating fibonacci stream can be rewritten using the unfold
function:
extern crate futures;
use futures::{stream, Stream};
fn fibonacci() -> impl Stream<Item = u64, Error = ()> {
stream::unfold((1, 1), |(curr, next)| {
let new_next = curr + next;
Some(Ok((curr, (next, new_next))))
})
}
Just like with futures, using stream combinators requires a functional style ofprogramming. Also, impl Stream
is used to return the stream from the function.The returning futures strategies apply equally to returning streams.
Display10
is reimplemented using take
and for_each
:
extern crate tokio;
extern crate futures;
use futures::Stream;
# use futures::stream;
# fn fibonacci() -> impl Stream<Item = u64, Error = ()> {
# stream::once(Ok(1))
# }
tokio::run(
fibonacci().take(10)
.for_each(|num| {
println!("{}", num);
Ok(())
})
);
The take
combinator limits the fibonacci stream to 10 values. The for_each
combinator asynchronously iterates the stream values. for_each
consumes thestream and returns a future that completes once the closure was called once foreach stream value. It is the asynchronous equivalent to a rust for
loop.
Essential combinators
It is worth spending some time with the Stream
trait andmodule documentation to gain some familiarity with the full set ofavailable combinators. This guide will provide a very quick overview.
Concrete streams
The stream
module contains functions for converting values anditerators into streams.
once
converts the provided value into an immediately ready stream thatyields a single item: the provided value.iter_ok
anditer_result
both takeIntoIterator
values and convertsthem to an immediately ready stream that yields the iterator values.empty
returns a stream that immediately yieldsNone
.
For example:
extern crate tokio;
extern crate futures;
use futures::{stream, Stream};
let values = vec!["one", "two", "three"];
tokio::run(
stream::iter_ok(values).for_each(|value| {
println!("{}", value);
Ok(())
})
)
Adapters
Like Iterator
, the Stream
trait includes a broad range of “adapter”methods. These methods all consume the stream, returning a new stream providingthe requested behavior. Using these adapter combinators, it is possible to: