事件驱动

在jetlinks中大量使用到事件驱动,在之前,我们是使用spring event作为事件总线进行进程内事件通知的. 由于spring event不支持响应式,所以使用消息网关(MessageGateway)来替代spring event. 消息网关有2个作用,1. 事件驱动 2. 设备消息统一管理.

概念

在消息网关中分为: 消息网关(MessageGateway),消息连接器(MessageConnector),消息连接(MessageConnection), 消息订阅器(MessageSubscriber),消息发布器(MessagePublihser). 网关中对消息使用topic进行区分,而不是像spring event那样使用java类型来区分.

Topic

采用树结构来定义topic如:/device/id/message/type . topic支持路径通配符,如:/device/** 或者/device/*/message/*.

TIP

**表示匹配多层路径,*表示匹配单层路径. 不支持前后匹配,如: /device/id-*/message

消息网关

消息网关连接器中订阅消息连接,当有连接创建时,会根据连接类型进行不同的操作. 当消息连接是一个订阅器时(isSubscriber)时,会从MessageSubscriber中接收消息订阅请求(onSubscribe), 并管理每一个连接的订阅信息.当一个消息连接是一个发布器时(isPublisher),会从MessagePublihser订阅消息(onMessage), 当发布器发送了消息(TopicMessage)时,网关会根据消息的topic获取订阅了此topic的消息连接,并将消息推送给对应的订阅器.

使用

订阅消息:

  1. @Subscribe("/device/**")
  2. public Mono<Void> handleDeviceMessage(DeviceMessage message){
  3. return publishDeviecMessageToKafka(message);
  4. }

发布消息:

  1. @Autowired
  2. private MessageGateway gateway;
  3. public Mono<Void> saveUser(UserEntity entity){
  4. return service.saveUser(entity)
  5. .then(gateway.publish("/user/"+entity.getId()+"/saved",entity))
  6. .then();
  7. }

自定义连接器

消息网关还可以用于消息转发,如实现设备消息统一网关.如: 通过CoAP发送消息,使用MQTT订阅消息.

  1. 实现MessageConnector接口.
  2. 将连接器中的连接实现MessageConnection接口.
  3. 根据情况,如果连接需要订阅消息,则还要实现MessageSubscriber,如果需要发布消息则实现MessagePublisher.

例子:

  1. public class MqttMessageConnector implements MessageConnector {
  2. private MqttServer mqttServer;
  3. private ClientAuthenticator authenticator;
  4. private int maxClientSize;
  5. @Nonnull
  6. @Override
  7. public String getId() {
  8. return mqttServer.getId();
  9. }
  10. @Nonnull
  11. @Override
  12. public Flux<MessageConnection> onConnection() {
  13. //从MQTT服务中订阅mqtt连接
  14. return mqttServer
  15. .handleConnection()
  16. .filter(conn -> {
  17. if (conn.getAuth().isPresent()) {
  18. return true;
  19. }
  20. conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
  21. return false;
  22. })
  23. .flatMap(conn -> {
  24. MqttAuth auth = conn.getAuth().orElse(null);
  25. if (auth == null) {
  26. return Mono.empty();
  27. }
  28. //认证
  29. return authenticator
  30. .authorize(new MqttAuthenticationRequest(conn.getClientId(), auth.getUsername(), auth.getUsername(), DefaultTransport.MQTT))
  31. .map(resp -> new MqttMessageConnection(conn.accept(), resp));
  32. });
  33. }
  34. @AllArgsConstructor
  35. class MqttMessageConnection implements
  36. MessageConnection,
  37. MessagePublisher,
  38. MessageSubscriber {
  39. private MqttConnection mqttConnection;
  40. private ClientAuthentication authentication;
  41. @Override
  42. public String getId() {
  43. return mqttConnection.getClientId();
  44. }
  45. @Override
  46. public void onDisconnect(Runnable disconnectListener) {
  47. mqttConnection.onClose(conn -> disconnectListener.run());
  48. }
  49. @Override
  50. public void disconnect() {
  51. mqttConnection.close().subscribe();
  52. }
  53. @Override
  54. public boolean isAlive() {
  55. return mqttConnection.isAlive();
  56. }
  57. @Nonnull
  58. @Override
  59. public Flux<TopicMessage> onMessage() {
  60. //从MQTT连接中订阅消息
  61. return mqttConnection
  62. .handleMessage()
  63. .flatMap(publishing -> {
  64. MqttMessage mqttMessage = publishing.getMessage();
  65. String topic = mqttMessage.getTopic();
  66. return authentication
  67. .getAuthority(topic)
  68. .filter(auth -> auth.has(TopicAuthority.PUB))
  69. .map(auth -> TopicMessage.of(mqttMessage.getTopic(), mqttMessage))
  70. .switchIfEmpty(Mono.fromRunnable(() -> log.warn("客户端[{}]推送无权限topic[{}]消息", getId(), topic)))
  71. .doOnEach(ReactiveLogger.onError(err -> {
  72. log.error("处理MQTT消息失败", err);
  73. }))
  74. ;
  75. }).onErrorContinue((err, obj) -> {
  76. });
  77. }
  78. @Nonnull
  79. @Override
  80. public Mono<Void> publish(@Nonnull TopicMessage message) {
  81. //将消息推送给MQTT
  82. return mqttConnection.publish(SimpleMqttMessage.builder()
  83. .payload(message.getMessage().getPayload())
  84. .topic(message.getTopic())
  85. .qosLevel(0)
  86. .build());
  87. }
  88. @Nonnull
  89. @Override
  90. public Flux<Subscription> onSubscribe() {
  91. //MQTT客户端订阅topic
  92. return mqttConnection
  93. .handleSubscribe(true)
  94. .flatMapIterable(sub -> sub.getMessage().topicSubscriptions())
  95. .map(MqttTopicSubscription::topicName)
  96. .filterWhen(topic -> authentication.getAuthority(topic).map(auth -> auth.has(TopicAuthority.SUB)))
  97. .map(Subscription::new);
  98. }
  99. @Nonnull
  100. @Override
  101. public Flux<Subscription> onUnSubscribe() {
  102. //MQTT客户端取消订阅
  103. return mqttConnection
  104. .handleUnSubscribe(true)
  105. .flatMapIterable(msg -> msg.getMessage().topics())
  106. .map(Subscription::new);
  107. }
  108. @Override
  109. public boolean isShareCluster() {
  110. //Pro将支持共享集群的消息,如: 节点1的网关收到了消息,MQTT从服务节点2订阅了请求.
  111. return false;
  112. }
  113. }
  114. }