使用说明
Triple 协议是 Dubbo3 的主力协议,完整兼容 gRPC over HTTP/2,并在协议层面扩展了负载均衡和流量控制相关机制。本文档旨在指导用户正确的使用 Triple 协议。
在开始前,需要决定服务使用的序列化方式,如果为新服务,推荐使用 protobuf 作为默认序列化,在性能和跨语言上的效果都会更好。如果是原有服务想进行协议升级,Triple 协议也已经支持其他序列化方式,如 Hessian / JSON 等
Protobuf
编写 IDL 文件
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.dubbo.hello";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
添加编译 protobuf 的 extension 和 plugin (以 maven 为例)
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>triple-java</pluginId>
<outputDirectory>build/generated/source/proto/main/java</outputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
构建/ 编译生成 protobuf Message 类
$ mvn clean install
Unary 方式
编写 Java 接口
import org.apache.dubbo.hello.HelloReply;
import org.apache.dubbo.hello.HelloRequest;
public interface IGreeter {
/**
* <pre>
* Sends a greeting
* </pre>
*/
HelloReply sayHello(HelloRequest request);
}
创建 Provider
public static void main(String[] args) throws InterruptedException {
ServiceConfig<IGreeter> service = new ServiceConfig<>();
service.setInterface(IGreeter.class);
service.setRef(new IGreeter1Impl());
// 这里需要显示声明使用的协议为triple
service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
service.setApplication(new ApplicationConfig("demo-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
创建 Consumer
public static void main(String[] args) throws IOException {
ReferenceConfig<IGreeter> ref = new ReferenceConfig<>();
ref.setInterface(IGreeter.class);
ref.setCheck(false);
ref.setProtocol(CommonConstants.TRIPLE);
ref.setLazy(true);
ref.setTimeout(100000);
ref.setApplication(new ApplicationConfig("demo-consumer"));
ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
final IGreeter iGreeter = ref.get();
System.out.println("dubbo ref started");
try {
final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder()
.setName("name")
.build());
TimeUnit.SECONDS.sleep(1);
System.out.println("Reply:" + reply);
} catch (Throwable t) {
t.printStackTrace();
}
System.in.read();
}
运行 Provider 和 Consumer ,可以看到请求正常返回了
Reply:message: “name”
stream 方式
编写 Java 接口
import org.apache.dubbo.hello.HelloReply;
import org.apache.dubbo.hello.HelloRequest;
public interface IGreeter {
/**
* <pre>
* Sends greeting by stream
* </pre>
*/
StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver);
}
编写实现类
public class IStreamGreeterImpl implements IStreamGreeter {
@Override
public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) {
return new StreamObserver<HelloRequest>() {
private List<HelloReply> replyList = new ArrayList<>();
@Override
public void onNext(HelloRequest helloRequest) {
System.out.println("onNext receive request name:" + helloRequest.getName());
replyList.add(HelloReply.newBuilder()
.setMessage("receive name:" + helloRequest.getName())
.build());
}
@Override
public void onError(Throwable cause) {
System.out.println("onError");
replyObserver.onError(cause);
}
@Override
public void onCompleted() {
System.out.println("onComplete receive request size:" + replyList.size());
for (HelloReply reply : replyList) {
replyObserver.onNext(reply);
}
replyObserver.onCompleted();
}
};
}
}
创建 Provider
public class StreamProvider {
public static void main(String[] args) throws InterruptedException {
ServiceConfig<IStreamGreeter> service = new ServiceConfig<>();
service.setInterface(IStreamGreeter.class);
service.setRef(new IStreamGreeterImpl());
service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
service.setApplication(new ApplicationConfig("stream-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
创建 Consumer
public class StreamConsumer {
public static void main(String[] args) throws InterruptedException, IOException {
ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>();
ref.setInterface(IStreamGreeter.class);
ref.setCheck(false);
ref.setProtocol(CommonConstants.TRIPLE);
ref.setLazy(true);
ref.setTimeout(100000);
ref.setApplication(new ApplicationConfig("stream-consumer"));
ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181"));
final IStreamGreeter iStreamGreeter = ref.get();
System.out.println("dubbo ref started");
try {
StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
System.out.println("onNext");
System.out.println(reply.getMessage());
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
streamObserver.onNext(HelloRequest.newBuilder()
.setName("tony")
.build());
streamObserver.onNext(HelloRequest.newBuilder()
.setName("nick")
.build());
streamObserver.onCompleted();
} catch (Throwable t) {
t.printStackTrace();
}
System.in.read();
}
}
运行 Provider 和 Consumer ,可以看到请求正常返回了
onNext
receive name:tony
onNext
receive name:nick
onCompleted
其他序列化方式
省略上文中的 1-3 步,指定 Provider 和 Consumer 使用的协议即可完成协议升级。
示例程序
本文的示例程序可以在 triple-samples 找到
最后修改 December 16, 2022: Fix check (#1736) (97972c1)