TCP 协议文档

1. 协议格式

消息组成详解:

  1. 魔术字:9 位,当前值为“EventMesh
  2. 通信协议版本号:4 位,当前值为“0000
  3. 消息总长度值 (length):4 位,int 类型
  4. 消息头长度值 (headerLength):4 位,int 类型
  5. 消息头 (header):长度 = headerLength
  6. 消息体 (body):长度 = length - headerLength - 4 - 4

2. 业务逻辑层

  • 消息组成

消息头(header)+ 消息体(body)

  1. public class Package {
  2. private Header header;
  3. private Object body;
  4. }
  5. public class Header {
  6. private Command cmd;
  7. private int code;
  8. private String msg;
  9. private String seq;
  10. }
  • 详解

消息头 (header):类型为 Header,Header 中有 Command 字段,用于区分不同的消息类型

消息体 (body):对于不同的消息类型,body 的类型不同

消息命令字body 类型
HEARTBEAT_REQUEST, HEARTBEAT_RESPONSE, HELLO_RESPONSE, CLIENT_GOODBYE_REQUEST, CLIENT_GOODBYE_RESPONSE, SERVER_GOODBYE_REQUEST, SERVER_GOODBYE_RESPONSE, LISTEN_REQUEST, LISTEN_RESPONSE, UNSUBSCRIBE_REQUEST, SUBSCRIBE_RESPONSE, UNSUBSCRIBE_RESPONSE, ASYNC_MESSAGE_TO_SERVER_ACK, BROADCAST_MESSAGE_TO_SERVER_ACK
HELLO_REQUESTUserAgent
SUBSCRIBE_REQUESTSubscription
REQUEST_TO_SERVER, REQUEST_TO_CLIENT, RESPONSE_TO_SERVER, RESPONSE_TO_CLIENT, ASYNC_MESSAGE_TO_SERVER, ASYNC_MESSAGE_TO_CLIENT, BROADCAST_MESSAGE_TO_SERVER, BROADCAST_MESSAGE_TO_CLIENT, ASYNC_MESSAGE_TO_CLIENT_ACK, BROADCAST_MESSAGE_TO_CLIENT_ACK, RESPONSE_TO_CLIENT_ACK, REQUEST_TO_CLIENT_ACKOpenMessage
REDIRECT_TO_CLIENTRedirectInfo

3. Client 与 EventMesh Runtime(Server) 交互场景详解

  1. public enum Command {
  2. //心跳
  3. HEARTBEAT_REQUEST(0), //client 发给 server 的心跳包
  4. HEARTBEAT_RESPONSE(1), //server 回复 client 的心跳包
  5. //握手
  6. HELLO_REQUEST(2), //client 发给 server 的握手请求
  7. HELLO_RESPONSE(3), //server 回复 client 的握手请求
  8. //断连
  9. CLIENT_GOODBYE_REQUEST(4), //client 主动断连时通知 server
  10. CLIENT_GOODBYE_RESPONSE(5), //server 回复 client 的主动断连通知
  11. SERVER_GOODBYE_REQUEST(6), //server 主动断连时通知 client
  12. SERVER_GOODBYE_RESPONSE(7), //client 回复 server 的主动断连通知
  13. //订阅管理
  14. SUBSCRIBE_REQUEST(8), //client 发给 server 的订阅请求
  15. SUBSCRIBE_RESPONSE(9), //server 回复 client 的订阅请求
  16. UNSUBSCRIBE_REQUEST(10), //client 发给 server 的取消订阅请求
  17. UNSUBSCRIBE_RESPONSE(11), //server 回复 client 的取消订阅请求
  18. //监听
  19. LISTEN_REQUEST(12), //client 发给 server 的启动监听请求
  20. LISTEN_RESPONSE(13), //server 回复 client 的监听请求
  21. //RR
  22. REQUEST_TO_SERVER(14), //client 将 RR 请求发送给 server
  23. REQUEST_TO_CLIENT(15), //server 将 RR 请求推送给 client
  24. REQUEST_TO_CLIENT_ACK(16), //client 收到 RR 请求后 ACK 给 server
  25. RESPONSE_TO_SERVER(17), //client 将 RR 回包发送给 server
  26. RESPONSE_TO_CLIENT(18), //server 将 RR 回包推送给 client
  27. RESPONSE_TO_CLIENT_ACK(19), //client 收到回包后 ACK 给 server
  28. //异步事件
  29. ASYNC_MESSAGE_TO_SERVER(20), //client 将异步事件发送给 server
  30. ASYNC_MESSAGE_TO_SERVER_ACK(21), //server 收到异步事件后 ACK 给 client
  31. ASYNC_MESSAGE_TO_CLIENT(22), //server 将异步事件推送给 client
  32. ASYNC_MESSAGE_TO_CLIENT_ACK(23), //client 收到异步事件后 ACK 给 server
  33. //广播
  34. BROADCAST_MESSAGE_TO_SERVER(24), //client 将广播消息发送给 server
  35. BROADCAST_MESSAGE_TO_SERVER_ACK(25), //server 收到广播消息后 ACK 给 client
  36. BROADCAST_MESSAGE_TO_CLIENT(26), //server 将广播消息推送给 client
  37. BROADCAST_MESSAGE_TO_CLIENT_ACK(27), //client 收到广播消息后 ACK 给 server
  38. //重定向指令
  39. REDIRECT_TO_CLIENT(30), //server 将重定向指令推动给 client
  40. }

