Streaming Communication Model

An introduction to quickly developing client-streaming, server-streaming, and bidirectional-streaming services with Dubbo Rust.

This article focuses on the streaming communication patterns of Dubbo Rust. For the basic usage of Dubbo Rust, refer to the Quick Start; the complete example for this article is also available separately.

1 Adding Streaming Model Definitions in IDL

The complete Greeter service definition is shown below. It declares four Dubbo service methods: one unary, one client-streaming, one server-streaming, and one bidirectional-streaming.

  1. // ./proto/greeter.proto
  2. syntax = "proto3";
  3. option java_multiple_files = true;
  4. package org.apache.dubbo.sample.tri;
  5. // The request message containing the user's name.
  6. message GreeterRequest {
  7. string name = 1;
  8. }
  9. // The response message containing the greeting.
  10. message GreeterReply {
  11. string message = 1;
  12. }
  13. service Greeter{
  14. // Unary: one GreeterRequest in, one GreeterReply out.
  15. rpc greet(GreeterRequest) returns (GreeterReply);
  16. // Client streaming: a stream of GreeterRequest in, one GreeterReply out.
  17. rpc greetClientStream(stream GreeterRequest) returns (GreeterReply);
  18. // Server streaming: one GreeterRequest in, a stream of GreeterReply out.
  19. rpc greetServerStream(GreeterRequest) returns (stream GreeterReply);
  20. // Bidirectional streaming: a stream of GreeterRequest in, a stream of GreeterReply out.
  21. rpc greetStream(stream GreeterRequest) returns (stream GreeterReply);
  22. }

2 Writing Logic with Streaming Model Definitions

