Streaming 通信模型

介绍使用 Dubbo Rust 快速开发 Client streaming、Server streaming、Bidirectional streaming 模型的服务。

本文重点讲解 Dubbo Rust Streaming 通信模式,请先查看 Quick Start 了解 Dubbo Rust 基本使用,在此查看本文的完整示例

1 IDL 中增加 Streaming 模型定义

完整 Greeter 服务定义如下,包含一个 Unary、Client stream、Server stream、Bidirectional stream 模型的 Dubbo 服务。

  1. // ./proto/greeter.proto
  2. syntax = "proto3";
  3. option java_multiple_files = true;
  4. package org.apache.dubbo.sample.tri;
  5. // The request message containing the user's name.
  6. message GreeterRequest {
  7. string name = 1;
  8. }
  9. // The response message containing the greetings
  10. message GreeterReply {
  11. string message = 1;
  12. }
  13. service Greeter{
  14. // unary
  15. rpc greet(GreeterRequest) returns (GreeterReply);
  16. // clientStream
  17. rpc greetClientStream(stream GreeterRequest) returns (GreeterReply);
  18. // serverStream
  19. rpc greetServerStream(GreeterRequest) returns (stream GreeterReply);
  20. // bi streaming
  21. rpc greetStream(stream GreeterRequest) returns (stream GreeterReply);
  22. }

2 使用 Streaming 模型定义编写逻辑

