TCP 协议

EventMesh Java SDK 实现了同步、异步和广播 TCP 消息的生产者和消费者。二者都需要一个 EventMeshHttpClientConfig 类实例来指定 EventMesh TCP 客户端的配置信息。其中的 hostport 字段需要和 EventMesh Runtime eventmesh.properties 文件中的相匹配。

  1. import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
  2. import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
  3. import io.cloudevents.CloudEvent;
  4. public class AsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
  5. public static void main(String[] args) throws InterruptedException {
  6. EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
  7. .host(eventMeshIp)
  8. .port(eventMeshTcpPort)
  9. .userAgent(userAgent)
  10. .build();
  11. /* ... */
  12. }
  13. }

TCP 消费者

消费者应该实现 ReceiveMsgHook 类,其被定义在 ReceiveMsgHook.java

  1. public interface ReceiveMsgHook<ProtocolMessage> {
  2. Optional<ProtocolMessage> handle(ProtocolMessage msg);
  3. }

EventMeshTCPClient 实现了 subscribe 方法。该方法接收话题、SubscriptionModeSubscriptionTypehandle 方法将会在消费者从订阅的话题中收到消息时被调用。如果 SubscriptionTypeSYNChandle 的返回值将被发送回生产者。

  1. import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
  2. import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
  3. import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
  4. import org.apache.eventmesh.common.protocol.SubscriptionMode;
  5. import org.apache.eventmesh.common.protocol.SubscriptionType;
  6. import io.cloudevents.CloudEvent;
  7. public class TCPConsumer implements ReceiveMsgHook<CloudEvent> {
  8. public static TCPConsumer handler = new TCPConsumer();
  9. private static EventMeshTCPClient<CloudEvent> client;
  10. public static void main(String[] args) throws Exception {
  11. client = EventMeshTCPClientFactory.createEventMeshTCPClient(
  12. eventMeshTcpClientConfig,
  13. CloudEvent.class
  14. );
  15. client.init();
  16. client.subscribe(
  17. "eventmesh-sync-topic",
  18. SubscriptionMode.CLUSTERING,
  19. SubscriptionType.SYNC
  20. );
  21. client.registerSubBusiHandler(handler);
  22. client.listen();
  23. }
  24. @Override
  25. public Optional<CloudEvent> handle(CloudEvent message) {
  26. log.info("Messaged received: {}", message);
  27. return Optional.of(message);
  28. }
  29. }

TCP 生产者

异步生产者

EventMeshTCPClient 实现了 public 方法。该方法接收将被发布的消息和一个可选的 timeout 值,并返回来自消费者的响应消息。

  1. /* ... */
  2. client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
  3. client.init();
  4. CloudEvent event = CloudEventBuilder.v1()
  5. .withId(UUID.randomUUID().toString())
  6. .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
  7. .withSource(URI.create("/"))
  8. .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
  9. .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
  10. .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
  11. .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
  12. .build();
  13. client.publish(event, 1000);

同步生产者

EventMeshTCPClient 实现了 rr 方法。该方法接收将被发布的消息和一个可选的 timeout 值,并返回来自消费者的响应消息。

  1. /* ... */
  2. client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
  3. client.init();
  4. CloudEvent event = CloudEventBuilder.v1()
  5. .withId(UUID.randomUUID().toString())
  6. .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
  7. .withSource(URI.create("/"))
  8. .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
  9. .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
  10. .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
  11. .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
  12. .build();
  13. Package response = client.rr(event, 1000);
  14. CloudEvent replyEvent = EventFormatProvider
  15. .getInstance()
  16. .resolveFormat(JsonFormat.CONTENT_TYPE)
  17. .deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8));