gRPC 协议
EventMesh Java SDK 实现了 gRPC 同步、异步和广播消息的生产者和消费者。二者都需要一个 EventMeshHttpClientConfig
类实例来指定 EventMesh gRPC 客户端的配置信息。其中的 liteEventMeshAddr
、userName
和 password
字段需要和 EventMesh runtime eventmesh.properties
文件中的相匹配。
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import io.cloudevents.CloudEvent;
public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
public static void main(String[] args) throws InterruptedException {
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr("localhost")
.serverPort(10205)
.consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
/* ... */
}
}
gRPC 消费者
流消费者
EventMesh runtime 会将来自生产者的信息作为一系列事件流向流消费者发送。消费者应实现 ReceiveHook
类,其被定义在 ReceiveMsgHook.java。
public interface ReceiveMsgHook<T> {
Optional<T> handle(T msg) throws Throwable;
String getProtocolType();
}
类 EventMeshGrpcConsumer
实现了 registerListener
、subscribe
和 unsubscribe
方法。subscribe
方法接收一个 SubscriptionItem
对象的列表,其中定义了要订阅的话题。registerListener
接收一个实现了 ReceiveMsgHook
的实例。handle
方法将会在消费者收到订阅的主题消息时被调用。如果 SubscriptionType
是 SYNC
,handle
的返回值将被发送回生产者。
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import io.cloudevents.CloudEvent;
public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
public static CloudEventsAsyncSubscribe handler = new CloudEventsAsyncSubscribe();
public static void main(String[] args) throws InterruptedException {
/* ... */
SubscriptionItem subscriptionItem = new SubscriptionItem(
"eventmesh-async-topic",
SubscriptionMode.CLUSTERING,
SubscriptionType.ASYNC
);
EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
eventMeshGrpcConsumer.init();
eventMeshGrpcConsumer.registerListener(handler);
eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem));
/* ... */
eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem));
}
@Override
public Optional<CloudEvent> handle(CloudEvent message) {
log.info("Messaged received: {}", message);
return Optional.empty();
}
@Override
public String getProtocolType() {
return EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME;
}
}
Webhook 消费者
类 EventMeshGrpcConsumer
的 subscribe
方法接收一个 SubscriptionItem
对象的列表,其中定义了要订阅的主题和一个可选的 timeout 值。如果提供了回调 URL,EventMesh runtime 将向回调 URL 地址发送一个包含 CloudEvents 格式 消息的 POST 请求。SubController.java 实现了一个接收并解析回调信息的 Spring Boot controller。
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
@Component
public class SubService implements InitializingBean {
final String url = "http://localhost:8080/callback";
public void afterPropertiesSet() throws Exception {
/* ... */
eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
eventMeshGrpcConsumer.init();
SubscriptionItem subscriptionItem = new SubscriptionItem(
"eventmesh-async-topic",
SubscriptionMode.CLUSTERING,
SubscriptionType.ASYNC
);
eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem), url);
/* ... */
eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem), url);
}
}
gRPC 生产者
异步生产者
类 EventMeshGrpcProducer
实现了 publish
方法。publish
方法接收将被发布的消息和一个可选的 timeout 值。消息应是下列类的一个实例:
org.apache.eventmesh.common.EventMeshMessage
io.cloudevents.CloudEvent
/* ... */
EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);
eventMeshGrpcProducer.init();
Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshGrpcProducer.publish(event);
同步生产者
类 EventMeshGrpcProducer
实现了 requestReply
方法。requestReply
方法接收将被发布的消息和一个可选的 timeout 值。方法会返回消费者返回的消息。消息应是下列类的一个实例:
org.apache.eventmesh.common.EventMeshMessage
io.cloudevents.CloudEvent
批量生产者
类 EventMeshGrpcProducer
重写了 publish
方法,该方法接收一个将被发布的消息列表和一个可选的 timeout 值。列表中的消息应是下列类的一个实例:
org.apache.eventmesh.common.EventMeshMessage
io.cloudevents.CloudEvent
/* ... */
List<CloudEvent> cloudEventList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
cloudEventList.add(event);
}
eventMeshGrpcProducer.publish(cloudEventList);
/* ... */