Tasks
Tasks are the application’s “unit of logic”. They are similar to Go’s goroutines and Erlang’s processes, but asynchronous. In other words, tasks are asynchronous green threads.
Because a task runs an asynchronous bit of logic, it is represented by the Future trait. The task’s future implementation completes with a () value once the task is done processing.
Tasks are passed to executors, which handle scheduling the task. An executor usually schedules many tasks across a single thread or a small set of threads. Tasks must not perform computation-heavy logic, or they will prevent other tasks from executing. So don’t try to compute the Fibonacci sequence as a task.
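To make the split between tasks and executors concrete, here is a minimal single-threaded executor sketch. It is written against the standard library's std::future::Future and std::task::Wake rather than the futures 0.1 API used elsewhere on this page, and Executor, spawn, run, and YieldNow are hypothetical names. A woken task is pushed back onto a run queue, and every task is polled on the one thread that calls run, so a task that spends a long time inside poll holds up all the others.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical sketch types: a spawned task owns its future plus a handle
// to the run queue, so waking the task can reschedule it.
type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send>>;
type RunQueue = Arc<Mutex<VecDeque<Arc<Task>>>>;

struct Task {
    future: Mutex<Option<BoxedTask>>,
    queue: RunQueue,
}

impl Wake for Task {
    // Waking a task pushes it back onto the run queue.
    fn wake(self: Arc<Self>) {
        let queue = self.queue.clone();
        queue.lock().unwrap().push_back(self);
    }
}

pub struct Executor {
    queue: RunQueue,
}

impl Executor {
    pub fn new() -> Self {
        Executor { queue: Arc::new(Mutex::new(VecDeque::new())) }
    }

    // A task is just a future that completes with `()`.
    pub fn spawn<F: Future<Output = ()> + Send + 'static>(&self, fut: F) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(Box::pin(fut))),
            queue: self.queue.clone(),
        });
        self.queue.lock().unwrap().push_back(task);
    }

    // Poll queued tasks on the current thread until none are runnable.
    pub fn run(&self) {
        loop {
            // The queue lock is released at the end of this statement,
            // so a task may wake itself while it is being polled.
            let next = self.queue.lock().unwrap().pop_front();
            let task = match next {
                Some(task) => task,
                None => break,
            };
            let mut slot = task.future.lock().unwrap();
            if let Some(mut fut) = slot.take() {
                let waker = Waker::from(task.clone());
                let mut cx = Context::from_waker(&waker);
                if fut.as_mut().poll(&mut cx).is_pending() {
                    // Not finished: keep it until it is woken again.
                    *slot = Some(fut);
                }
            }
        }
    }
}

// Returns `Pending` once (after waking itself), then completes.
pub struct YieldNow(pub bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

For example, spawning two tasks that each record a step, yield once, and record another step produces the interleaving a1, b1, a2, b2, because each yield sends the task to the back of the queue.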
Tasks are implemented either by implementing the Future trait directly or by building up a future using the various combinator functions available in the futures and tokio crates.
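As an illustration of the first approach, here is a hand-written future that completes with () after being polled a fixed number of times. It is a sketch against the modern std::future::Future trait, where poll receives a Context and returns Poll, rather than the futures 0.1 trait used on this page, where poll returns Result<Async<Item>, Error>. Countdown, NoopWake, and poll_once are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A task-like future: returns Pending `remaining` times, then completes with `()`.
pub struct Countdown {
    pub remaining: u32,
}

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        // Ask to be polled again. A real leaf future would instead register
        // the waker with the event source (socket, timer) it is waiting on.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// A waker that does nothing, good enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Poll a future once with a no-op waker; returns true if it completed.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx).is_ready()
}
```

Polling Countdown { remaining: 2 } returns Pending twice and Ready on the third poll. In modern Rust, async blocks largely take the place of the combinator-building approach.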
Here is an example that fetches the value from a URI using an HTTP GET and caches the result.
The logic is as follows:
- Check the cache to see if there is an entry for the URI.
- If there is no entry, perform the HTTP get.
- Store the response in the cache.
- Return the response.
The entire sequence of events is also wrapped with a timeout in order to prevent unbounded execution time.
# #![deny(deprecated)]
# extern crate futures;
# use futures::prelude::*;
# use futures::future::{self, Either, empty};
# use std::time::Duration;
# fn docx() {
#
# pub struct Timeout;
# impl Timeout {
# pub fn new<T>(_: T, _: Duration) -> impl Future<Item = (), Error = ()> {
# empty()
# }
# }
# pub struct MyExecutor;
# impl MyExecutor {
# fn spawn<T>(&self, _: T) {
# unimplemented!();
# }
# }
# pub struct Error;
// The functions here all return `impl Future<...>`. This is one
// of a number of ways to return futures. For more details on
// returning futures, see the "Returning futures" section in
// "Going deeper: Futures".
/// Get a URI from some remote cache.
fn cache_get(uri: &str)
    -> impl Future<Item = Option<String>, Error = Error>
# { empty() } /*
{ ... }
# */
/// Store a value for a URI in the remote cache.
fn cache_put(uri: &str, val: String)
    -> impl Future<Item = (), Error = Error>
# { empty() } /*
{ ... }
# */
/// Do a full HTTP GET to a remote URL.
fn http_get(uri: &str)
    -> impl Future<Item = String, Error = Error>
# { empty() } /*
{ ... }
# */
#
# let my_executor = MyExecutor;
fn fetch_and_cache(url: &str)
    -> impl Future<Item = String, Error = Error>
{
    // The URL has to be converted to an owned `String` so that it can be
    // moved into the closure. Because futures are asynchronous, the
    // caller's stack frame (and the borrowed `&str`) may no longer exist
    // by the time the closure runs.
    let url = url.to_string();
    let response = http_get(&url)
        .and_then(move |response| {
            cache_put(&url, response.clone())
                .map(|_| response)
        });
    // No boxing is needed: the combinator chain already implements `Future`.
    response
}
let url = "https://example.com";
let response = cache_get(url)
    .and_then(|resp| {
        // `Either` is a utility provided by the `futures` crate
        // that enables returning different futures from a single
        // closure without boxing.
        match resp {
            Some(resp) => Either::A(future::ok(resp)),
            None => {
                Either::B(fetch_and_cache(url))
            }
        }
    });
// Only let the task run for up to 20 seconds.
//
// This uses a fictional timer API. Use the `tokio-timer` crate for
// all your actual timer needs.
let task = Timeout::new(response, Duration::from_secs(20));
my_executor.spawn(task);
# }
# fn main() {}
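Stripped of asynchrony and the timeout, the control flow above is the familiar cache-aside pattern. The synchronous sketch below only makes the four steps explicit: the in-memory HashMap stands in for the remote cache, the http_get closure parameter is a hypothetical stand-in for the HTTP client, and nothing here is non-blocking.

```rust
use std::collections::HashMap;

// Return the cached value for `uri`, fetching and storing it on a miss.
// `http_get` is a hypothetical stand-in for the real (async) HTTP client.
pub fn get_cached<E>(
    cache: &mut HashMap<String, String>,
    uri: &str,
    http_get: impl FnOnce(&str) -> Result<String, E>,
) -> Result<String, E> {
    // 1. Check the cache to see if there is an entry for the URI.
    if let Some(hit) = cache.get(uri) {
        return Ok(hit.clone());
    }
    // 2. No entry: perform the HTTP GET (an error propagates, nothing is cached).
    let response = http_get(uri)?;
    // 3. Store the response in the cache.
    cache.insert(uri.to_string(), response.clone());
    // 4. Return the response.
    Ok(response)
}
```

A failed fetch returns the error without touching the cache, so the next call retries the GET.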
Because the steps are all necessary for the task to complete, it makes sense to group them all within the same task.
However, if instead of updating the cache on a cache miss we wanted to update the cache value on an interval, it would make sense to split that into multiple tasks, as the steps are no longer directly related.
# #![deny(deprecated)]
# extern crate futures;
# use futures::prelude::*;
# use futures::future::{self, Either, empty};
# use std::time::Duration;
# fn docx() {
#
# pub struct Timeout;
# impl Timeout {
# pub fn new<T>(_: T, _: Duration) -> impl Future<Item = (), Error = ()> {
# empty()
# }
# }
# pub struct Interval;
# impl Interval {
# pub fn new(_: Duration) -> Box<Stream<Item = (), Error = Error>> {
# unimplemented!();
# }
# }
# pub struct MyExecutor;
# impl MyExecutor {
# fn spawn<T>(&self, _: T) {
# unimplemented!();
# }
# }
# pub struct Error;
#
# fn cache_get(uri: &str)
# -> impl Future<Item = Option<String>, Error = Error>
# { empty() }
# fn cache_put(uri: &str, val: String)
# -> impl Future<Item = (), Error = Error>
# { empty() }
# fn http_get(uri: &str)
# -> impl Future<Item = String, Error = Error>
# { empty() }
# fn fetch_and_cache(url: &str)
# -> impl Future<Item = String, Error = Error>
# { empty() }
# let my_executor = MyExecutor;
let url = "https://example.com";
// An Interval is a stream that yields `()` on a fixed interval.
let update_cache = Interval::new(Duration::from_secs(60))
    // On each tick of the interval, update the cache. This is done
    // by using the same function from the previous snippet.
    .for_each(|_| {
        fetch_and_cache(url)
            .map(|resp| println!("updated cache with {}", resp))
    });
// Spawn the cache update task so that it runs in the background.
my_executor.spawn(update_cache);
// Now, only get from the cache.
// (NB: see next section about ensuring the cache is up to date.)
let response = cache_get(url);
let task = Timeout::new(response, Duration::from_secs(20));
my_executor.spawn(task);
# }
# fn main() {}
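The shape of this split can be sketched with OS threads standing in for tasks. This is an analogy only: a Tokio task is far cheaper than a thread, and the hypothetical fetch parameter stands in for fetch_and_cache. The refresher runs for a bounded number of ticks so that it can be joined. A reader started at the same moment may still read before the first write lands, which is the problem the next section addresses.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Cache shared between the refresher and any readers.
pub type SharedCache = Arc<Mutex<HashMap<String, String>>>;

// Spawns a background "task" (an OS thread in this sketch) that refreshes
// `url` in the cache once per `interval`, for `ticks` iterations so the
// example terminates. `fetch` is a hypothetical stand-in for fetch_and_cache.
pub fn spawn_refresher(
    cache: SharedCache,
    url: String,
    interval: Duration,
    ticks: u32,
    fetch: fn(&str, u32) -> String,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for tick in 0..ticks {
            let value = fetch(&url, tick);
            // Hold the lock only for the insert itself.
            cache.lock().unwrap().insert(url.clone(), value);
            thread::sleep(interval);
        }
    })
}
```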
Message Passing
Just as with Go and Erlang, tasks can communicate using message passing. In fact, it will be very common to use message passing to coordinate multiple tasks. This allows independent tasks to still interact.
The futures crate provides a sync module, which contains some channel types that are ideal for message passing across tasks.
- oneshot is a channel for sending exactly one value.
- mpsc is a channel for sending many (zero or more) values.
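The standard library has no oneshot type, but std::sync::mpsc can illustrate the two usage patterns. These are blocking, thread-oriented channels rather than the task-aware futures::sync channels, and oneshot, OneshotSender, OneshotReceiver, and sum_from_producer are hypothetical names for this sketch. The one-value channel enforces its limit by making send consume the sender.

```rust
use std::sync::mpsc;
use std::thread;

// One-value channel: `send` consumes the sender, so at most one value
// can ever be sent. (Hypothetical wrapper imitating a oneshot's shape.)
pub struct OneshotSender<T>(mpsc::SyncSender<T>);
pub struct OneshotReceiver<T>(mpsc::Receiver<T>);

pub fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
    // A buffer of one slot means the single send never blocks.
    let (tx, rx) = mpsc::sync_channel(1);
    (OneshotSender(tx), OneshotReceiver(rx))
}

impl<T> OneshotSender<T> {
    // Hands the value back if the receiver has already been dropped.
    pub fn send(self, value: T) -> Result<(), T> {
        self.0.send(value).map_err(|err| err.0)
    }
}

impl<T> OneshotReceiver<T> {
    // Blocks until the value arrives; errors if the sender was dropped unused.
    pub fn recv(self) -> Result<T, mpsc::RecvError> {
        self.0.recv()
    }
}

// Many-value channel: the producer sends any number of values, and the
// receiving loop ends once every sender has been dropped.
pub fn sum_from_producer(values: Vec<u32>) -> u32 {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for v in values {
            tx.send(v).unwrap();
        }
        // `tx` is dropped when this closure returns.
    });
    let total: u32 = rx.iter().sum();
    producer.join().unwrap();
    total
}
```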
The previous example isn’t exactly correct. Given that tasks are executed concurrently, there is no guarantee that the cache-updating task will have written the first value to the cache by the time the other task tries to read from the cache.
This is a perfect situation to use message passing. The cache-updating task can send a message notifying the other task that it has primed the cache with an initial value.
# #![deny(deprecated)]
# extern crate futures;
# use futures::prelude::*;
# use futures::future::{self, Either, empty};
# use futures::sync::oneshot;
# use std::time::Duration;
# fn docx() {
#
# pub struct Timeout;
# impl Timeout {
# pub fn new<T>(_: T, _: Duration) -> impl Future<Item = (), Error = ()> {
# empty()
# }
# }
# pub struct Interval;
# impl Interval {
# pub fn new(_: Duration) -> Box<Stream<Item = (), Error = Error>> {
# unimplemented!();
# }
# }
# pub struct MyExecutor;
# impl MyExecutor {
# fn spawn<T>(&self, _: T) {
# unimplemented!();
# }
# }
# pub struct Error;
#
# fn cache_get(uri: &str)
# -> impl Future<Item = Option<String>, Error = Error>
# { empty() }
# fn cache_put(uri: &str, val: String)
# -> impl Future<Item = (), Error = Error>
# { empty() }
# fn http_get(uri: &str)
# -> impl Future<Item = String, Error = Error>
# { empty() }
# fn fetch_and_cache(url: &str)
# -> impl Future<Item = String, Error = Error>
# { empty() }
# let my_executor = MyExecutor;
let url = "https://example.com";
let (primed_tx, primed_rx) = oneshot::channel();
let update_cache = fetch_and_cache(url)
    // Now, notify the other task that the cache is primed
    .then(|_| primed_tx.send(()))
    // Then we can start refreshing the cache on an interval
    .then(|_| {
        Interval::new(Duration::from_secs(60))
            .for_each(|_| {
                fetch_and_cache(url)
                    .map(|resp| println!("updated cache with {}", resp))
            })
    });
// Spawn the cache update task so that it runs in the background.
my_executor.spawn(update_cache);
// First, wait for the cache to be primed.
let response = primed_rx
    .then(|_| cache_get(url));
let task = Timeout::new(response, Duration::from_secs(20));
my_executor.spawn(task);
# }
# fn main() {}
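The same priming handshake can be sketched with an OS thread standing in for the cache-updating task and a std::sync::mpsc channel standing in for the oneshot channel (an analogy, not the futures API). The updater signals once after its first write and the reader blocks on that signal, so it cannot observe an empty cache. read_after_priming and load are hypothetical names, with load standing in for fetch_and_cache; the interval refresh is omitted.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Prime the cache on a background thread, wait for its signal, then read.
// `load` is a hypothetical stand-in for fetch_and_cache.
pub fn read_after_priming(url: &str, load: fn(&str) -> String) -> Option<String> {
    let cache: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
    let (primed_tx, primed_rx) = mpsc::channel::<()>();

    let updater_cache = Arc::clone(&cache);
    let updater_url = url.to_string();
    let updater = thread::spawn(move || {
        let value = load(&updater_url);
        updater_cache.lock().unwrap().insert(updater_url, value);
        // Notify the reader that the cache now holds an initial value.
        // (A real updater would go on refreshing on an interval here.)
        let _ = primed_tx.send(());
    });

    // Wait for the "primed" notification before touching the cache.
    primed_rx.recv().ok()?;
    let value = cache.lock().unwrap().get(url).cloned();
    updater.join().unwrap();
    value
}
```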
Task Notification
An application built with Tokio is structured as a set of concurrently running tasks. Here is the basic structure of a server:
# #![deny(deprecated)]
# extern crate futures;
# extern crate tokio;
#
# use tokio::io;
# use tokio::net::{TcpListener, TcpStream};
# use tokio::prelude::*;
# use futures::future::empty;
#
# pub fn process(socket: TcpStream) -> impl Future<Item = (), Error = ()> + Send {
# empty()
# }
#
# fn docx() {
# let addr = "127.0.0.1:6142".parse().unwrap();
# let listener = TcpListener::bind(&addr).unwrap();
let server = listener.incoming().for_each(|socket| {
    // Spawn a task to process the connection
    tokio::spawn(process(socket));
    Ok(())
})
.map_err(|_| ()); // Just drop the error
tokio::run(server);
# }
# pub fn main() {}
In this case, we spawn a task for each inbound server socket. However, it is also possible to implement a server future that processes all inbound connections within the same task:
# #![deny(deprecated)]
# extern crate futures;
# extern crate tokio;
# use futures::prelude::*;
# use tokio::net::*;
# use std::io;
# use futures::future::empty;
pub struct Server {
    listener: TcpListener,
    connections: Vec<Box<dyn Future<Item = (), Error = io::Error> + Send>>,
}
# pub fn process(socket: TcpStream) -> impl Future<Item = (), Error = io::Error> + Send {
#     empty()
# }
impl Future for Server {
    type Item = ();
    type Error = io::Error;
    fn poll(&mut self) -> Result<Async<()>, io::Error> {
        // First, accept all new connections
        loop {
            match self.listener.poll_accept()? {
                Async::Ready((socket, _)) => {
                    let connection = process(socket);
                    self.connections.push(Box::new(connection));
                }
                Async::NotReady => break,
            }
        }
        // Now, poll all connection futures. Iterating in reverse keeps the
        // indices of not-yet-visited connections valid when one is removed.
        let len = self.connections.len();
        for i in (0..len).rev() {
            match self.connections[i].poll()? {
                Async::Ready(_) => {
                    self.connections.remove(i);
                }
                Async::NotReady => {}
            }
        }
        // `NotReady` is returned here because the future never actually
        // completes. The server runs until it is dropped.
        Ok(Async::NotReady)
    }
}
# pub fn main() {}
These two strategies are functionally equivalent, but have significantly different runtime characteristics.
Notification happens at the task level. The task does not know which sub-future triggered the notification, so whenever the task is polled, it has to try polling all of its sub-futures.
Figure: Layout of a task.
In this task, there are three sub-futures that can get polled. If a resource contained by one of the sub-futures transitions to “ready”, the task itself gets notified and it will try to poll all three of its sub-futures. One of them will advance, which in turn advances the internal state of the task.
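The cost of task-level notification can be sketched with a hand-written combinator that owns several sub-futures and passes all of them the task's single waker. Because it cannot tell which child fired, each poll polls every unfinished child. This uses std::future::Future rather than futures 0.1, and Countdown, JoinAll, and poll_once are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A sub-future: returns Pending `remaining` times (waking itself each
// time), then completes. It counts every poll it receives.
pub struct Countdown {
    pub remaining: u32,
    pub polls: u32,
}

impl Future for Countdown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// A task made of several sub-futures that all share the task's waker.
pub struct JoinAll {
    pub children: Vec<(Countdown, bool)>, // (child, finished)
}

impl Future for JoinAll {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        let mut all_done = true;
        // The notification does not say which child fired, so every
        // unfinished child is polled again.
        for (child, finished) in this.children.iter_mut() {
            if !*finished {
                *finished = Pin::new(child).poll(cx).is_ready();
                all_done &= *finished;
            }
        }
        if all_done { Poll::Ready(()) } else { Poll::Pending }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Poll once with a no-op waker; returns true if the future completed.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx).is_ready()
}
```

With one child that finishes on its second poll and one that needs over a hundred, polling the task twice also polls the slow child twice, even though it was nowhere near ready.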
The key is to try to keep tasks small, doing as little as possible per task. This is why servers spawn new tasks for each connection instead of processing the connections in the same task as the listener.
Ok, there actually is a way for the task to know which sub-future triggered the notification, using FuturesUnordered, but usually the right thing to do is to spawn a new task.
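The idea behind that escape hatch can be sketched by giving each sub-future its own waker, which records the child's index in a shared ready list before waking the task. The parent then polls only the children listed there. This is a simplified std-only illustration of the technique, not how FuturesUnordered is actually implemented, and ChildWaker, Selective, Child, and poll_once are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Per-child waker: records which child was woken, then wakes the task.
struct ChildWaker {
    index: usize,
    ready: Arc<Mutex<Vec<usize>>>,
    parent: Waker,
}

impl Wake for ChildWaker {
    fn wake(self: Arc<Self>) {
        self.ready.lock().unwrap().push(self.index);
        self.parent.wake_by_ref();
    }
}

// Completes after returning Pending `remaining` times. With
// `wakes_itself == false` it models a child waiting on an event that has
// not happened yet. It counts every poll it receives.
pub struct Child {
    pub remaining: u32,
    pub wakes_itself: bool,
    pub polls: u32,
}

impl Future for Child {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        if self.wakes_itself {
            cx.waker().wake_by_ref();
        }
        Poll::Pending
    }
}

// Polls only the children whose own waker fired since the last poll.
pub struct Selective {
    pub children: Vec<(Child, bool)>, // (child, finished)
    ready: Arc<Mutex<Vec<usize>>>,
}

impl Selective {
    pub fn new(children: Vec<Child>) -> Self {
        // Every child starts out ready so that each is polled at least once.
        let ready: Vec<usize> = (0..children.len()).collect();
        Selective {
            children: children.into_iter().map(|c| (c, false)).collect(),
            ready: Arc::new(Mutex::new(ready)),
        }
    }
}

impl Future for Selective {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        // Drain the indices woken since the last poll (lock released here).
        let woken = std::mem::take(&mut *this.ready.lock().unwrap());
        for index in woken {
            let (child, finished) = &mut this.children[index];
            if *finished {
                continue;
            }
            let waker = Waker::from(Arc::new(ChildWaker {
                index,
                ready: Arc::clone(&this.ready),
                parent: cx.waker().clone(),
            }));
            let mut child_cx = Context::from_waker(&waker);
            *finished = Pin::new(child).poll(&mut child_cx).is_ready();
        }
        if this.children.iter().all(|(_, finished)| *finished) { Poll::Ready(()) } else { Poll::Pending }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Poll once with a no-op waker; returns true if the future completed.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx).is_ready()
}
```

With one child that keeps waking itself and one that never does, three polls of the parent poll the first child three times but the second only once.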