Spring Boot集成gRPC框架

gRPC是谷歌开源的高性能、开源、通用RPC框架。由于gRPC基于HTTP2协议,所以其对移动端非常友好。

本节将介绍Spring Boot集成gRPC的服务端、客户端。

安装protoc及gRPC

gRPC默认使用Protocol Buffers)做为序列化协议,我们首先安装protoc编译器:

在这里下载最新版本的protoc编译器,请根据你的操作系统选择对应版本,这里我选用MacOSX的。

  1. wget https://github.com/protocolbuffers/protobuf/releases/download/v3.17.3/protoc-3.17.3-osx-x86_64.zip
  2. unzip protoc-3.17.3-osx-x86_64.zip

解压缩后,将其加入PATH路径下:

  1. export PATH=$PATH:$YOUR_PROTOC_PATH

试一下是能否执行:

  1. protoc --version
  2. libprotoc 3.17.3

除此之外,我们还需要一个gRPC的Java插件,才能生成gRPC的桩代码,你可以在这里)找到最新版本。这里我们依然选择OSX的64位版本:

  1. wget https://search.maven.org/remotecontent?filepath=io/grpc/protoc-gen-grpc-java/1.40.1/protoc-gen-grpc-java-1.40.1-osx-x86_64.exe

下载后,将其加入PATH路径中。尝试定位一下:

  1. which protoc-gen-grpc-java
  2. Your_Path/protoc-gen-grpc-java

至此,protoc和grpc的安装准备工作已经就绪。

Client侧集成

首先是集成依赖,我们放在client子项目的builld.gradle中:

  1. implementation 'com.google.protobuf:protobuf-java:3.17.3'
  2. implementation "io.grpc:grpc-stub:1.39.0"
  3. implementation "io.grpc:grpc-protobuf:1.39.0"
  4. implementation 'io.grpc:grpc-netty-shaded:1.39.0'

由于版本依赖较多,我建议使用platform统一管理,可以参考前文

接着,我们编写protoc文件,HomsDemo.proto:

  1. syntax = "proto3";
  2. option java_package = "com.coder4.homs.demo";
  3. option java_outer_classname = "HomsDemoProto";
  4. ;
  5. message AddRequest {
  6. int32 val1 = 1;
  7. int32 val2 = 2;
  8. }
  9. message AddResponse {
  10. int32 val = 1;
  11. }
  12. message AddSingleRequest {
  13. int32 val = 1;
  14. }
  15. service HomsDemo {
  16. rpc Add(AddRequest) returns (AddResponse);
  17. rpc Add2(stream AddSingleRequest) returns (AddResponse);
  18. }

我们添加了两个RPC方法:

  • Add是正常的调用

  • Add2是单向Stream调用

接着,我们需要编译,生成桩文件:

  1. #!/bin/sh
  2. DIR=`cd \`dirname ${BASH_SOURCE[0]}\`/.. && pwd`
  3. protoc HomsDemo.proto --java_out=${DIR}/homs-demo-client/src/main/java --proto_path=${DIR}/homs-demo-client/src/main/java/com/coder4/homs/demo/
  4. protoc HomsDemo.proto --plugin=protoc-gen-grpc-java=`which protoc-gen-grpc-java` --grpc-java_out=${DIR}/homs-demo-client/src/main/java --proto_path=${DIR}/homs-demo-client/src/main/java/com/coder4/homs/demo/

这里分为两个步骤:

  • 第一次protoc编译,生成protoc的桩文件

  • 第二次protoc编译,使用了protoc-gen-grpc-java的插件,生成gRPC的服务端和客户端文件

编译成功后,路径如下:

  1. homs-demo-client
  2. ├── build.gradle
  3. └── src
  4. └── main
  5. └── java
  6. └── com
  7. └── coder4
  8. └── homs
  9. └── demo
  10. ├── HomsDemo.proto
  11. ├── HomsDemoGrpc.java
  12. └── HomsDemoProto.java

如上所示:HomsDemoProto是protoc的桩文件,HomsDemoGrpc是gRPC服务的桩文件。

