基础功能对比

RPC IDL 通信 网络数据 压缩 Attachement 半同步 异步 Streaming
Thrift Binary Framed Thrift tcp 二进制 不支持 不支持 支持 不支持 不支持
Thrift Binary HttpTransport Thrift http 二进制 不支持 不支持 支持 不支持 不支持
GRPC PB http2 二进制 gzip/zlib/lz4/snappy 支持 不支持 支持 支持
BRPC Std PB tcp 二进制 gzip/zlib/lz4/snappy 支持 不支持 支持 支持
SogouRPC Std PB/Thrift tcp 二进制/JSON gzip/zlib/lz4/snappy 支持 支持 支持 不支持
SogouRPC Std Http PB/Thrift http 二进制/JSON gzip/zlib/lz4/snappy 支持 支持 支持 不支持

基础概念

  • 通信层:TCP/TPC_SSL/HTTP/HTTPS/HTTP2
  • 协议层:Thrift-binary/BRPC-std/SogouRPC-std
  • 压缩层:不压缩/gzip/zlib/lz4/snappy
  • 数据层:PB binary/Thrift binary/Json string
  • IDL序列化层:PB/Thrift serialization
  • RPC调用层:Service/Client IMPL

RPC Global

  • 获取srpc版本号srpc::SRPCGlobal::get_instance()->get_srpc_version()

RPC Status Code

name value 含义
RPCStatusUndefined 0 未定义
RPCStatusOK 1 正确/成功
RPCStatusServiceNotFound 2 找不到Service名
RPCStatusMethodNotFound 3 找不到RPC函数名
RPCStatusMetaError 4 Meta错误/解析失败
RPCStatusReqCompressSizeInvalid 5 请求压缩大小错误
RPCStatusReqDecompressSizeInvalid 6 请求解压大小错误
RPCStatusReqCompressNotSupported 7 请求压缩类型不支持
RPCStatusReqDecompressNotSupported 8 请求解压类型不支持
RPCStatusReqCompressError 9 请求压缩失败
RPCStatusReqDecompressError 10 请求解压失败
RPCStatusReqSerializeError 11 请求IDL序列化失败
RPCStatusReqDeserializeError 12 请求IDL反序列化失败
RPCStatusRespCompressSizeInvalid 13 回复压缩大小错误
RPCStatusRespDecompressSizeInvalid 14 回复解压大小错误
RPCStatusRespCompressNotSupported 15 回复压缩类型不支持
RPCStatusRespDecompressNotSupported 16 回复解压类型不支持
RPCStatusRespCompressError 17 回复压缩失败
RPCStatusRespDecompressError 18 回复解压失败
RPCStatusRespSerializeError 19 回复IDL序列化失败
RPCStatusRespDeserializeError 20 回复IDL反序列化失败
RPCStatusIDLSerializeNotSupported 21 不支持IDL序列化
RPCStatusIDLDeserializeNotSupported 22 不支持IDL反序列化
RPCStatusURIInvalid 30 URI非法
RPCStatusUpstreamFailed 31 Upstream全熔断
RPCStatusSystemError 100 系统错误
RPCStatusSSLError 101 SSL错误
RPCStatusDNSError 102 DNS错误
RPCStatusProcessTerminated 103 程序退出&终止

RPC IDL

  • 描述文件
  • 前后兼容
  • Protobuf/Thrift

示例

下面我们通过一个具体例子来呈现

  • 我们拿pb举例,定义一个ServiceName为Exampleexample.proto文件
  • rpc接口名为Echo,输入参数为EchoRequest,输出参数为EchoResponse
  • EchoRequest包括两个string:messagename
  • EchoResponse包括一个string:message
  1. syntax="proto2";
  2. message EchoRequest {
  3. optional string message = 1;
  4. optional string name = 2;
  5. };
  6. message EchoResponse {
  7. optional string message = 1;
  8. };
  9. service Example {
  10. rpc Echo(EchoRequest) returns (EchoResponse);
  11. };

RPC Service

  • 组成sogouRPC服务的基本单元
  • 每一个Service一定由某一种IDL生成
  • Service只与IDL有关,与网络通信具体协议无关

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的example.protoIDL描述文件
  • 执行官方的protoc example.proto --cpp_out=./ --proto_path=./获得example.pb.hexample.pb.cpp两个文件
  • 执行SogouRPC的srpc_generator protobuf ./example.proto ./获得example.srpc.h文件
  • 我们派生Example::Service来实现具体的rpc业务逻辑,这就是一个RPC Service
  • 注意这个Service没有任何网络、端口、通信协议等概念,仅仅负责完成实现从EchoRequest输入到输出EchoResponse的业务逻辑
  1. class ExampleServiceImpl : public Example::Service
  2. {
  3. public:
  4. void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
  5. {
  6. response->set_message("Hi, " + request->name());
  7. printf("get_req:\n%s\nset_resp:\n%s\n",
  8. request->DebugString().c_str(),
  9. response->DebugString().c_str());
  10. }
  11. };