2.1 编写 Streaming Server

  1. // ./src/greeter/server.rs
  2. pub mod protos {
  3. include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
  4. }
  5. use futures_util::StreamExt;
  6. use protos::{
  7. greeter_server::{register_server, Greeter},
  8. GreeterReply, GreeterRequest,
  9. };
  10. use std::{io::ErrorKind, pin::Pin};
  11. use async_trait::async_trait;
  12. use futures_util::Stream;
  13. use tokio::sync::mpsc;
  14. use tokio_stream::wrappers::ReceiverStream;
  15. use dubbo_config::RootConfig;
  16. use dubbo::{codegen::*, Dubbo};
  17. type ResponseStream =
  18. Pin<Box<dyn Stream<Item = Result<GreeterReply, dubbo::status::Status>> + Send>>;
  19. #[tokio::main]
  20. async fn main() {
  21. register_server(GreeterServerImpl {
  22. name: "greeter".to_string(),
  23. });
  24. // Dubbo::new().start().await;
  25. Dubbo::new()
  26. .with_config({
  27. let r = RootConfig::new();
  28. match r.load() {
  29. Ok(config) => config,
  30. Err(_err) => panic!("err: {:?}", _err), // response was droped
  31. }
  32. })
  33. .start()
  34. .await;
  35. }
  36. #[allow(dead_code)]
  37. #[derive(Default, Clone)]
  38. struct GreeterServerImpl {
  39. name: String,
  40. }
  41. // #[async_trait]
  42. #[async_trait]
  43. impl Greeter for GreeterServerImpl {
  44. async fn greet(
  45. &self,
  46. request: Request<GreeterRequest>,
  47. ) -> Result<Response<GreeterReply>, dubbo::status::Status> {
  48. println!("GreeterServer::greet {:?}", request.metadata);
  49. Ok(Response::new(GreeterReply {
  50. message: "hello, dubbo-rust".to_string(),
  51. }))
  52. }
  53. async fn greet_client_stream(
  54. &self,
  55. request: Request<Decoding<GreeterRequest>>,
  56. ) -> Result<Response<GreeterReply>, dubbo::status::Status> {
  57. let mut s = request.into_inner();
  58. loop {
  59. let result = s.next().await;
  60. match result {
  61. Some(Ok(val)) => println!("result: {:?}", val),
  62. Some(Err(val)) => println!("err: {:?}", val),
  63. None => break,
  64. }
  65. }
  66. Ok(Response::new(GreeterReply {
  67. message: "hello client streaming".to_string(),
  68. }))
  69. }
  70. type greetServerStreamStream = ResponseStream;
  71. async fn greet_server_stream(
  72. &self,
  73. request: Request<GreeterRequest>,
  74. ) -> Result<Response<Self::greetServerStreamStream>, dubbo::status::Status> {
  75. println!("greet_server_stream: {:?}", request.into_inner());
  76. let data = vec![
  77. Result::<_, dubbo::status::Status>::Ok(GreeterReply {
  78. message: "msg1 from server".to_string(),
  79. }),
  80. Result::<_, dubbo::status::Status>::Ok(GreeterReply {
  81. message: "msg2 from server".to_string(),
  82. }),
  83. Result::<_, dubbo::status::Status>::Ok(GreeterReply {
  84. message: "msg3 from server".to_string(),
  85. }),
  86. ];
  87. let resp = futures_util::stream::iter(data);
  88. Ok(Response::new(Box::pin(resp)))
  89. }
  90. type greetStreamStream = ResponseStream;
  91. async fn greet_stream(
  92. &self,
  93. request: Request<Decoding<GreeterRequest>>,
  94. ) -> Result<Response<Self::greetStreamStream>, dubbo::status::Status> {
  95. println!(
  96. "GreeterServer::greet_stream, grpc header: {:?}",
  97. request.metadata
  98. );
  99. let mut in_stream = request.into_inner();
  100. let (tx, rx) = mpsc::channel(128);
  101. // this spawn here is required if you want to handle connection error.
  102. // If we just map `in_stream` and write it back as `out_stream` the `out_stream`
  103. // will be drooped when connection error occurs and error will never be propagated
  104. // to mapped version of `in_stream`.
  105. tokio::spawn(async move {
  106. while let Some(result) = in_stream.next().await {
  107. match result {
  108. Ok(v) => {
  109. // if v.name.starts_with("msg2") {
  110. // tx.send(Err(dubbo::status::Status::internal(format!("err: args is invalid, {:?}", v.name))
  111. // )).await.expect("working rx");
  112. // continue;
  113. // }
  114. tx.send(Ok(GreeterReply {
  115. message: format!("server reply: {:?}", v.name),
  116. }))
  117. .await
  118. .expect("working rx")
  119. }
  120. Err(err) => {
  121. if let Some(io_err) = match_for_io_error(&err) {
  122. if io_err.kind() == ErrorKind::BrokenPipe {
  123. // here you can handle special case when client
  124. // disconnected in unexpected way
  125. eprintln!("\tclient disconnected: broken pipe");
  126. break;
  127. }
  128. }
  129. match tx.send(Err(err)).await {
  130. Ok(_) => (),
  131. Err(_err) => break, // response was droped
  132. }
  133. }
  134. }
  135. }
  136. println!("\tstream ended");
  137. });
  138. // echo just write the same data that was received
  139. let out_stream = ReceiverStream::new(rx);
  140. Ok(Response::new(
  141. Box::pin(out_stream) as Self::greetStreamStream
  142. ))
  143. }
  144. }
  145. fn match_for_io_error(err_status: &dubbo::status::Status) -> Option<&std::io::Error> {
  146. let mut err: &(dyn std::error::Error + 'static) = err_status;
  147. loop {
  148. if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
  149. return Some(io_err);
  150. }
  151. err = match err.source() {
  152. Some(err) => err,
  153. None => return None,
  154. };
  155. }
  156. }

