Runtime Model
Now we will go over the Tokio / futures runtime model. Tokio is built on top ofthe futures
crate and uses its runtime model. This allows it to interopwith other libraries also using the futures
crate.
Note: This runtime model is very different than async libraries found inother languages. While, at a high level, APIs can look similar, the way codegets executed differs.
Synchronous Model
First, let’s talk briefly about the synchronous (or blocking) model. This is themodel that the Rust standard library uses.
# use std::io::prelude::*;
# use std::net::TcpStream;
# fn dox(mut socket: TcpStream) {
// let socket = ...;
let mut buf = [0; 1024];
let n = socket.read(&mut buf).unwrap();
// Do something with &buf[..n];
# }
When socket.read
is called, either the socket has pending data in its receivebuffer or it does not. If there is pending data, then the call to read
willreturn immediately and buf
will be filled with that data. However, if there isno pending data, then the read
function will block the current thread untildata is received. At which time, buf
will be filled with this newly receiveddata and the read
function will return.
In order to perform reads on many different sockets concurrently, a thread persocket is required. Using a thread per socket does not scale up very well tolarge numbers of sockets. This is known as the c10k problem.
Non-blocking sockets
The way to avoid blocking a thread when performing an operation like read is tonot block the thread! When the socket has no pending data in its receive buffer,the read
function returns immediately, indicating that the socket was “notready” to perform the read operation.
When using a Tokio TcpStream
, a call to read
will always immediately returna value (ErrorKind::WouldBlock
) even if there is no pending data to read.If there is no pending data, the caller is responsible for calling read
againat a later time. The trick is to know when that “later time” is.
Another way to think about a non-blocking read is as “polling” the socket fordata to read.
Polling Model
The strategy of polling a socket for data can be generalized to any operation.For example, a function to get a “widget” in the polling model would looksomething like this:
fn poll_widget() -> Async<Widget> { ... }
This function returns an Async<Widget>
where Async
is an enum ofReady(Widget)
or NotReady
. The Async
enum is provided by the futures
crate and is one of the building blocks of the polling model.
Now, lets define an asynchronous task without combinators that uses thispoll_widget
function. The task will do the following:
- Acquire a widget.
- Print the widget to STDOUT.
- Terminate the task.
To define a task, we implement theFuture
trait.
# #![deny(deprecated)]
# extern crate futures;
# use futures::{Async, Future};
#
# #[derive(Debug)]
# pub struct Widget;
# fn poll_widget() -> Async<Widget> { unimplemented!() }
#
/// A task that polls a single widget and writes it to STDOUT.
pub struct MyTask;
impl Future for MyTask {
// The value this future will have when ready
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<()>, ()> {
match poll_widget() {
Async::Ready(widget) => {
println!("widget={:?}", widget);
Ok(Async::Ready(()))
}
Async::NotReady => {
Ok(Async::NotReady)
}
}
}
}
#
# fn main() {
# }
Important: ReturningAsync::NotReady
has special meaning. See the nextsection for more details.
The key thing to note is, when MyTask::poll
is called, it immediately tries toget the widget. If the call to poll_widget
returns NotReady
, then the taskis unable to make further progress. The task then returns NotReady
itself,indicating that it is not ready to complete processing.
The task implementation does not block. Instead, “sometime in the future”, theexecutor will call MyTask::poll
again. poll_widget
will be called again. Ifpoll_widget
is ready to return a widget, then the task, in turn, is ready toprint the widget. The task can then complete by returning Ready
.
Executors
In order for the task to make progress, something has to call MyTask::poll
.This is the job of an executor.
Executors are responsible for repeatedly calling poll
on a task until Ready
is returned. There are many different ways to do this. For example, theCurrentThread
executor will block the current thread and loop through allspawned tasks, calling poll on them. ThreadPool
schedules tasks across a threadpool. This is also the default executor used by the runtime.
All tasks must be spawned on an executor or no work will be performed.
At the very simplest, an executor could look something like this:
# #![deny(deprecated)]
# extern crate futures;
# use futures::{Async, Future};
# use std::collections::VecDeque;
#
pub struct SpinExecutor {
// the tasks an executor is responsible for in
// a double ended queue
tasks: VecDeque<Box<Future<Item = (), Error = ()> + Send>>,
}
impl SpinExecutor {
pub fn spawn<T>(&mut self, task: T)
where T: Future<Item = (), Error = ()> + 'static + Send
{
self.tasks.push_back(Box::new(task));
}
pub fn run(&mut self) {
// Pop tasks off the front in a tight loop
while let Some(mut task) = self.tasks.pop_front() {
match task.poll().unwrap() {
Async::Ready(_) => {}
Async::NotReady => {
// If the task is not ready put it to the back of queue
self.tasks.push_back(task);
}
}
}
}
}
# pub fn main() {}
Of course, this would not be very efficient. The executor spins in a busy loopand tries to poll all tasks even if the task will just return NotReady
again.
Ideally, there would be some way for the executor to know when the “readiness”state of a task is changed, i.e. when a call to poll
will return Ready
.Then, the executor would look something like this:
# #![deny(deprecated)]
# extern crate futures;
# use futures::{Async, Future};
# use std::collections::VecDeque;
#
# pub struct SpinExecutor {
# ready_tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
# not_ready_tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
# }
#
# impl SpinExecutor {
# fn sleep_until_tasks_are_ready(&self) {}
#
pub fn run(&mut self) {
loop {
while let Some(mut task) = self.ready_tasks.pop_front() {
match task.poll().unwrap() {
Async::Ready(_) => {}
Async::NotReady => {
self.not_ready_tasks.push_back(task);
}
}
}
if self.not_ready_tasks.is_empty() {
return;
}
// Put the thread to sleep until there is work to do
self.sleep_until_tasks_are_ready();
}
}
# }
# pub fn main() {}
Being able to get notified when a task goes from “not ready” to “ready” is thecore of the futures
task model.
Next up: I/O with Tokio