运行时模型
现在我们将介绍Tokio /future
运行时模型。 Tokio构建在future
箱顶部并使用其运行时模型。 这允许它也使用future
箱与其他图书馆互操作。
注意:此运行时模型与其他语言中的异步库非常不同。 虽然在较高的层面上,API看起来很相似,但代码执行方式却有所不同。
同步模型
首先,让我们简要谈谈同步(或阻塞)模型。 这是Rust标准库使用的模型。
// let socket = ...;
let mut buf = [0; 1024];
let n = socket.read(&mut buf).unwrap();
// Do something with &buf[..n];
调用socket.read时,无论套接字在其缓冲区中是否有待处理数据, 如果有待处理的数据,则读取的调用将立即返回,并且buf将填充该数据。 如果没有未决数据,则read函数将阻止当前线程,直到收到数据。 此时,buf将填充此新接收的数据,并且将返回read
函数
为了同时在许多不同的套接字上并发执行读取,每个套接字需要一个线程。 每个套接字使用一个线程不能很好地扩展到大量的套接字。 这被称为c10k问题。
非阻塞套接字
在执行像read这样的操作时避免阻塞线程的方法是不阻塞线程! 当套接字在其接收缓冲区中没有未决数据时,read函数立即返回,表明套接字“未准备好”以执行读取操作。
使用Tokio TcpStream时,如果没有要读取的待处理数据,则对read的调用将返回类型ErrorKind :: WouldBlock的错误。 此时,调用者负责稍后再次调用read。 诀窍是知道“晚些时候”的时间。
考虑非阻塞读取的另一种方法是“轮询”套接字以读取数据。
轮询模型
轮询套接字数据的策略可以推广到任何操作。 例如,在轮询模型中获取“小部件”的函数看起来像这样:
fn poll_widget() -> Async<Widget> { ... }
此函数返回Async <Widget>
,其中Async是Ready(Widget)或NotReady的枚举。 Async枚举由future
箱提供,是轮询模型的构建块之一。
现在,让我们定义一个没有使用此poll_widget函数的组合器的异步任务。 该任务将执行以下操作:
- 获取小部件。
- 将小部件打印到STDOUT。
- 终止任务。
为了定义任务,我们实现了Future trait。
///轮询单个小部件并将其写入STDOUT的任务。
pub struct MyTask;
impl Future for MyTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<()>, ()> {
match poll_widget() {
Async::Ready(widget) => {
println!("widget={:?}", widget);
Ok(Async::Ready(()))
}
Async::NotReady => {
return Ok(Async::NotReady);
}
}
}
}
重要提示: 返回Async :: NotReady具有特殊含义。 有关详细信息,请参阅下一节。
需要注意的关键是,当调用MyTask :: poll时,它会立即尝试获取小部件。 如果对poll_widget的调用返回NotReady,则该任务无法继续进行。 然后任务返回NotReady,表明它尚未准备好完成处理。
任务实现不会阻止。 相反,“将来的某个时间”,执行者将再次调用MyTask :: poll。 将再次调用poll_widget。 如果poll_widget已准备好返回窗口小部件,则该任务又可以打印窗口小部件。 然后,可以通过返回Ready来完成任务。
执行者(Executors)
为了使任务取得进展,必须调用MyTask :: poll。 这就是执行者的工作。
执行程序负责反复调用任务轮询,直到返回Ready。 有很多不同的方法可以做到这一点。 例如,CurrentThread执行者将阻止当前线程并遍历所有生成的任务,并对它们调用poll。 ThreadPool在线程池中调度任务。 这也是运行时使用的默认执行者。
必须在执行者上生成所有任务,否则不会执行任何工作。
在最简单的情况下,执行者可能看起来像这样:
pub struct SpinExecutor {
tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
}
impl SpinExecutor {
pub fn spawn<T>(&mut self, task: T)
where T: Future<Item = (), Error = ()> + 'static
{
self.tasks.push_back(Box::new(task));
}
pub fn run(&mut self) {
while let Some(mut task) = self.tasks.pop_front() {
match task.poll().unwrap() {
Async::Ready(_) => {}
Async::NotReady => {
self.tasks.push_back(task);
}
}
}
}
}
当然,这不会非常有效。 执行程序在一个繁忙的循环中运转并尝试轮询所有任务,即使任务将再次返回NotReady。
理想情况下,执行者可以通过某种方式知道任务的“准备就绪”状态何时被改变,即当轮询调用返回Ready时。 执行者看起来像这样:
pub fn run(&mut self) {
loop {
while let Some(mut task) = self.ready_tasks.pop_front() {
match task.poll().unwrap() {
Async::Ready(_) => {}
Async::NotReady => {
self.not_ready_tasks.push_back(task);
}
}
}
if self.not_ready_tasks.is_empty() {
return;
}
// Put the thread to sleep until there is work to do
self.sleep_until_tasks_are_ready();
}
}
当任务从“未准备好”变为“准备好”时能够得到通知是future
任务模型的核心。 我们将很快进一步深入研究。