非阻塞的网络通信

尽管上一章节中的 HTTP 连接实现起来很容易,但他们并不适合在生产环境使用。如果程序一次只允许一个连接开启(阻塞),CPU 将不得不等待缓慢的网络。非阻塞 I/O 意味着程序可以同时保持多个连接开启,并处理这些连接传输的数据。程序可以轮询这些开启的连接,或是等待输入的数据触发异步函数。这将让 I/O 密集的程序在单线程环境中运行得非常快。在这一章中,我们将介绍轮询和异步编程模型。

非阻塞 HTTP 客户端示例

非阻塞的 HTTP 客户端程序的源代码在这里。下面的 main() 函数开启了两个 HTTP 连接。它同时保持两个连接开启,并轮流查看是否有数据传输进来。换句话说,这两个连接并不会相互阻塞。他们的数据在传输进来的时候被同时(或者轮流)处理。

  1. use httparse::{Response, EMPTY_HEADER};
  2. use std::io::{self, Read, Write};
  3. use std::str::from_utf8;
  4. use wasmedge_wasi_socket::TcpStream;
  5. fn main() {
  6. let req = "GET / HTTP/1.0\n\n";
  7. let mut first_connection = TcpStream::connect("127.0.0.1:80").unwrap();
  8. first_connection.set_nonblocking(true).unwrap();
  9. first_connection.write_all(req.as_bytes()).unwrap();
  10. let mut second_connection = TcpStream::connect("127.0.0.1:80").unwrap();
  11. second_connection.set_nonblocking(true).unwrap();
  12. second_connection.write_all(req.as_bytes()).unwrap();
  13. let mut first_buf = vec![0; 4096];
  14. let mut first_bytes_read = 0;
  15. let mut second_buf = vec![0; 4096];
  16. let mut second_bytes_read = 0;
  17. loop {
  18. let mut first_complete = false;
  19. let mut second_complete = false;
  20. if !first_complete {
  21. match read_data(&mut first_connection, &mut first_buf, first_bytes_read) {
  22. Ok((bytes_read, false)) => {
  23. first_bytes_read = bytes_read;
  24. }
  25. Ok((bytes_read, true)) => {
  26. println!("First connection completed");
  27. if bytes_read != 0 {
  28. parse_data(&first_buf, bytes_read);
  29. }
  30. first_complete = true;
  31. }
  32. Err(e) => {
  33. println!("First connection error: {}", e);
  34. first_complete = true;
  35. }
  36. }
  37. }
  38. if !second_complete {
  39. match read_data(&mut second_connection, &mut second_buf, second_bytes_read) {
  40. Ok((bytes_read, false)) => {
  41. second_bytes_read = bytes_read;
  42. }
  43. Ok((bytes_read, true)) => {
  44. println!("Second connection completed");
  45. if bytes_read != 0 {
  46. parse_data(&second_buf, bytes_read);
  47. }
  48. second_complete = true;
  49. }
  50. Err(e) => {
  51. println!("Second connection error: {}", e);
  52. second_complete = true;
  53. }
  54. }
  55. }
  56. if first_complete && second_complete {
  57. break;
  58. }
  59. }
  60. }

使用如下命令来编译 Rust 程序。

cargo build --target wasm32-wasi --release

使用如下命令在 WasmEdge 中运行程序。

  1. #![allow(unused)]
  2. fn main() {
  3. wasmedge target/wasm32-wasi/release/nonblock_http_client.wasm
  4. }

非阻塞 HTTP 服务器示例

非阻塞的 HTTP 服务器程序的 源代码在这里。下面的 main() 函数开启了一个 HTTP 服务器。它同时从多个开启的连接中接收事件,并通过调用注册在每个连接的异步处理函数来处理这些事件。服务器可以同时从多个开启的连接中处理事件。

  1. fn main() -> std::io::Result<()> {
  2. let mut poll = Poll::new();
  3. let server = TcpListener::bind("127.0.0.1:1234", true)?;
  4. println!("Listening on 127.0.0.1:1234");
  5. let mut connections = HashMap::new();
  6. let mut handlers = HashMap::new();
  7. const SERVER: Token = Token(0);
  8. let mut unique_token = Token(SERVER.0 + 1);
  9. poll.register(&server, SERVER, Interest::Read);
  10. loop {
  11. let events = poll.poll().unwrap();
  12. for event in events {
  13. match event.token {
  14. SERVER => loop {
  15. let (connection, address) = match server.accept(FDFLAGS_NONBLOCK) {
  16. Ok((connection, address)) => (connection, address),
  17. Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
  18. Err(e) => panic!("accept error: {}", e),
  19. };
  20. println!("Accepted connection from: {}", address);
  21. let token = unique_token.add();
  22. poll.register(&connection, token, Interest::Read);
  23. connections.insert(token, connection);
  24. },
  25. token => {
  26. let done = if let Some(connection) = connections.get_mut(&token) {
  27. let handler = match handlers.get_mut(&token) {
  28. Some(handler) => handler,
  29. None => {
  30. let handler = Handler::new();
  31. handlers.insert(token, handler);
  32. handlers.get_mut(&token).unwrap()
  33. }
  34. };
  35. handle_connection(&mut poll, connection, handler, &event)?
  36. } else {
  37. false
  38. };
  39. if done {
  40. if let Some(connection) = connections.remove(&token) {
  41. connection.shutdown(Shutdown::Both)?;
  42. poll.unregister(&connection);
  43. handlers.remove(&token);
  44. }
  45. }
  46. }
  47. }
  48. }
  49. }
  50. }

