TCP协议文档

1. 协议格式

消息组成详解:

  1. 魔术字:9位,当前值为“EventMesh
  2. 通信协议版本号:4位,当前值为“0000
  3. 消息总长度值(length):4位,int类型
  4. 消息头长度值(headerLength):4位,int类型
  5. 消息头(header):长度 = headerLength
  6. 消息体(body):长度 = length - headerLength - 4 - 4

2. 业务逻辑层

  • 消息组成

消息头(header)+ 消息体(body)

  1. public class Package {
  2. private Header header;
  3. private Object body;
  4. }
  5. public class Header {
  6. private Command cmd;
  7. private int code;
  8. private String msg;
  9. private String seq;
  10. }
  • 详解

消息头(header):类型为Header,Header中有Command字段,用于区分不同的消息类型

消息体(body):对于不同的消息类型,body的类型不同

消息命令字body类型
HEARTBEAT_REQUEST, HEARTBEAT_RESPONSE, HELLO_RESPONSE, CLIENT_GOODBYE_REQUEST, CLIENT_GOODBYE_RESPONSE, SERVER_GOODBYE_REQUEST, SERVER_GOODBYE_RESPONSE, LISTEN_REQUEST, LISTEN_RESPONSE, UNSUBSCRIBE_REQUEST, SUBSCRIBE_RESPONSE, UNSUBSCRIBE_RESPONSE, ASYNC_MESSAGE_TO_SERVER_ACK, BROADCAST_MESSAGE_TO_SERVER_ACK
HELLO_REQUESTUserAgent
SUBSCRIBE_REQUESTSubscription
REQUEST_TO_SERVER, REQUEST_TO_CLIENT, RESPONSE_TO_SERVER, RESPONSE_TO_CLIENT, ASYNC_MESSAGE_TO_SERVER, ASYNC_MESSAGE_TO_CLIENT, BROADCAST_MESSAGE_TO_SERVER, BROADCAST_MESSAGE_TO_CLIENT, ASYNC_MESSAGE_TO_CLIENT_ACK, BROADCAST_MESSAGE_TO_CLIENT_ACK, RESPONSE_TO_CLIENT_ACK, REQUEST_TO_CLIENT_ACKOpenMessage
REDIRECT_TO_CLIENTRedirectInfo

3. Client 与 Eventmesh-Runtime(Server)交互场景详解

  1. public enum Command {
  2. //心跳
  3. HEARTBEAT_REQUEST(0), //client发给server的心跳包
  4. HEARTBEAT_RESPONSE(1), //server回复client的心跳包
  5. //握手
  6. HELLO_REQUEST(2), //client发给server的握手请求
  7. HELLO_RESPONSE(3), //server回复client的握手请求
  8. //断连
  9. CLIENT_GOODBYE_REQUEST(4), //client主动断连时通知server
  10. CLIENT_GOODBYE_RESPONSE(5), //server回复client的主动断连通知
  11. SERVER_GOODBYE_REQUEST(6), //server主动断连时通知client
  12. SERVER_GOODBYE_RESPONSE(7), //client回复server的主动断连通知
  13. //订阅管理
  14. SUBSCRIBE_REQUEST(8), //client发给server的订阅请求
  15. SUBSCRIBE_RESPONSE(9), //server回复client的订阅请求
  16. UNSUBSCRIBE_REQUEST(10), //client发给server的取消订阅请求
  17. UNSUBSCRIBE_RESPONSE(11), //server回复client的取消订阅请求
  18. //监听
  19. LISTEN_REQUEST(12), //client发给server的启动监听请求
  20. LISTEN_RESPONSE(13), //server回复client的监听请求
  21. //RR
  22. REQUEST_TO_SERVER(14), //client将RR请求发送给server
  23. REQUEST_TO_CLIENT(15), //server将RR请求推送给client
  24. REQUEST_TO_CLIENT_ACK(16), //client收到RR请求后ACK给server
  25. RESPONSE_TO_SERVER(17), //client将RR回包发送给server
  26. RESPONSE_TO_CLIENT(18), //server将RR回包推送给client
  27. RESPONSE_TO_CLIENT_ACK(19), //client收到回包后ACK给server
  28. //异步事件
  29. ASYNC_MESSAGE_TO_SERVER(20), //client将异步事件发送给server
  30. ASYNC_MESSAGE_TO_SERVER_ACK(21), //server收到异步事件后ACK给client
  31. ASYNC_MESSAGE_TO_CLIENT(22), //server将异步事件推送给client
  32. ASYNC_MESSAGE_TO_CLIENT_ACK(23), //client收到异步事件后ACK给server
  33. //广播
  34. BROADCAST_MESSAGE_TO_SERVER(24), //client将广播消息发送给server
  35. BROADCAST_MESSAGE_TO_SERVER_ACK(25), //server收到广播消息后ACK给client
  36. BROADCAST_MESSAGE_TO_CLIENT(26), //server将广播消息推送给client
  37. BROADCAST_MESSAGE_TO_CLIENT_ACK(27), //client收到广播消息后ACK给server
  38. //重定向指令
  39. REDIRECT_TO_CLIENT(30), //server将重定向指令推动给client
  40. }

