Triple 协议

Triple 协议使用

Triple 协议是 Dubbo3 的主力协议,完整兼容 gRPC over HTTP/2,并在协议层面扩展了负载均衡和流量控制相关机制。本文档旨在指导用户正确的使用 Triple 协议。

在开始前,需要决定服务使用的序列化方式,如果为新服务,推荐使用 protobuf 作为默认序列化,在性能和跨语言上的效果都会更好。如果是原有服务想进行协议升级,Triple 协议也已经支持其他序列化方式,如 Hessian / JSON 等

Protobuf

  1. 编写 IDL 文件

    1. syntax = "proto3";
    2. option java_multiple_files = true;
    3. option java_package = "org.apache.dubbo.hello";
    4. option java_outer_classname = "HelloWorldProto";
    5. option objc_class_prefix = "HLW";
    6. package helloworld;
    7. // The request message containing the user's name.
    8. message HelloRequest {
    9. string name = 1;
    10. }
    11. // The response message containing the greetings
    12. message HelloReply {
    13. string message = 1;
    14. }
  2. 添加编译 protobuf 的 extension 和 plugin (以 maven 为例)

    1. <extensions>
    2. <extension>
    3. <groupId>kr.motd.maven</groupId>
    4. <artifactId>os-maven-plugin</artifactId>
    5. <version>1.6.1</version>
    6. </extension>
    7. </extensions>
    8. <plugins>
    9. <plugin>
    10. <groupId>org.xolstice.maven.plugins</groupId>
    11. <artifactId>protobuf-maven-plugin</artifactId>
    12. <version>0.6.1</version>
    13. <configuration>
    14. <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
    15. <pluginId>triple-java</pluginId>
    16. <outputDirectory>build/generated/source/proto/main/java</outputDirectory>
    17. </configuration>
    18. <executions>
    19. <execution>
    20. <goals>
    21. <goal>compile</goal>
    22. <goal>test-compile</goal>
    23. </goals>
    24. </execution>
    25. </executions>
    26. </plugin>
    27. </plugins>
  3. 构建/ 编译生成 protobuf Message 类

    1. $ mvn clean install

Unary 方式

  1. 编写 Java 接口

    1. import org.apache.dubbo.hello.HelloReply;
    2. import org.apache.dubbo.hello.HelloRequest;
    3. public interface IGreeter {
    4. /**
    5. * <pre>
    6. * Sends a greeting
    7. * </pre>
    8. */
    9. HelloReply sayHello(HelloRequest request);
    10. }
  2. 创建 Provider

    1. public static void main(String[] args) throws InterruptedException {
    2. ServiceConfig<IGreeter> service = new ServiceConfig<>();
    3. service.setInterface(IGreeter.class);
    4. service.setRef(new IGreeter1Impl());
    5. // 这里需要显示声明使用的协议为triple
    6. service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
    7. service.setApplication(new ApplicationConfig("demo-provider"));
    8. service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    9. service.export();
    10. System.out.println("dubbo service started");
    11. new CountDownLatch(1).await();
    12. }
  3. 创建 Consumer

    1. public static void main(String[] args) throws IOException {
    2. ReferenceConfig<IGreeter> ref = new ReferenceConfig<>();
    3. ref.setInterface(IGreeter.class);
    4. ref.setCheck(false);
    5. ref.setInterface(IGreeter.class);
    6. ref.setCheck(false);
    7. ref.setProtocol(CommonConstants.TRIPLE);
    8. ref.setLazy(true);
    9. ref.setTimeout(100000);
    10. ref.setApplication(new ApplicationConfig("demo-consumer"));
    11. ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    12. final IGreeter iGreeter = ref.get();
    13. System.out.println("dubbo ref started");
    14. try {
    15. final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder()
    16. .setName("name")
    17. .build());
    18. TimeUnit.SECONDS.sleep(1);
    19. System.out.println("Reply:" + reply);
    20. } catch (Throwable t) {
    21. t.printStackTrace();
    22. }
    23. System.in.read();
    24. }
  4. 运行 Provider 和 Consumer ,可以看到请求正常返回了

    Reply:message: “name”

