规则引擎概览
适用版本: EMQ X v3.1.0+
兼容提示: EMQ X v4.0 对规则引擎 SQL 语法做出较大调整,v3.x 升级用户请参照迁移指南进行适配。
EMQ X Rule Engine (以下简称规则引擎) 用于配置 EMQ X 消息流与设备事件的处理、响应规则。规则引擎不仅提供了清晰、灵活的”配置式”的业务集成方案,简化了业务开发流程,提升用户易用性,降低业务系统与 EMQ X 的耦合度;也为 EMQ X 的私有功能定制提供了一个更优秀的基础架构。
说明
EMQ X 在消息发布、事件触发时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息。
消息发布
规则引擎借助响应动作可将特定主题的消息处理结果存储到数据库,发送到 HTTP Server,转发到消息队列 Kafka 或 RabbitMQ,重新发布到新的主题甚至是另一个 Broker 集群中,每个规则可以配置多个响应动作。
选择发布到 t/# 主题的消息,并筛选出全部字段:
SELECT * FROM "t/#"
选择发布到 t/a 主题的消息,并从 JSON 格式的消息内容中筛选出 “x” 字段:
SELECT payload.x as x FROM "t/a"
事件触发
规则引擎使用 $events/ 开头的虚拟主题(事件主题)处理 EMQ X 内置事件,内置事件提供更精细的消息控制和客户端动作处理能力,可用在 QoS 1 QoS 2 的消息抵达记录、设备上下线记录等业务中。
选择客户端连接事件,筛选 Username 为 ‘emqx’ 的设备并获取连接信息:
SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'emqx'
规则引擎数据和 SQL 语句格式,事件主题列表详细教程参见SQL手册。
最小规则
规则描述了数据从哪里来、如何筛选并处理数据、处理结果到哪里去三个配置,即一条可用的规则包含三个要素:
- 触发事件:规则通过事件触发,触发时事件给规则注入事件的上下文信息(数据源),通过 SQL 的 FROM 子句指定事件类型;
- 处理规则(SQL):使用 SELECT 子句 和 WHERE 子句以及内置处理函数, 从上下文信息中过滤和处理数据;
- 响应动作:如果有处理结果输出,规则将执行相应的动作,如持久化到数据库、重新发布处理后的消息、转发消息到消息队列等。一条规则可以配置多个响应动作。
如图所示是一条简单的规则,该条规则用于处理 消息发布 时的数据,将全部主题消息的 msg
字段,消息 topic
、QoS
筛选出来,发送到 Web Server 与 /uplink 主题:
规则引擎典型应用场景举例
- 动作监听:智慧家庭智能门锁开发中,门锁会因为网络、电源故障、人为破坏等原因离线导致功能异常,使用规则引擎配置监听离线事件向应用服务推送该故障信息,可以在接入层实现第一时间的故障检测的能力;
- 数据筛选:车辆网的卡车车队管理,车辆传感器采集并上报了大量运行数据,应用平台仅关注车速大于 40 km/h 时的数据,此场景下可以使用规则引擎对消息进行条件过滤,向业务消息队列写入满足条件的数据;
- 消息路由:智能计费应用中,终端设备通过不同主题区分业务类型,可通过配置规则引擎将计费业务的消息接入计费消息队列并在消息抵达设备端后发送确认通知到业务系统,非计费信息接入其他消息队列,实现业务消息路由配置;
- 消息编解码:其他公共协议/私有 TCP 协议接入、工控行业等应用场景下,可以通过规则引擎的本地处理函数(可在 EMQ X 上定制开发)做二进制/特殊格式消息体的编解码工作;亦可通过规则引擎的消息路由将相关消息流向外部计算资源如函数计算进行处理(可由用户自行开发处理逻辑),将消息转为业务易于处理的 JSON 格式,简化项目集成难度、提升应用快速开发交付能力。
迁移指南
4.0 版本中规则引擎 SQL 语法更加易用,3.x 版本中所有事件 FROM 子句后面均需要指定事件名称,4.0 以后我们引入事件主题 概念,默认情况下消息发布事件不再需要指定事件名称:
## 3.x 版本
## 需要指定事件名称进行处理
SELECT * FROM "message.publish" WHERE topic ~= 't/#'
## 4.0 及以后版本
## 默认处理 message.publish 事件, FROM 后面直接筛选 MQTT 主题
## 上述 SQL 语句等价于:
SELECT * FROM 't/#'
## 其他事件通过 事件主题 进行筛选
SELECT * FROM "$evnents/message_acked" where topic ~= 't/#'
SELECT * FROM "$evnents/client_connected"
Dashboard 中提供了旧版 SQL 语法转换功能可以完成 SQL 升级迁移。