4. Client发起交互

场景Client向Server发送消息命令字Server回复Client消息的命令字说明
握手HELLO_REQUESTHELLO_RESPONSE
心跳HEARTBEAT_REQUESTHEARTBEAT_RESPONSE
订阅SUBSCRIBE_REQUESTSUBSCRIBE_RESPONSE
取消订阅UNSUBSCRIBE_REQUESTUNSUBSCRIBE_RESPONSE
开始监听消息LISTEN_REQUESTLISTEN_RESPONSE
发送RR请求REQUEST_TO_SERVERRESPONSE_TO_CLIENT
发送RR回包RESPONSE_TO_SERVER
发送异步事件ASYNC_MESSAGE_TO_SERVERASYNC_MESSAGE_TO_SERVER_ACK
发送广播事件BROADCAST_MESSAGE_TO_SERVERBROADCAST_MESSAGE_TO_SERVER_ACK
客户端主动断连CLIENT_GOODBYE_REQUESTCLIENT_GOODBYE_RESPONSE

5. Server发起交互

场景Server向Client发送消息命令字Client回复Server消息命令字说明
客户端接收RR请求REQUEST_TO_CLIENTREQUEST_TO_CLIENT_ACK
客户端接收RR回包RESPONSE_TO_CLIENTRESPONSE_TO_CLIENT_ACK
客户端接收异步事件ASYNC_MESSAGE_TO_CLIENTASYNC_MESSAGE_TO_CLIENT_ACK
客户端接收广播事件BROADCAST_MESSAGE_TO_CLIENTBROADCAST_MESSAGE_TO_CLIENT_ACK
服务端主动断连SERVER_GOODBYE_REQUEST
服务端进行重定向REDIRECT_TO_CLIENT

6. 消息类型

  • 发送RR消息

rr-msg

  • 发送异步单播消息

async-msg

  • 发送广播消息

broadcast-msg

HTTP协议文档

Java类LiteMessagecontent字段表示一个特殊的协议,因此,如果您要使用eventmesh-sdk-java的http-client,则只需设计协议的content即可。LiteMessage组成如下:

  1. public class LiteMessage {
  2. private String bizSeqNo;
  3. private String uniqueId;
  4. private String topic;
  5. private String content;
  6. private Map<String, String> prop;
  7. private long createTime = System.currentTimeMillis();
  8. }

1. 消息发送方式与组成

消息发送方式:POST方式

消息组成:请求头(RequestHeader) + 请求体(RequestBody)

  • 心跳消息

RequestHeader

Key说明
Envclient所属环境
Regionclient所属区域
Idcclient所属IDC
Dcnclient所在DCN
Sysclient所属子系统
Pidclient进程号
Ipclient Ip
Usernameclient 用户名
Passwdclient 密码
Version协议版本
Language语言描述
Code请求码

RequestBody

Key说明
clientType客户端类型
heartbeatEntities心跳实体,包含topic、url等信息
  • 订阅消息:

RequestHeader

与心跳消息一致

RequestBody

Key说明
topic客户端订阅的topic
urltopic对应的url
  • 取消订阅消息:

RequestHeader

与心跳消息一致

RequestBody

与订阅消息一致

  • 发送异步事件:

RequestHeader

与心跳消息一致

RequestBody

Key说明
topic客户端请求的topic
content客户端发送的topic的内容
ttl客户端请求超时时间
bizSeqNo客户端请求业务流水号
uniqueId客户端请求消息唯一标识

2. Client发起交互