4. Client 发起交互

场景Client 向 Server 发送消息命令字Server 回复 Client 消息的命令字说明
握手HELLO_REQUESTHELLO_RESPONSE
心跳HEARTBEAT_REQUESTHEARTBEAT_RESPONSE
订阅SUBSCRIBE_REQUESTSUBSCRIBE_RESPONSE
取消订阅UNSUBSCRIBE_REQUESTUNSUBSCRIBE_RESPONSE
开始监听消息LISTEN_REQUESTLISTEN_RESPONSE
发送 RR 请求REQUEST_TO_SERVERRESPONSE_TO_CLIENT
发送 RR 回包RESPONSE_TO_SERVER
发送异步事件ASYNC_MESSAGE_TO_SERVERASYNC_MESSAGE_TO_SERVER_ACK
发送广播事件BROADCAST_MESSAGE_TO_SERVERBROADCAST_MESSAGE_TO_SERVER_ACK
客户端主动断连CLIENT_GOODBYE_REQUESTCLIENT_GOODBYE_RESPONSE

5. Server 发起交互

场景Server 向 Client 发送消息命令字Client 回复 Server 消息命令字说明
客户端接收 RR 请求REQUEST_TO_CLIENTREQUEST_TO_CLIENT_ACK
客户端接收 RR 回包RESPONSE_TO_CLIENTRESPONSE_TO_CLIENT_ACK
客户端接收异步事件ASYNC_MESSAGE_TO_CLIENTASYNC_MESSAGE_TO_CLIENT_ACK
客户端接收广播事件BROADCAST_MESSAGE_TO_CLIENTBROADCAST_MESSAGE_TO_CLIENT_ACK
服务端主动断连SERVER_GOODBYE_REQUEST
服务端进行重定向REDIRECT_TO_CLIENT

6. 消息类型

  • 发送 RR 消息

rr-msg

  • 发送异步单播消息

async-msg

  • 发送广播消息

broadcast-msg

HTTP 协议文档

Java 类EventMeshMessagecontent字段表示一个特殊的协议,因此,如果您要使用 eventmesh-sdk-java 的 http-client,则只需设计协议的content即可。EventMeshMessage组成如下:

  1. public class EventMeshMessage {
  2. private String bizSeqNo;
  3. private String uniqueId;
  4. private String topic;
  5. private String content;
  6. private Map<String, String> prop;
  7. private long createTime = System.currentTimeMillis();
  8. }

1. 消息发送方式与组成

消息发送方式:POST 方式

消息组成:请求头 (RequestHeader) + 请求体 (RequestBody)

  • 心跳消息

RequestHeader

Key说明
Envclient 所属环境
Regionclient 所属区域
Idcclient 所属 IDC
Dcnclient 所在 DCN
Sysclient 所属子系统
Pidclient 进程号
Ipclient Ip
Usernameclient 用户名
Passwdclient 密码
Version协议版本
Language语言描述
Code请求码

RequestBody

Key说明
clientType客户端类型
heartbeatEntities心跳实体,包含 topic、url 等信息
  • 订阅消息:

RequestHeader

与心跳消息一致

RequestBody

Key说明
topic客户端订阅的 topic
urltopic 对应的 url
  • 取消订阅消息:

RequestHeader

与心跳消息一致

RequestBody

与订阅消息一致

  • 发送异步事件:

RequestHeader

与心跳消息一致

RequestBody

Key说明
topic客户端请求的 topic
content客户端发送的 topic 的内容
ttl客户端请求超时时间
bizSeqNo客户端请求业务流水号
uniqueId客户端请求消息唯一标识

2. Client 发起交互