下面我们来编写客户端代码,HomsDemoClient.java:

  1. package com.coder4.homs.demo.client;
  2. import com.coder4.homs.demo.HomsDemoGrpc;
  3. import com.coder4.homs.demo.HomsDemoProto.AddRequest;
  4. import com.coder4.homs.demo.HomsDemoProto.AddResponse;
  5. import com.coder4.homs.demo.HomsDemoProto.AddSingleRequest;
  6. import io.grpc.Channel;
  7. import io.grpc.ManagedChannel;
  8. import io.grpc.ManagedChannelBuilder;
  9. import io.grpc.StatusRuntimeException;
  10. import io.grpc.stub.StreamObserver;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import java.util.Arrays;
  14. import java.util.Collection;
  15. import java.util.Optional;
  16. import java.util.concurrent.CountDownLatch;
  17. import java.util.concurrent.TimeUnit;
  18. import java.util.concurrent.atomic.AtomicLong;
  19. /**
  20. * @author coder4
  21. */
  22. public class HomsDemoClient {
  23. private Logger LOG = LoggerFactory.getLogger(HomsDemoClient.class);
  24. private final HomsDemoGrpc.HomsDemoBlockingStub blockingStub;
  25. private final HomsDemoGrpc.HomsDemoStub stub;
  26. /**
  27. * Construct client for accessing HelloWorld server using the existing channel.
  28. */
  29. public HomsDemoClient(Channel channel) {
  30. blockingStub = HomsDemoGrpc.newBlockingStub(channel);
  31. stub = HomsDemoGrpc.newStub(channel);
  32. }
  33. public Optional<Integer> add(int val1, int val2) {
  34. AddRequest request = AddRequest.newBuilder().setVal1(val1).setVal2(val2).build();
  35. AddResponse response;
  36. try {
  37. response = blockingStub.add(request);
  38. return Optional.ofNullable(response.getVal());
  39. } catch (StatusRuntimeException e) {
  40. LOG.error("RPC failed: {0}", e.getStatus());
  41. return Optional.empty();
  42. }
  43. }
  44. public Optional<Integer> add2(Collection<Integer> vals) {
  45. try {
  46. CountDownLatch cdl = new CountDownLatch(1);
  47. AtomicLong respVal = new AtomicLong();
  48. StreamObserver<AddSingleRequest> requestStreamObserver =
  49. stub.add2(new StreamObserver<AddResponse>() {
  50. @Override
  51. public void onNext(AddResponse value) {
  52. respVal.set(value.getVal());
  53. }
  54. @Override
  55. public void onError(Throwable t) {
  56. cdl.countDown();
  57. }
  58. @Override
  59. public void onCompleted() {
  60. cdl.countDown();
  61. }
  62. });
  63. for (int val : vals) {
  64. requestStreamObserver.onNext(AddSingleRequest.newBuilder().setVal(val).build());
  65. }
  66. requestStreamObserver.onCompleted();
  67. try {
  68. cdl.await(1, TimeUnit.SECONDS);
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. return Optional.ofNullable(respVal.intValue());
  73. } catch (StatusRuntimeException e) {
  74. LOG.error("RPC failed: {0}", e.getStatus());
  75. return Optional.empty();
  76. }
  77. }
  78. }

代码如上所示:Add还是相对简单的,但是使用了Stream的Add2就比较复杂了。

在上述代码中,需要传入Channel做为连接句柄,在假设知道IP和端口的情况下,可以如下构造:

  1. String target = "127.0.0.1:5000";
  2. ManagedChannel channel = null;
  3. try {
  4. channel = ManagedChannelBuilder
  5. .forTarget(target)
  6. .usePlaintext()
  7. .build();
  8. } catch (Exception e) {
  9. LOG.error("open channel excepiton", e);
  10. return;
  11. }
  12. HomsDemoClient client = new HomsDemoClient(channel);

在微服务架构下,实例众多,获取每个IP显得不太实际,我们会在后续章节介绍集成服务发现的Channel构造方案。

Server侧集成

老套路,首先是依赖集成:

  1. implementation 'com.google.protobuf:protobuf-java:3.17.3'
  2. implementation "io.grpc:grpc-stub:1.39.0"
  3. implementation "io.grpc:grpc-protobuf:1.39.0"
  4. implementation 'io.grpc:grpc-netty-shaded:1.39.0'

与上述客户端的集成完全一致。

接下来我们实现RPC的服务逻辑:

  1. /**
  2. * @(#)HomsDemoImpl.java, 8月 12, 2021.
  3. * <p>
  4. * Copyright 2021 coder4.com. All rights reserved.
  5. * CODER4.COM PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.coder4.homs.demo.server.grpc;
  8. import com.coder4.homs.demo.HomsDemoGrpc.HomsDemoImplBase;
  9. import com.coder4.homs.demo.HomsDemoProto.AddRequest;
  10. import com.coder4.homs.demo.HomsDemoProto.AddResponse;
  11. import com.coder4.homs.demo.HomsDemoProto.AddSingleRequest;
  12. import io.grpc.stub.StreamObserver;
  13. /**
  14. * @author coder4
  15. */
  16. public final class HomsDemoGrpcImpl extends HomsDemoImplBase {
  17. @Override
  18. public void add(AddRequest request, StreamObserver<AddResponse> responseObserver) {
  19. responseObserver.onNext(AddResponse.newBuilder()
  20. .setVal(request.getVal1() + request.getVal2())
  21. .build());
  22. responseObserver.onCompleted();
  23. }
  24. @Override
  25. public StreamObserver<AddSingleRequest> add2(StreamObserver<AddResponse> responseObserver) {
  26. return new StreamObserver<AddSingleRequest>() {
  27. int sum = 0;
  28. @Override
  29. public void onNext(AddSingleRequest value) {
  30. sum += value.getVal();
  31. }
  32. @Override
  33. public void onError(Throwable t) {
  34. }
  35. @Override
  36. public void onCompleted() {
  37. responseObserver.onNext(AddResponse.newBuilder()
  38. .setVal(sum)
  39. .build());
  40. sum = 0;
  41. responseObserver.onCompleted();
  42. }
  43. };
  44. }
  45. }

这里要特别说明,因为gRPC都是异步回调的方式,所以其RPC在实现上有点反直觉:

  • 通过responseObserver.onNext返回调用结果

  • 通过responseObserver.onCompleted结束调用

而add2方法,由于采用了Client-Streaming,所以实现会更加复杂一些。

实际上,gRPC支持4种调用模式):

  • Unary: 客户端单输入,服务端单输出

  • Client-Streaming: 客户端多输入,服务端单输出

  • Server-Streaming: 客户端单输入,服务端多输出

  • Bidirectional-Streaming: 客户端多输入,服务端多输出

由于篇幅所限,本文种只实现了前2种,推荐你手动实现另外的两种模式。