Tokio与I/O

tokio crate带有TCP和UDP网络类型。 与std中的类型不同,Tokio的网络类型基于轮询模型,并在其准备状态发生变化(接收数据并刷写写入缓冲区)时通知任务执行程序。 在tokio :: net模块中,您将找到TcpListener,TcpStream和UdpSocket等类型。

所有这些类型都提供了future的API以及poll API。

Tokio网络类型由基于Mio的反应器提供动力,默认情况下,它在后台线程上懒洋洋地启动。 有关详细信息,请参阅reactor文档。

使用Future API我们已经在本指南的前面已经看到了一些传入函数以及tokio_io :: io中的助手。

这些助手包括:

  • incoming:入站TCP连接流。
  • read_exact:准确读取n个字节到缓冲区。
  • read_to_end:将所有字节读入缓冲区。
  • write_all:写入缓冲区的全部内容。
  • copy:将字节从一个I / O句柄复制到另一个I / O句柄。
    很多这些函数/帮助程序都是AsyncRead和AsyncWrite特性的通用函数。这些特征类似于std的Read和Write,但仅适用于“future感知”的类型,即遵循强制属性:

  • 调用读取或写入是非阻塞的,它们永远不会阻塞调用线程。

  • 如果一个调用会以其他方式阻塞,那么会返回一个带有此类WillBlock的错误。如果发生这种情况,则当前future的任务计划在I / O再次准备就绪时接收通知(取消停放)
    请注意 AsyncRead和AsyncWrite类型的用户应使用poll_read和poll_write,而不是直接调用read和write。

例如,以下是如何接受连接,从它们读取5个字节,然后将5个字节写回套接字:

  1. let server = listener.incoming().for_each(|socket| {
  2. println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
  3. let buf = vec![0; 5];
  4. let connection = io::read_exact(socket, buf)
  5. .and_then(|(socket, buf)| {
  6. io::write_all(socket, buf)
  7. })
  8. .then(|_| Ok(())); // Just discard the socket and buffer
  9. // Spawn a new task that processes the socket:
  10. tokio::spawn(connection);
  11. Ok(())
  12. })

使用Poll API

手动实现Future时将使用基于Poll的API,您需要返回Async。 当您需要实现自己的处理自定义逻辑的组合器时,这非常有用。

例如,这就是如何为TcpStream实现read_exact的future

  1. pub struct ReadExact {
  2. state: State,
  3. }
  4. enum State {
  5. Reading {
  6. stream: TcpStream,
  7. buf: Vec<u8>,
  8. pos: usize,
  9. },
  10. Empty,
  11. }
  12. impl Future for ReadExact {
  13. type Item = (TcpStream, Vec<u8>);
  14. type Error = io::Error;
  15. fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> {
  16. match self.state {
  17. State::Reading {
  18. ref mut stream,
  19. ref mut buf,
  20. ref mut pos
  21. } => {
  22. while *pos < buf.len() {
  23. let n = try_ready!({
  24. stream.poll_read(&mut buf[*pos..])
  25. });
  26. *pos += n;
  27. if n == 0 {
  28. let err = io::Error::new(
  29. io::ErrorKind::UnexpectedEof,
  30. "early eof");
  31. return Err(err)
  32. }
  33. }
  34. }
  35. State::Empty => panic!("poll a ReadExact after it's done"),
  36. }
  37. match mem::replace(&mut self.state, State::Empty) {
  38. State::Reading { stream, buf, .. } => {
  39. Ok(Async::Ready((stream, buf)))
  40. }
  41. State::Empty => panic!(),
  42. }
  43. }
  44. }

数据报(Datagrams)

请注意 ,大多数讨论都是围绕I / O或字节流进行的,而UDP重要的不是! 但是,为了适应这种情况,UdpSocket类型还提供了许多方便的方法:

  • send_dgram允许您表示将数据报作为future发送,如果无法立即发送整个数据报,则返回错误。
  • recv_dgram表示将数据报读入缓冲区,产生缓冲区和来自的地址。