2.1 Writing the Streaming Server

  1. // ./src/greeter/server.rs
  2. pub mod protos {
  3. include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
  4. }
  5. use futures_util::StreamExt;
  6. use protos::{
  7. greeter_server::{register_server, Greeter},
  8. GreeterReply, GreeterRequest,
  9. };
  10. use std::{io::ErrorKind, pin::Pin};
  11. use async_trait::async_trait;
  12. use futures_util::Stream;
  13. use tokio::sync::mpsc;
  14. use tokio_stream::wrappers::ReceiverStream;
  15. use dubbo_config::RootConfig;
  16. use dubbo::{codegen::*, Dubbo};
  17. type ResponseStream = // boxed, pinned, `Send` stream of reply results; used as the associated stream type of both streaming-response handlers below
  18. Pin<Box<dyn Stream<Item = Result<GreeterReply, dubbo::status::Status>> + Send>>;
  19. #[tokio::main]
  20. async fn main() {
  21. register_server(GreeterServerImpl { // hand the service implementation to the generated `register_server` before Dubbo is started
  22. name: "greeter".to_string(),
  23. });
  24. // Build Dubbo with the configuration returned by `RootConfig::load`, then await `start()`.
  25. Dubbo::new()
  26. .with_config({
  27. let r = RootConfig::new();
  28. match r.load() {
  29. Ok(config) => config,
  30. Err(_err) => panic!("err: {:?}", _err), // abort with a panic if the configuration fails to load
  31. }
  32. })
  33. .start()
  34. .await;
  35. }
  36. #[allow(dead_code)] // `name` is set in `main` but not read anywhere in this listing
  37. #[derive(Default, Clone)]
  38. struct GreeterServerImpl {
  39. name: String,
  40. }
  41. // Server-side handlers for the four Greeter RPCs: unary, client streaming, server streaming and bidirectional streaming.
  42. #[async_trait]
  43. impl Greeter for GreeterServerImpl {
  44. async fn greet( // unary: prints the request metadata and returns a fixed reply
  45. &self,
  46. request: Request<GreeterRequest>,
  47. ) -> Result<Response<GreeterReply>, dubbo::status::Status> {
  48. println!("GreeterServer::greet {:?}", request.metadata);
  49. Ok(Response::new(GreeterReply {
  50. message: "hello, dubbo-rust".to_string(),
  51. }))
  52. }
  53. async fn greet_client_stream( // client streaming: drains the request stream, then returns a single fixed reply
  54. &self,
  55. request: Request<Decoding<GreeterRequest>>,
  56. ) -> Result<Response<GreeterReply>, dubbo::status::Status> {
  57. let mut s = request.into_inner();
  58. loop {
  59. let result = s.next().await;
  60. match result {
  61. Some(Ok(val)) => println!("result: {:?}", val),
  62. Some(Err(val)) => println!("err: {:?}", val), // errors are only printed; the loop keeps reading
  63. None => break, // request stream finished
  64. }
  65. }
  66. Ok(Response::new(GreeterReply {
  67. message: "hello client streaming".to_string(),
  68. }))
  69. }
  70. type greetServerStreamStream = ResponseStream;
  71. async fn greet_server_stream( // server streaming: prints the request and returns three fixed replies as a stream
  72. &self,
  73. request: Request<GreeterRequest>,
  74. ) -> Result<Response<Self::greetServerStreamStream>, dubbo::status::Status> {
  75. println!("greet_server_stream: {:?}", request.into_inner());
  76. let data = vec![
  77. Result::<_, dubbo::status::Status>::Ok(GreeterReply {
  78. message: "msg1 from server".to_string(),
  79. }),
  80. Result::<_, dubbo::status::Status>::Ok(GreeterReply {
  81. message: "msg2 from server".to_string(),
  82. }),
  83. Result::<_, dubbo::status::Status>::Ok(GreeterReply {
  84. message: "msg3 from server".to_string(),
  85. }),
  86. ];
  87. let resp = futures_util::stream::iter(data); // turn the Vec of results into a stream
  88. Ok(Response::new(Box::pin(resp)))
  89. }
  90. type greetStreamStream = ResponseStream;
  91. async fn greet_stream( // bidirectional: a spawned task answers each incoming message through a channel-backed response stream
  92. &self,
  93. request: Request<Decoding<GreeterRequest>>,
  94. ) -> Result<Response<Self::greetStreamStream>, dubbo::status::Status> {
  95. println!(
  96. "GreeterServer::greet_stream, grpc header: {:?}",
  97. request.metadata
  98. );
  99. let mut in_stream = request.into_inner();
  100. let (tx, rx) = mpsc::channel(128); // tokio mpsc channel (capacity argument 128); `rx` becomes the response stream
  101. tokio::spawn(async move {
  102. while let Some(result) = in_stream.next().await {
  103. match result {
  104. Ok(v) => {
  105. tx.send(Ok(GreeterReply {
  106. message: format!("server reply: {:?}", v.name), // `{:?}` on the name keeps the surrounding quotes, as seen in the client output
  107. }))
  108. .await
  109. .expect("working rx") // panics inside the spawned task if the send fails
  110. }
  111. Err(err) => {
  112. if let Some(io_err) = match_for_io_error(&err) {
  113. if io_err.kind() == ErrorKind::BrokenPipe {
  114. eprintln!("\tclient disconnected: broken pipe");
  115. break;
  116. }
  117. }
  118. match tx.send(Err(err)).await { // forward any other error to the client
  119. Ok(_) => (),
  120. Err(_err) => break, // send failed: the receiving side (response stream) was dropped
  121. }
  122. }
  123. }
  124. }
  125. println!("\tstream ended");
  126. });
  127. let out_stream = ReceiverStream::new(rx);
  128. Ok(Response::new(
  129. Box::pin(out_stream) as Self::greetStreamStream
  130. ))
  131. }
  132. }
  133. fn match_for_io_error(err_status: &dubbo::status::Status) -> Option<&std::io::Error> { // returns the first `std::io::Error` found in the status's error source chain, or `None`
  134. let mut err: &(dyn std::error::Error + 'static) = err_status;
  135. loop {
  136. if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
  137. return Some(io_err);
  138. }
  139. err = match err.source() { // step to the underlying cause
  140. Some(err) => err,
  141. None => return None, // end of the chain without finding an io::Error
  142. };
  143. }
  144. }