RPC Server

  • 每一个Server对应一个端口
  • 每一个Server对应一个确定的网络通信协议
  • 每一个Service可以添加到任意的Server里
  • 每一个Server可以拥有任意的Service,但在当前Server里ServiceName必须唯一
  • 不同IDL的Service是可以放进同一个Server中的

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的ExampleServiceImplService
  • 首先,我们创建1个RPC Server、需要确定协议
  • 然后,我们可以创建任意个数的Service实例、任意不同proto形成的Service,把这些Service通过add_service()接口添加到Server里
  • 最后,通过Server的start或者serve开启服务,处理即将到来的rpc请求
  • 想像一下,我们也可以从Example::Service派生更多的功能的rpcEcho不同实现的Service
  • 想像一下,我们可以在N个不同的端口创建N个不同的RPC Server、代表着不同的协议
  • 想像一下,我们可以把同一个ServiceIMPL实例add_service到不同的Server上,我们也可以把不同的ServiceIMPL实例add_service到同一个Server上
  • 想像一下,我们可以用同一个ExampleServiceImpl,在三个不同端口、同时服务于BPRC-STD、SogouRPC-STD、SogouRPC-Http
  • 甚至,我们可以将1个PB的ExampleServiceImpl和1个Thrift的AnotherThriftServiceImpladd_service到同一个SogouRPC-STD Server,两种IDL在同一个端口上完美工作!
  1. int main()
  2. {
  3. SRPCServer server_srpc;
  4. SRPCHttpServer server_srpc_http;
  5. BRPCServer server_brpc;
  6. ThriftServer server_thrift;
  7. ExampleServiceImpl impl_pb;
  8. AnotherThriftServiceImpl impl_thrift;
  9. server_srpc.add_service(&impl_pb);
  10. server_srpc.add_service(&impl_thrift);
  11. server_srpc_http.add_service(&impl_pb);
  12. server_srpc_http.add_service(&impl_thrift);
  13. server_brpc.add_service(&impl_pb);
  14. server_thrift.add_service(&impl_thrift);
  15. server_srpc.start(1412);
  16. server_srpc_http.start(8811);
  17. server_brpc.start(2020);
  18. server_thrift.start(9090);
  19. pause();
  20. server_thrift.stop();
  21. server_brpc.stop();
  22. server_srpc_http.stop();
  23. server_srpc.stop();
  24. return 0;
  25. }