2.2 编写 Streaming Client

  1. // ./src/greeter/client.rs
  2. pub mod protos {
  3. include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
  4. }
  5. use dubbo::codegen::*;
  6. use futures_util::StreamExt;
  7. use protos::{greeter_client::GreeterClient, GreeterRequest};
  8. #[tokio::main]
  9. async fn main() {
  10. let mut cli = GreeterClient::new().with_uri("http://127.0.0.1:8888".to_string());
  11. println!("# unary call");
  12. let resp = cli
  13. .greet(Request::new(GreeterRequest {
  14. name: "message from client".to_string(),
  15. }))
  16. .await;
  17. let resp = match resp {
  18. Ok(resp) => resp,
  19. Err(err) => return println!("{:?}", err),
  20. };
  21. let (_parts, body) = resp.into_parts();
  22. println!("Response: {:?}", body);
  23. println!("# client stream");
  24. let data = vec![
  25. GreeterRequest {
  26. name: "msg1 from client streaming".to_string(),
  27. },
  28. GreeterRequest {
  29. name: "msg2 from client streaming".to_string(),
  30. },
  31. GreeterRequest {
  32. name: "msg3 from client streaming".to_string(),
  33. },
  34. ];
  35. let req = futures_util::stream::iter(data);
  36. let resp = cli.greet_client_stream(req).await;
  37. let client_streaming_resp = match resp {
  38. Ok(resp) => resp,
  39. Err(err) => return println!("{:?}", err),
  40. };
  41. let (_parts, resp_body) = client_streaming_resp.into_parts();
  42. println!("client streaming, Response: {:?}", resp_body);
  43. println!("# bi stream");
  44. let data = vec![
  45. GreeterRequest {
  46. name: "msg1 from client".to_string(),
  47. },
  48. GreeterRequest {
  49. name: "msg2 from client".to_string(),
  50. },
  51. GreeterRequest {
  52. name: "msg3 from client".to_string(),
  53. },
  54. ];
  55. let req = futures_util::stream::iter(data);
  56. let bidi_resp = cli.greet_stream(req).await.unwrap();
  57. let (parts, mut body) = bidi_resp.into_parts();
  58. println!("parts: {:?}", parts);
  59. while let Some(item) = body.next().await {
  60. match item {
  61. Ok(v) => {
  62. println!("reply: {:?}", v);
  63. }
  64. Err(err) => {
  65. println!("err: {:?}", err);
  66. }
  67. }
  68. }
  69. let trailer = body.trailer().await.unwrap();
  70. println!("trailer: {:?}", trailer);
  71. println!("# server stream");
  72. let resp = cli
  73. .greet_server_stream(Request::new(GreeterRequest {
  74. name: "server streaming req".to_string(),
  75. }))
  76. .await
  77. .unwrap();
  78. let (parts, mut body) = resp.into_parts();
  79. println!("parts: {:?}", parts);
  80. while let Some(item) = body.next().await {
  81. match item {
  82. Ok(v) => {
  83. println!("reply: {:?}", v);
  84. }
  85. Err(err) => {
  86. println!("err: {:?}", err);
  87. }
  88. }
  89. }
  90. let trailer = body.trailer().await.unwrap();
  91. println!("trailer: {:?}", trailer);
  92. }

3 运行示例

  1. 编译

执行cargo build来编译server和client。

  1. 运行server

执行./target/debug/greeter-server来运行server,如上文dubbo.yaml所配置,server会监听8888端口,并以triple协议提供RPC服务:

  1. $ ./target/debug/greeter-server
  2. 2022-09-28T23:33:28.104577Z INFO dubbo::framework: url: Some(Url { uri: "triple://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter", protocol: "triple", location: "0.0.0.0:8888", ip: "0.0.0.0", port: "8888", service_key: ["org.apache.dubbo.sample.tri.Greeter"], params: {} })
  1. 运行client,可以看到 Streaming 通信效果

执行./target/debug/greeter-client来运行client,调用triple://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter下的各种方法:

  1. $ ./target/debug/greeter-client
  2. # unary call
  3. Response: GreeterReply { message: "hello, dubbo-rust" }
  4. # client stream
  5. client streaming, Response: GreeterReply { message: "hello client streaming" }
  6. # bi stream
  7. parts: Metadata { inner: {"content-type": "application/grpc", "date": "Wed, 28 Sep 2022 23:34:20 GMT"} }
  8. reply: GreeterReply { message: "server reply: \"msg1 from client\"" }
  9. reply: GreeterReply { message: "server reply: \"msg2 from client\"" }
  10. reply: GreeterReply { message: "server reply: \"msg3 from client\"" }
  11. trailer: Some(Metadata { inner: {"content-type": "application/grpc", "grpc-status": "0", "grpc-message": "poll trailer successfully.", "grpc-accept-encoding": "gzip,identity"} })
  12. # server stream
  13. parts: Metadata { inner: {"content-type": "application/grpc", "date": "Wed, 28 Sep 2022 23:34:20 GMT"} }
  14. reply: GreeterReply { message: "msg1 from server" }
  15. reply: GreeterReply { message: "msg2 from server" }
  16. reply: GreeterReply { message: "msg3 from server" }
  17. trailer: Some(Metadata { inner: {"content-type": "application/grpc", "grpc-status": "0", "grpc-message": "poll trailer successfully.", "grpc-accept-encoding": "gzip,identity"} })

最后修改 March 2, 2023: Fix alias forward in docs3-v2 (#2342) (f16c1535592)