2.2 Writing the Streaming Client

  1. // ./src/greeter/client.rs
  2. pub mod protos {
  3. include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
  4. }
  5. use dubbo::codegen::*;
  6. use futures_util::StreamExt;
  7. use protos::{greeter_client::GreeterClient, GreeterRequest};
  8. #[tokio::main]
  9. async fn main() {
  10. let mut cli = GreeterClient::new().with_uri("http://127.0.0.1:8888".to_string());
  11. println!("# unary call");
  12. let resp = cli
  13. .greet(Request::new(GreeterRequest {
  14. name: "message from client".to_string(),
  15. }))
  16. .await;
  17. let resp = match resp {
  18. Ok(resp) => resp,
  19. Err(err) => return println!("{:?}", err), // print the error and leave main
  20. };
  21. let (_parts, body) = resp.into_parts();
  22. println!("Response: {:?}", body);
  23. println!("# client stream");
  24. let data = vec![
  25. GreeterRequest {
  26. name: "msg1 from client streaming".to_string(),
  27. },
  28. GreeterRequest {
  29. name: "msg2 from client streaming".to_string(),
  30. },
  31. GreeterRequest {
  32. name: "msg3 from client streaming".to_string(),
  33. },
  34. ];
  35. let req = futures_util::stream::iter(data); // wrap the Vec in a stream to use as the streaming request
  36. let resp = cli.greet_client_stream(req).await;
  37. let client_streaming_resp = match resp {
  38. Ok(resp) => resp,
  39. Err(err) => return println!("{:?}", err), // print the error and leave main
  40. };
  41. let (_parts, resp_body) = client_streaming_resp.into_parts();
  42. println!("client streaming, Response: {:?}", resp_body);
  43. println!("# bi stream");
  44. let data = vec![
  45. GreeterRequest {
  46. name: "msg1 from client".to_string(),
  47. },
  48. GreeterRequest {
  49. name: "msg2 from client".to_string(),
  50. },
  51. GreeterRequest {
  52. name: "msg3 from client".to_string(),
  53. },
  54. ];
  55. let req = futures_util::stream::iter(data);
  56. let bidi_resp = cli.greet_stream(req).await.unwrap(); // unlike the two calls above, a failure here panics instead of printing and returning
  57. let (parts, mut body) = bidi_resp.into_parts();
  58. println!("parts: {:?}", parts);
  59. while let Some(item) = body.next().await { // read replies until the response stream ends
  60. match item {
  61. Ok(v) => {
  62. println!("reply: {:?}", v);
  63. }
  64. Err(err) => {
  65. println!("err: {:?}", err);
  66. }
  67. }
  68. }
  69. let trailer = body.trailer().await.unwrap(); // trailers are requested only after the reply stream has been drained
  70. println!("trailer: {:?}", trailer);
  71. println!("# server stream");
  72. let resp = cli
  73. .greet_server_stream(Request::new(GreeterRequest {
  74. name: "server streaming req".to_string(),
  75. }))
  76. .await
  77. .unwrap(); // panics if the call fails
  78. let (parts, mut body) = resp.into_parts();
  79. println!("parts: {:?}", parts);
  80. while let Some(item) = body.next().await { // read replies until the response stream ends
  81. match item {
  82. Ok(v) => {
  83. println!("reply: {:?}", v);
  84. }
  85. Err(err) => {
  86. println!("err: {:?}", err);
  87. }
  88. }
  89. }
  90. let trailer = body.trailer().await.unwrap(); // trailers are requested only after the reply stream has been drained
  91. println!("trailer: {:?}", trailer);
  92. }

3 Running the Example

  1. Build

Run cargo build to compile the server and client.

  2. Run the server

Run ./target/debug/greeter-server to start the server. As configured in the project's dubbo.yaml (not shown in this article), the server listens on port 8888 and provides RPC services via the triple protocol:

  1. $ ./target/debug/greeter-server
  2. 2022-09-28T23:33:28.104577Z INFO dubbo::framework: url: Some(Url { uri: "triple://0.0.0.0:8888/org.apache.dubbo.sample.tri.Greeter", protocol: "triple", location: "0.0.0.0:8888", ip: "0.0.0.0", port: "8888", service_key: ["org.apache.dubbo.sample.tri.Greeter"], params: {} })

  3. Run the client to see streaming communication in action

Run ./target/debug/greeter-client to execute the client, calling various methods under triple://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter:

  1. $ ./target/debug/greeter-client
  2. # unary call
  3. Response: GreeterReply { message: "hello, dubbo-rust" }
  4. # client stream
  5. client streaming, Response: GreeterReply { message: "hello client streaming" }
  6. # bi stream
  7. parts: Metadata { inner: {"content-type": "application/grpc", "date": "Wed, 28 Sep 2022 23:34:20 GMT"} }
  8. reply: GreeterReply { message: "server reply: \"msg1 from client\"" }
  9. reply: GreeterReply { message: "server reply: \"msg2 from client\"" }
  10. reply: GreeterReply { message: "server reply: \"msg3 from client\"" }
  11. trailer: Some(Metadata { inner: {"content-type": "application/grpc", "grpc-status": "0", "grpc-message": "poll trailer successfully.", "grpc-accept-encoding": "gzip,identity"} })
  12. # server stream
  13. parts: Metadata { inner: {"content-type": "application/grpc", "date": "Wed, 28 Sep 2022 23:34:20 GMT"} }
  14. reply: GreeterReply { message: "msg1 from server" }
  15. reply: GreeterReply { message: "msg2 from server" }
  16. reply: GreeterReply { message: "msg3 from server" }
  17. trailer: Some(Metadata { inner: {"content-type": "application/grpc", "grpc-status": "0", "grpc-message": "poll trailer successfully.", "grpc-accept-encoding": "gzip,identity"} })

Feedback

Was this page helpful?

Yes No

Last modified September 30, 2024: Update & Translate Overview Docs (#3040) (d37ebceaea7)