RPC Client

  • 每一个Client对应着一个确定的目标/一个确定的集群
  • 每一个Client对应着一个确定的网络通信协议
  • 每一个Client对应着一个确定的IDL

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的例子,client相对简单,直接调用即可
  • 通过Example::XXXClient创建某种RPC的client实例,需要目标的ip+port或url
  • 利用client实例直接调用rpc函数Echo即可,这是一次异步请求,请求完成后会进入回调函数
  • 具体的RPC Context用法请看下一个段落
  1. #include <stdio.h>
  2. #include "example.srpc.h"
  3. using namespace srpc;
  4. int main()
  5. {
  6. Example::SRPCClient client("127.0.0.1", 1412);
  7. EchoRequest req;
  8. req.set_message("Hello, sogou rpc!");
  9. req.set_name("Li Yingxin");
  10. client.Echo(&req, [](EchoResponse *response, RPCContext *ctx) {
  11. if (ctx->success())
  12. printf("%s\n", response->DebugString().c_str());
  13. else
  14. printf("status[%d] error[%d] errmsg:%s\n",
  15. ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  16. });
  17. pause();
  18. return 0;
  19. }

RPC Context

  • RPCContext专门用来辅助异步接口,Service和Client通用
  • 每一个异步接口都会提供Context,用来给用户提供更高级的功能,比如获取对方ip、获取连接seqid等
  • Context上一些功能是Server或Client独有的,比如Server可以设置回复数据的压缩方式,Client可以获取请求成功或失败
  • Context上可以通过get_series获得所在的series,与workflow的异步模式无缝结合

RPCContext API - Common

long long get_seqid() const;

请求+回复视为1次完整通信,获得当前socket连接上的通信sequence id,seqid=0代表第1次

std::string get_remote_ip() const;

获得对方IP地址,支持ipv4/ipv6

int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;

获得对方地址,in/out参数为更底层的数据结构sockaddr

const std::string& get_service_name() const;

获取RPC Service Name

const std::string& get_method_name() const;

获取RPC Methode Name

SeriesWork *get_series() const;

获取当前ServerTask/ClientTask所在series

RPCContext API - Only for client done

bool success() const;

client专用。这次请求是否成功

int get_status_code() const;

client专用。这次请求的rpc status code

const char *get_errmsg() const;

client专用。这次请求的错误信息

int get_error() const;

client专用。这次请求的错误码

void *get_user_data() const;

client专用。获取ClientTask的user_data。如果用户通过create_xxx_task接口产生task,则可以通过user_data域记录上下文,在创建task时设置,在回调函数中拿回。

RPCContext API - Only for server process

void set_data_type(RPCDataType type);

Server专用。设置数据打包类型

  • RPCDataProtobuf
  • RPCDataThrift
  • RPCDataJson

void set_compress_type(RPCCompressType type);

Server专用。设置数据压缩类型(注:Client的压缩类型在Client或Task上设置)

  • RPCCompressNone
  • RPCCompressSnappy
  • RPCCompressGzip
  • RPCCompressZlib
  • RPCCompressLz4

void set_attachment_nocopy(const char *attachment, size_t len);

Server专用。设置attachment附件。

bool get_attachment(const char **attachment, size_t *len) const;

Server专用。获取attachment附件。

void set_reply_callback(std::function<void (RPCContext *ctx)> cb);

Server专用。设置reply callback,操作系统写入socket缓冲区成功后被调用。

void set_send_timeout(int timeout);

Server专用。设置发送超时,单位毫秒。-1代表无限。

void set_keep_alive(int timeout);

Server专用。设置连接保活时间,单位毫秒。-1代表无限。

RPC Options

Server Params

name 默认 含义
max_connections 2000 Server的最大连接数,默认2000个
peer_response_timeout 10 * 1000 每一次IO的读超时,默认10秒
receive_timeout -1 每一条完整消息的读超时,默认无限
keep_alive_timeout 60 * 1000 空闲连接保活,-1代表永远不断开,0代表短连接,默认长连接保活60秒
request_size_limit 2LL 1024 1024 * 1024 请求包大小限制,最大2GB
ssl_accept_timeout 10 * 1000 SSL连接超时,默认10秒

Client Params

name 默认 含义
host “” 目标host,可以是ip、域名
port 1412 目标端口号,默认1412
is_ssl false ssl开关,默认关闭
url “” 当host为空,url设置才有效。url将屏蔽host/port/is_ssl三项
task_params TASK默认配置 见下方

Task Params

name 默认 含义
send_timeout NaN 发送写超时,默认无限
keep_alive_timeout NaN 空闲连接保活,-1代表永远不断开,默认0短连接
retry_max NaN 最大重试次数,默认0不重试
compress_type NaN 压缩类型,默认不压缩
data_type NaN 网络包数据类型,默认与RPC默认值一致,SRPC-Http协议为json,其余为对应IDL的类型

与workflow异步框架的结合

Server

下面我们通过一个具体例子来呈现

  • Echo RPC在接收到请求时,向下游发起一次http请求
  • 对下游请求完成后,我们将http response的body信息填充到response的message里,回复给客户端
  • 我们不希望阻塞/占据着Handler的线程,所以对下游的请求一定是一次异步请求
  • 首先,我们通过Workflow框架的工厂WFTaskFactory::create_http_task创建一个异步任务http_task
  • 然后,我们利用RPCContext的ctx->get_series()获取到ServerTask所在的SeriesWork
  • 最后,我们使用SeriesWork的push_back接口将http_task放到SeriesWork的后面
  1. class ExampleServiceImpl : public Example::Service
  2. {
  3. public:
  4. void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
  5. {
  6. auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0,
  7. [request, response](WFHttpTask *task) {
  8. if (task->get_state() == WFT_STATE_SUCCESS)
  9. {
  10. const void *data;
  11. size_t len;
  12. task->get_resp()->get_parsed_body(&data, &len);
  13. response->mutable_message()->assign((const char *)data, len);
  14. }
  15. else
  16. response->set_message("Error: " + std::to_string(task->get_error()));
  17. printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n",
  18. request->DebugString().c_str(),
  19. response->DebugString().c_str());
  20. });
  21. ctx->get_series()->push_back(http_task);
  22. }
  23. };

Client

下面我们通过一个具体例子来呈现

  • 我们并行发出两个请求,1个是rpc请求,1个是http请求
  • 两个请求都结束后,我们再发起一次计算任务,计算两个数的平方和
  • 首先,我们通过RPC Client的create_Echo_task创建一个rpc异步请求的网络任务rpc_task
  • 然后,我们通过Workflow框架的工厂WFTaskFactory::create_http_taskWFTaskFactory::create_go_task分别创建异步网络任务http_task,和异步计算任务calc_task
  • 最后,我们利用串并联流程图,乘号代表并行、大于号代表串行,将3个异步任务组合起来执行start
  1. void calc(int x, int y)
  2. {
  3. int z = x * x + y * y;
  4. printf("calc result: %d\n", z);
  5. }
  6. int main()
  7. {
  8. Example::SRPCClient client("127.0.0.1", 1412);
  9. auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
  10. if (ctx->success())
  11. printf("%s\n", response->DebugString().c_str());
  12. else
  13. printf("status[%d] error[%d] errmsg:%s\n",
  14. ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  15. });
  16. auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
  17. if (task->get_state() == WFT_STATE_SUCCESS)
  18. {
  19. std::string body;
  20. const void *data;
  21. size_t len;
  22. task->get_resp()->get_parsed_body(&data, &len);
  23. body.assign((const char *)data, len);
  24. printf("%s\n\n", body.c_str());
  25. }
  26. else
  27. printf("Http request fail\n\n");
  28. });
  29. auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);
  30. EchoRequest req;
  31. req.set_message("Hello, sogou rpc!");
  32. req.set_name("1412");
  33. rpc_task->serialize_input(&req);
  34. ((*http_task * rpc_task) > calc_task).start();
  35. pause();
  36. return 0;
  37. }