场景Client 向 Server 发送消息请求码Server 回复 Client 消息的响应码说明
心跳HEARTBEAT(203)SUCCESS(0)/EVENTMESH_HEARTBEAT_ERROR(19)
订阅SUBSCRIBE(206)SUCCESS(0)/EVENTMESH_SUBSCRIBE_ERROR(17)
取消订阅UNSUBSCRIBE(207)SUCCESS(0)/EVENTMESH_UNSUBSCRIBE_ERROR(18)
发送异步事件MSG_SEND_ASYNC(104)SUCCESS(0)/EVENTMESH_SEND_ASYNC_MSG_ERR(14)

3. Server 发起交互

场景Server 向 Client 发送消息请求码Client 回复 Server 消息响应码说明
客户端接收异步事件HTTP_PUSH_CLIENT_ASYNC(105)retCoderetCode 值为 0 时代表成功

gRPC 协议文档

1. protobuf

eventmesh-protocol-gprc 模块有 EventMesh gRPC 客户端的 protobuf 文件。the protobuf 文件路径是 /src/main/proto/eventmesh-client.proto.

用 gradle build 生成 gRPC 代码在 /build/generated/source/proto/main. 生成代码用于 eventmesh-sdk-java 模块。

2. gRPC 数据模型

  • 消息

以下消息数据模型用于 publish(), requestReply()broadcast() APIs.

  1. message RequestHeader {
  2. string env = 1;
  3. string region = 2;
  4. string idc = 3;
  5. string ip = 4;
  6. string pid = 5;
  7. string sys = 6;
  8. string username = 7;
  9. string password = 8;
  10. string language = 9;
  11. string protocolType = 10;
  12. string protocolVersion = 11;
  13. string protocolDesc = 12;
  14. }
  15. message SimpleMessage {
  16. RequestHeader header = 1;
  17. string producerGroup = 2;
  18. string topic = 3;
  19. string content = 4;
  20. string ttl = 5;
  21. string uniqueId = 6;
  22. string seqNum = 7;
  23. string tag = 8;
  24. map<string, string> properties = 9;
  25. }
  26. message BatchMessage {
  27. RequestHeader header = 1;
  28. string producerGroup = 2;
  29. string topic = 3;
  30. message MessageItem {
  31. string content = 1;
  32. string ttl = 2;
  33. string uniqueId = 3;
  34. string seqNum = 4;
  35. string tag = 5;
  36. map<string, string> properties = 6;
  37. }
  38. repeated MessageItem messageItem = 4;
  39. }
  40. message Response {
  41. string respCode = 1;
  42. string respMsg = 2;
  43. string respTime = 3;
  44. }
  • 订阅

以下订阅数据模型用于 subscribe()unsubscribe() APIs.

  1. message Subscription {
  2. RequestHeader header = 1;
  3. string consumerGroup = 2;
  4. message SubscriptionItem {
  5. enum SubscriptionMode {
  6. CLUSTERING = 0;
  7. BROADCASTING = 1;
  8. }
  9. enum SubscriptionType {
  10. ASYNC = 0;
  11. SYNC = 1;
  12. }
  13. string topic = 1;
  14. SubscriptionMode mode = 2;
  15. SubscriptionType type = 3;
  16. }
  17. repeated SubscriptionItem subscriptionItems = 3;
  18. string url = 4;
  19. }
  • 心跳

以下心跳数据模型用于 heartbeat() API.

  1. message Heartbeat {
  2. enum ClientType {
  3. PUB = 0;
  4. SUB = 1;
  5. }
  6. RequestHeader header = 1;
  7. ClientType clientType = 2;
  8. string producerGroup = 3;
  9. string consumerGroup = 4;
  10. message HeartbeatItem {
  11. string topic = 1;
  12. string url = 2;
  13. }
  14. repeated HeartbeatItem heartbeatItems = 5;
  15. }

3. gRPC 服务接口

  • 事件生产端服务 APIs
  1. service PublisherService {
  2. # 异步事件生产
  3. rpc publish(SimpleMessage) returns (Response);
  4. # 同步事件生产
  5. rpc requestReply(SimpleMessage) returns (Response);
  6. # 批量事件生产
  7. rpc batchPublish(BatchMessage) returns (Response);
  8. }
  • 事件消费端服务 APIs
  1. service ConsumerService {
  2. # 所消费事件通过 HTTP Webhook 推送事件
  3. rpc subscribe(Subscription) returns (Response);
  4. # 所消费事件通过 TCP stream 推送事件
  5. rpc subscribeStream(Subscription) returns (stream SimpleMessage);
  6. rpc unsubscribe(Subscription) returns (Response);
  7. }
  • 客户端心跳服务 API
  1. service HeartbeatService {
  2. rpc heartbeat(Heartbeat) returns (Response);
  3. }