stream 方式

  1. 编写 Java 接口

    1. import org.apache.dubbo.hello.HelloReply;
    2. import org.apache.dubbo.hello.HelloRequest;
    3. public interface IGreeter {
    4. /**
    5. * <pre>
    6. * Sends greeting by stream
    7. * </pre>
    8. */
    9. StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver);
    10. }
  2. 编写实现类

    1. public class IStreamGreeterImpl implements IStreamGreeter {
    2. @Override
    3. public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) {
    4. return new StreamObserver<HelloRequest>() {
    5. private List<HelloReply> replyList = new ArrayList<>();
    6. @Override
    7. public void onNext(HelloRequest helloRequest) {
    8. System.out.println("onNext receive request name:" + helloRequest.getName());
    9. replyList.add(HelloReply.newBuilder()
    10. .setMessage("receive name:" + helloRequest.getName())
    11. .build());
    12. }
    13. @Override
    14. public void onError(Throwable cause) {
    15. System.out.println("onError");
    16. replyObserver.onError(cause);
    17. }
    18. @Override
    19. public void onCompleted() {
    20. System.out.println("onComplete receive request size:" + replyList.size());
    21. for (HelloReply reply : replyList) {
    22. replyObserver.onNext(reply);
    23. }
    24. replyObserver.onCompleted();
    25. }
    26. };
    27. }
    28. }
  3. 创建 Provider

    1. public class StreamProvider {
    2. public static void main(String[] args) throws InterruptedException {
    3. ServiceConfig<IStreamGreeter> service = new ServiceConfig<>();
    4. service.setInterface(IStreamGreeter.class);
    5. service.setRef(new IStreamGreeterImpl());
    6. service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
    7. service.setApplication(new ApplicationConfig("stream-provider"));
    8. service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
    9. service.export();
    10. System.out.println("dubbo service started");
    11. new CountDownLatch(1).await();
    12. }
    13. }
  4. 创建 Consumer

    1. public class StreamConsumer {
    2. public static void main(String[] args) throws InterruptedException, IOException {
    3. ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>();
    4. ref.setInterface(IStreamGreeter.class);
    5. ref.setCheck(false);
    6. ref.setProtocol(CommonConstants.TRIPLE);
    7. ref.setLazy(true);
    8. ref.setTimeout(100000);
    9. ref.setApplication(new ApplicationConfig("stream-consumer"));
    10. ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181"));
    11. final IStreamGreeter iStreamGreeter = ref.get();
    12. System.out.println("dubbo ref started");
    13. try {
    14. StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() {
    15. @Override
    16. public void onNext(HelloReply reply) {
    17. System.out.println("onNext");
    18. System.out.println(reply.getMessage());
    19. }
    20. @Override
    21. public void onError(Throwable throwable) {
    22. System.out.println("onError:" + throwable.getMessage());
    23. }
    24. @Override
    25. public void onCompleted() {
    26. System.out.println("onCompleted");
    27. }
    28. });
    29. streamObserver.onNext(HelloRequest.newBuilder()
    30. .setName("tony")
    31. .build());
    32. streamObserver.onNext(HelloRequest.newBuilder()
    33. .setName("nick")
    34. .build());
    35. streamObserver.onCompleted();
    36. } catch (Throwable t) {
    37. t.printStackTrace();
    38. }
    39. System.in.read();
    40. }
    41. }
  5. 运行 Provider 和 Consumer ,可以看到请求正常返回了

    onNext
    receive name:tony
    onNext
    receive name:nick
    onCompleted

其他序列化方式

省略上文中的 1-3 步,指定 Provider 和 Consumer 使用的协议即可完成协议升级。

示例程序

本文的示例程序可以在 triple-samples 找到

最后修改 September 21, 2021: Bug fix miss mialbox (#953) (57cf51b)