HTTP 协议

EventMesh Java SDK 实现了 HTTP 异步消息的生产者和消费者。二者都需要一个 EventMeshHttpClientConfig 类实例来指定 EventMesh HTTP 客户端的配置信息。其中的 liteEventMeshAddruserNamepassword 字段需要和 EventMesh Runtime eventmesh.properties 文件中的相匹配。

  1. import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
  2. import org.apache.eventmesh.common.utils.IPUtils;
  3. import org.apache.eventmesh.common.utils.ThreadUtils;
  4. public class HTTP {
  5. public static void main(String[] args) throws Exception {
  6. EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
  7. .liteEventMeshAddr("localhost:10105")
  8. .producerGroup("TEST_PRODUCER_GROUP")
  9. .env("env")
  10. .idc("idc")
  11. .ip(IPUtils.getLocalAddress())
  12. .sys("1234")
  13. .pid(String.valueOf(ThreadUtils.getPID()))
  14. .userName("eventmesh")
  15. .password("password")
  16. .build();
  17. /* ... */
  18. }
  19. }

HTTP 消费者

EventMeshHttpConsumer 实现了 heartbeatsubscribeunsubscribe 方法。subscribe 方法接收一个 SubscriptionItem 对象的列表,其中定义了要订阅的话题和回调的 URL 地址。

  1. import org.apache.eventmesh.client.http.consumer.EventMeshHttpConsumer;
  2. import org.apache.eventmesh.common.protocol.SubscriptionItem;
  3. import org.apache.eventmesh.common.protocol.SubscriptionMode;
  4. import org.apache.eventmesh.common.protocol.SubscriptionType;
  5. import com.google.common.collect.Lists;
  6. public class HTTP {
  7. final String url = "http://localhost:8080/callback";
  8. final List<SubscriptionItem> topicList = Lists.newArrayList(
  9. new SubscriptionItem("eventmesh-async-topic", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC)
  10. );
  11. public static void main(String[] args) throws Exception {
  12. /* ... */
  13. eventMeshHttpConsumer = new EventMeshHttpConsumer(eventMeshClientConfig);
  14. eventMeshHttpConsumer.heartBeat(topicList, url);
  15. eventMeshHttpConsumer.subscribe(topicList, url);
  16. /* ... */
  17. eventMeshHttpConsumer.unsubscribe(topicList, url);
  18. }
  19. }

EventMesh Runtime 将发送一个包含 CloudEvents 格式 信息的 POST 请求到这个回调的 URL 地址。类 SubController.java 实现了 Spring Boot controller,它将接收并解析回调信息。

HTTP 生产者

EventMeshHttpProducer 实现了 publish 方法。publish 方法接收将被发布的消息和一个可选的 timeout 值。消息应是下列类的一个实例:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent
  • io.openmessaging.api.Message
  1. import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
  2. import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
  3. import org.apache.eventmesh.common.Constants;
  4. import org.apache.eventmesh.common.utils.JsonUtils;
  5. import io.cloudevents.CloudEvent;
  6. import io.cloudevents.core.builder.CloudEventBuilder;
  7. public class HTTP {
  8. public static void main(String[] args) throws Exception {
  9. /* ... */
  10. EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);
  11. Map<String, String> content = new HashMap<>();
  12. content.put("content", "testAsyncMessage");
  13. CloudEvent event = CloudEventBuilder.v1()
  14. .withId(UUID.randomUUID().toString())
  15. .withSubject("eventmesh-async-topic")
  16. .withSource(URI.create("/"))
  17. .withDataContentType("application/cloudevents+json")
  18. .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
  19. .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
  20. .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
  21. .build();
  22. eventMeshHttpProducer.publish(event);
  23. }
  24. }

使用 Curl 命令

本段落介绍通过 Curl 命令体验事件的收发功能。

事件发送

启动 EventMesh Runtime 服务后,可以使用 Curl 命令将事件用 HTTP POST 方法发布到指定的主题,Body 内容必须是 JSON 格式,执行命令示例如下:

  1. curl -H "Content-Type:application/json" -X POST -d '{"name": "admin", "pass":"12345678"}' http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC

事件订阅

启动 EventMesh Runtime 服务后,可以使用 Curl 命令用 HTTP POST 方法订阅指定的主题列表,Body 内容必须是 JSON 格式,执行命令示例如下:

  1. curl -H "Content-Type:application/json" -X POST -d '{"url": "http://127.0.0.1:8088/sub/test", "consumerGroup":"TEST-GROUP", "topic":[{"mode":"CLUSTERING","topic":"TEST-TOPIC-HTTP-ASYNC","type":"ASYNC"}]}' http://127.0.0.1:10105/eventmesh/subscribe/local

你可以在项目eventmesh-examples模块中看到这个例子。