场景Client向Server发送消息请求码Server回复Client消息的响应码说明
心跳HEARTBEAT(203)SUCCESS(0)/EVENTMESH_HEARTBEAT_ERROR(19)
订阅SUBSCRIBE(206)SUCCESS(0)/EVENTMESH_SUBSCRIBE_ERROR(17)
取消订阅UNSUBSCRIBE(207)SUCCESS(0)/EVENTMESH_UNSUBSCRIBE_ERROR(18)
发送异步事件MSG_SEND_ASYNC(104)SUCCESS(0)/EVENTMESH_SEND_ASYNC_MSG_ERR(14)

3. Server发起交互

场景Server向Client发送消息请求码Client回复Server消息响应码说明
客户端接收异步事件HTTP_PUSH_CLIENT_ASYNC(105)retCoderetCode值为0时代表成功

gRPC 协议文档

1. protobuf

eventmesh-protocol-gprc 模块有 Eventmesh gRPC 客户端的 protobuf 文件. the protobuf 文件路径是 /src/main/proto/eventmesh-client.proto.

用gradle build 生成 gRPC 代码在 /build/generated/source/proto/main. 生成代码用于 eventmesh-sdk-java 模块.

2. gRPC 数据模型

  • 消息

以下消息数据模型用于 publish(), requestReply()broadcast() APIs.

  1. message RequestHeader {
  2. string env = 1;
  3. string region = 2;
  4. string idc = 3;
  5. string ip = 4;
  6. string pid = 5;
  7. string sys = 6;
  8. string username = 7;
  9. string password = 8;
  10. string language = 9;
  11. string protocolType = 10;
  12. string protocolVersion = 11;
  13. string protocolDesc = 12;
  14. }
  15. message SimpleMessage {
  16. RequestHeader header = 1;
  17. string producerGroup = 2;
  18. string topic = 3;
  19. string content = 4;
  20. string ttl = 5;
  21. string uniqueId = 6;
  22. string seqNum = 7;
  23. string tag = 8;
  24. map<string, string> properties = 9;
  25. }
  26. message BatchMessage {
  27. RequestHeader header = 1;
  28. string producerGroup = 2;
  29. string topic = 3;
  30. message MessageItem {
  31. string content = 1;
  32. string ttl = 2;
  33. string uniqueId = 3;
  34. string seqNum = 4;
  35. string tag = 5;
  36. map<string, string> properties = 6;
  37. }
  38. repeated MessageItem messageItem = 4;
  39. }
  40. message Response {
  41. string respCode = 1;
  42. string respMsg = 2;
  43. string respTime = 3;
  44. }
  • 订阅

以下订阅数据模型用于 subscribe()unsubscribe() APIs.

  1. message Subscription {
  2. RequestHeader header = 1;
  3. string consumerGroup = 2;
  4. message SubscriptionItem {
  5. enum SubscriptionMode {
  6. CLUSTERING = 0;
  7. BROADCASTING = 1;
  8. }
  9. enum SubscriptionType {
  10. ASYNC = 0;
  11. SYNC = 1;
  12. }
  13. string topic = 1;
  14. SubscriptionMode mode = 2;
  15. SubscriptionType type = 3;
  16. }
  17. repeated SubscriptionItem subscriptionItems = 3;
  18. string url = 4;
  19. }
  • 心跳

以下心跳数据模型用于 heartbeat() API.

  1. message Heartbeat {
  2. enum ClientType {
  3. PUB = 0;
  4. SUB = 1;
  5. }
  6. RequestHeader header = 1;
  7. ClientType clientType = 2;
  8. string producerGroup = 3;
  9. string consumerGroup = 4;
  10. message HeartbeatItem {
  11. string topic = 1;
  12. string url = 2;
  13. }
  14. repeated HeartbeatItem heartbeatItems = 5;
  15. }

3. gRPC 服务接口

  • 事件生产端服务 APIs
  1. service PublisherService {
  2. # 异步事件生产
  3. rpc publish(SimpleMessage) returns (Response);
  4. # 同步事件生产
  5. rpc requestReply(SimpleMessage) returns (Response);
  6. # 批量事件生产
  7. rpc batchPublish(BatchMessage) returns (Response);
  8. }
  • 事件消费端服务 APIs
  1. service ConsumerService {
  2. # 所消费事件通过 HTTP Webhook推送事件
  3. rpc subscribe(Subscription) returns (Response);
  4. # 所消费事件通过 TCP stream推送事件
  5. rpc subscribeStream(Subscription) returns (stream SimpleMessage);
  6. rpc unsubscribe(Subscription) returns (Response);
  7. }
  • 客户端心跳服务 API
  1. service HeartbeatService {
  2. rpc heartbeat(Heartbeat) returns (Response);
  3. }