handle_connection() 函数处理来自于开启的连接的数据。当前它只是将请求的内容写回响应中。这同样是以异步的形式完成的,意味着 handle_connection() 函数为响应创建了一个事件,然后将事件放入了一个队列中。主程序循环处理这些事件,并在等待来自其他连接的数据时发送这些响应。

  1. #![allow(unused)]
  2. fn main() {
  3. fn handle_connection(
  4. poll: &mut Poll,
  5. connection: &mut TcpStream,
  6. handler: &mut Handler,
  7. event: &Event,
  8. ) -> io::Result<bool> {
  9. if event.is_readable() {
  10. let mut connection_closed = false;
  11. let mut received_data = vec![0; 4096];
  12. let mut bytes_read = 0;
  13. loop {
  14. match connection.read(&mut received_data[bytes_read..]) {
  15. Ok(0) => {
  16. connection_closed = true;
  17. break;
  18. }
  19. Ok(n) => {
  20. bytes_read += n;
  21. if bytes_read == received_data.len() {
  22. received_data.resize(received_data.len() + 1024, 0);
  23. }
  24. }
  25. Err(ref err) if would_block(err) => {
  26. if bytes_read != 0 {
  27. let received_data = &received_data[..bytes_read];
  28. let mut bs: parsed::stream::ByteStream =
  29. match String::from_utf8(received_data.to_vec()) {
  30. Ok(s) => s,
  31. Err(_) => {
  32. continue;
  33. }
  34. }
  35. .into();
  36. let req = match parsed::http::parse_http_request(&mut bs) {
  37. Some(req) => req,
  38. None => {
  39. break;
  40. }
  41. };
  42. for header in req.headers.iter() {
  43. if header.name.eq("Conntent-Length") {
  44. let content_length = header.value.parse::<usize>().unwrap();
  45. if content_length > received_data.len() {
  46. return Ok(true);
  47. }
  48. }
  49. }
  50. println!(
  51. "{:?} request: {:?} {:?}",
  52. connection.peer_addr().unwrap(),
  53. req.method,
  54. req.path
  55. );
  56. let res = Response {
  57. protocol: "HTTP/1.1".to_string(),
  58. code: 200,
  59. message: "OK".to_string(),
  60. headers: vec![
  61. Header {
  62. name: "Content-Length".to_string(),
  63. value: req.content.len().to_string(),
  64. },
  65. Header {
  66. name: "Connection".to_string(),
  67. value: "close".to_string(),
  68. },
  69. ],
  70. content: req.content,
  71. };
  72. handler.response = Some(res.into());
  73. poll.reregister(connection, event.token, Interest::Write);
  74. break;
  75. } else {
  76. println!("Empty request");
  77. return Ok(true);
  78. }
  79. }
  80. Err(ref err) if interrupted(err) => continue,
  81. Err(err) => return Err(err),
  82. }
  83. }
  84. if connection_closed {
  85. println!("Connection closed");
  86. return Ok(true);
  87. }
  88. }
  89. if event.is_writable() && handler.response.is_some() {
  90. let resp = handler.response.clone().unwrap();
  91. match connection.write(resp.as_bytes()) {
  92. Ok(n) if n < resp.len() => return Err(io::ErrorKind::WriteZero.into()),
  93. Ok(_) => {
  94. return Ok(true);
  95. }
  96. Err(ref err) if would_block(err) => {}
  97. Err(ref err) if interrupted(err) => {
  98. return handle_connection(poll, connection, handler, event)
  99. }
  100. Err(err) => return Err(err),
  101. }
  102. }
  103. Ok(false)
  104. }
  105. }

使用如下命令来编译 Rust 程序。

cargo build --target wasm32-wasi --release

使用如下命令在 WasmEdge 中运行程序。

$ wasmedge target/wasm32-wasi/release/poll_http_server.wasm new connection at 1234

你可以用 curl 发送一个 HTTP 请求,来测试 HTTP 服务器。

$ curl -d "name=WasmEdge" -X POST http://127.0.0.1:1234 echo: name=WasmEdge