gRPC Protocol

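EventMesh Java SDK implements producers and consumers for gRPC synchronous, asynchronous, and broadcast messages. Both require an instance of the EventMeshGrpcClientConfig class, which specifies the configuration of the EventMesh gRPC client. The liteEventMeshAddr, userName, and password fields must match the values in the eventmesh.properties file of the EventMesh runtime.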

    import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
    import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;

    import io.cloudevents.CloudEvent;

    public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {

        public static void main(String[] args) throws InterruptedException {
            EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
                .serverAddr("localhost")
                .serverPort(10205)
                .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
                .env("env").idc("idc")
                .sys("1234").build();
            /* ... */
        }
    }

gRPC Consumer

Stream Consumer

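The EventMesh runtime sends messages from producers to the stream consumer as a stream of events. The consumer should implement the ReceiveMsgHook interface, which is defined in ReceiveMsgHook.java: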

    public interface ReceiveMsgHook<T> {
        // Called for each message received on a subscribed topic
        Optional<T> handle(T msg) throws Throwable;

        String getProtocolType();
    }
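
To make the shape of the interface concrete, here is a minimal sketch of an implementation for plain String payloads. It declares a local copy of ReceiveMsgHook so that it compiles without the SDK; the UpperCaseHook class and the "text" protocol name are hypothetical and exist only for illustration, they are not part of EventMesh.

```java
import java.util.Locale;
import java.util.Optional;

class ReceiveMsgHookSketch {

    // Local copy of the interface shown above, so the sketch compiles without the SDK.
    interface ReceiveMsgHook<T> {
        Optional<T> handle(T msg) throws Throwable;

        String getProtocolType();
    }

    // Hypothetical hook for String payloads (not an EventMesh class).
    static class UpperCaseHook implements ReceiveMsgHook<String> {

        @Override
        public Optional<String> handle(String msg) {
            // A present value models a reply; Optional.empty() models "no reply".
            if (msg == null || msg.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(msg.toUpperCase(Locale.ROOT));
        }

        @Override
        public String getProtocolType() {
            // Illustrative name only; EventMesh defines its own protocol constants.
            return "text";
        }
    }

    public static void main(String[] args) {
        UpperCaseHook hook = new UpperCaseHook();
        System.out.println(hook.handle("hello")); // Optional[HELLO]
        System.out.println(hook.handle(""));      // Optional.empty
    }
}
```

A real hook would typically be parameterized with CloudEvent or EventMeshMessage rather than String.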

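EventMeshGrpcConsumer implements the registerListener, subscribe, and unsubscribe methods. The subscribe method accepts a list of SubscriptionItem objects that define the topics to subscribe to. The registerListener method accepts an instance of a class that implements ReceiveMsgHook. The handle method is invoked whenever the consumer receives a message on a subscribed topic. If the SubscriptionType is SYNC, the return value of handle is sent back to the producer.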

    import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
    import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
    import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
    import org.apache.eventmesh.common.protocol.SubscriptionItem;
    import org.apache.eventmesh.common.protocol.SubscriptionMode;
    import org.apache.eventmesh.common.protocol.SubscriptionType;

    import java.util.Collections;
    import java.util.Optional;

    import io.cloudevents.CloudEvent;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j // supplies the log field used in handle(); assumes Lombok is on the classpath
    public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {

        public static CloudEventsAsyncSubscribe handler = new CloudEventsAsyncSubscribe();

        public static void main(String[] args) throws InterruptedException {
            /* ... */
            SubscriptionItem subscriptionItem = new SubscriptionItem(
                "eventmesh-async-topic",
                SubscriptionMode.CLUSTERING,
                SubscriptionType.ASYNC
            );

            EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
            eventMeshGrpcConsumer.init();

            // Register the hook, then subscribe to the topic
            eventMeshGrpcConsumer.registerListener(handler);
            eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem));
            /* ... */
            eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem));
        }

        @Override
        public Optional<CloudEvent> handle(CloudEvent message) {
            log.info("Message received: {}", message);
            // The subscription is ASYNC, so no reply is returned
            return Optional.empty();
        }

        @Override
        public String getProtocolType() {
            return EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME;
        }
    }
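
The rule that only a SYNC subscription sends the return value of handle back to the producer can be modeled in a few lines. The following is a sketch under stated assumptions, not the SDK's dispatch code: the deliver helper, the EchoHook class, and the local SubscriptionType and ReceiveMsgHook types are stand-ins declared here so the example runs on its own.

```java
import java.util.Optional;

class SubscriptionDispatchSketch {

    // Local stand-ins for the SDK types, so the sketch compiles without EventMesh.
    enum SubscriptionType { SYNC, ASYNC }

    interface ReceiveMsgHook<T> {
        Optional<T> handle(T msg) throws Throwable;

        String getProtocolType();
    }

    /**
     * Hypothetical helper: passes one message to the hook and returns what would be
     * sent back to the producer, or Optional.empty() when nothing is sent back.
     */
    static <T> Optional<T> deliver(ReceiveMsgHook<T> hook, SubscriptionType type, T msg) {
        Optional<T> result;
        try {
            result = hook.handle(msg);
        } catch (Throwable t) {
            // In this sketch a failing handler simply produces no reply.
            return Optional.empty();
        }
        // The handler always runs, but only SYNC subscriptions forward its return value.
        return type == SubscriptionType.SYNC && result != null ? result : Optional.empty();
    }

    // Hypothetical hook that answers every message.
    static class EchoHook implements ReceiveMsgHook<String> {

        @Override
        public Optional<String> handle(String msg) {
            return Optional.of("reply to " + msg);
        }

        @Override
        public String getProtocolType() {
            return "text"; // illustrative name only
        }
    }

    public static void main(String[] args) {
        EchoHook hook = new EchoHook();
        System.out.println(deliver(hook, SubscriptionType.SYNC, "ping"));  // Optional[reply to ping]
        System.out.println(deliver(hook, SubscriptionType.ASYNC, "ping")); // Optional.empty
    }
}
```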

Webhook Consumer

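The subscribe method of EventMeshGrpcConsumer accepts a list of SubscriptionItem objects that define the topics to subscribe to, and an optional callback URL. If a callback URL is provided, the EventMesh runtime sends a POST request containing the message in CloudEvents format to that URL. SubController.java implements a Spring Boot controller that receives and parses the callback messages.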

    import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
    import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
    import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
    import org.apache.eventmesh.common.protocol.SubscriptionItem;
    import org.apache.eventmesh.common.protocol.SubscriptionMode;
    import org.apache.eventmesh.common.protocol.SubscriptionType;

    import java.util.Collections;

    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;

    @Component
    public class SubService implements InitializingBean {

        // Callback URL that the EventMesh runtime sends POST requests to
        final String url = "http://localhost:8080/callback";

        private EventMeshGrpcConsumer eventMeshGrpcConsumer;

        @Override
        public void afterPropertiesSet() throws Exception {
            /* ... */
            eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
            eventMeshGrpcConsumer.init();

            SubscriptionItem subscriptionItem = new SubscriptionItem(
                "eventmesh-async-topic",
                SubscriptionMode.CLUSTERING,
                SubscriptionType.ASYNC
            );

            // Passing a URL makes this a webhook subscription
            eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem), url);
            /* ... */
            eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem), url);
        }
    }
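
For readers who want to see the receiving side of the callback without Spring Boot, the following sketch accepts POST requests on /callback using only the JDK's built-in HTTP server. It is not SubController.java: the class name, the RECEIVED queue, and the plain "ok" response body are assumptions made for illustration, and the response format that the EventMesh runtime actually expects is not covered here.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CallbackReceiverSketch {

    // Raw bodies of the received callback requests, in arrival order.
    static final BlockingQueue<String> RECEIVED = new LinkedBlockingQueue<>();

    /** Starts a server on the given port (0 picks a free port) that accepts POST /callback. */
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/callback", exchange -> {
            try {
                if (!"POST".equals(exchange.getRequestMethod())) {
                    exchange.sendResponseHeaders(405, -1); // method not allowed, no body
                    return;
                }
                // Store the body as-is; a real receiver would parse the CloudEvent here.
                try (InputStream in = exchange.getRequestBody()) {
                    RECEIVED.add(new String(in.readAllBytes(), StandardCharsets.UTF_8));
                }
                byte[] ok = "ok".getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, ok.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(ok);
                }
            } finally {
                exchange.close();
            }
        });
        server.start();
        return server;
    }
}
```

Calling CallbackReceiverSketch.start(8080) would listen on the http://localhost:8080/callback URL used in the example above; server.stop(0) shuts it down again.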

gRPC Producer

Asynchronous Producer

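EventMeshGrpcProducer implements the publish method. The publish method accepts the message to be published and an optional timeout value. The message should be an instance of one of the following classes: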

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent

    /* ... */
    EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);
    eventMeshGrpcProducer.init();

    Map<String, String> content = new HashMap<>();
    content.put("content", "testAsyncMessage");

    CloudEvent event = CloudEventBuilder.v1()
        .withId(UUID.randomUUID().toString())
        .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
        .withSource(URI.create("/"))
        .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
        .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
        .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
        .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
        .build();

    eventMeshGrpcProducer.publish(event);

Synchronous Producer

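EventMeshGrpcProducer implements the requestReply method. The requestReply method accepts the message to be published and an optional timeout value, and returns the message that the consumer sends back. The message should be an instance of one of the following classes: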

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent
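
The request-reply pattern with a timeout can be illustrated without the SDK. The sketch below is an in-process model only: this requestReply is a hypothetical static helper, not the EventMeshGrpcProducer method, and the consumer is represented by a plain Function. It shows the two outcomes a caller has to handle: a reply that arrives in time, and no reply within the timeout.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class RequestReplySketch {

    /**
     * Hypothetical model of request-reply: runs the consumer on another thread and
     * waits up to timeoutMillis for its reply. Returns Optional.empty() if the
     * consumer is too slow, fails, or the waiting thread is interrupted.
     */
    static <T> Optional<T> requestReply(T message, Function<T, T> consumer, long timeoutMillis) {
        CompletableFuture<T> reply = CompletableFuture.supplyAsync(() -> consumer.apply(message));
        try {
            return Optional.ofNullable(reply.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            // No usable reply: give up on the pending result.
            reply.cancel(true);
            return Optional.empty();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // The consumer answers immediately, so the reply arrives within the timeout.
        System.out.println(requestReply("ping", m -> "pong for " + m, 1000)); // Optional[pong for ping]
    }
}
```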

Batch Producer

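EventMeshGrpcProducer overloads the publish method to accept a list of messages to be published and an optional timeout value. Each message in the list should be an instance of one of the following classes: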

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent

    /* ... */
    // content is the Map<String, String> payload built as in the asynchronous producer example
    List<CloudEvent> cloudEventList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        CloudEvent event = CloudEventBuilder.v1()
            .withId(UUID.randomUUID().toString())
            .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
            .withSource(URI.create("/"))
            .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
            .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
            .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
            .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
            .build();

        cloudEventList.add(event);
    }

    // Publish all five events in a single call
    eventMeshGrpcProducer.publish(cloudEventList);
    /* ... */