Redis
通过 Redis 数据桥接可以通过执行自定义 Redis 数据操作命令的方式,将 MQTT 消息和客户端事件存储到 Redis 中,借助 Redis 高性能与灵活数据结构,实现诸如消息暂存,发布订阅和消息丢弃行为的计数与统计等业务。
提示
EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用 (opens new window)。
前置准备
功能清单
快速开始
在本示例中,我们将通过 Redis 实现:
- 暂存每个客户端最后一条消息;
- 每个主题消息丢弃计数。
安装 Redis
通过 Docker 安装并启动 Redis:
# 启动一个 Redis 容器并设置密码为 public
docker run --name redis -p 6379:6379 -d redis --requirepass "public"
# 进入容器
docker exec -it redis bash
# 在容器中连接到 Redis 服务器,需要通过 AUTH 命令认证
redis-cli
127.0.0.1:6379> AUTH public
OK
# 验证安装结果
127.0.0.1:6379> set emqx "Hello World"
OK
127.0.0.1:6379> get emqx
"Hello World"
至此,您已经完成 Redis 的安装并使用 SET
GET
命令验证了安装结果,更多 Redis 命令请参考 Redis Commands (opens new window)。
创建 Redis 数据桥接
需要创建两个 Redis 数据桥接分别完成预设场景,它们连接配置方式是相同的:
- 转到 Dashboard 数据集成 -> 数据桥接页面。
- 点击页面右上角的创建。
- 在数据桥接类型中选择 Redis,点击下一步。
- 输入数据桥接名称,要求是大小写英文字母和数字的组合。
- 部署模式根据情况选择,此处选择 single。
- 输入 Redis 连接信息,服务器地址填写 127.0.0.1:6379,密码填写 public,数据库 ID 填写 0。
至此,我们已经完成了到 Redis 的连接设置,Redis Command 模版会根据我们希望实现的功能场景略有不同,具体请看下文。
消息暂存
本节我们将演示如何通过 Redis 暂存每个客户端的最后一条消息。
- 完成上述 连接到 Redis 的配置。
- 配置 Redis Command 模板:使用 Redis HSET (opens new window) 命令与 hash 数据结构存储消息,数据格式以
clientid
为 key,存储username
、payload
和timestamp
等字段。为了便于与 Redis 中其他 key 区分,我们使用emqx_messages
前缀作为 key 的命名空间并用:
分割
# HSET key filed value [field value...]
HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
- 高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数,详细请参考配置参数。
- 点击创建按钮完成数据桥接创建。
至此我们已经完成了数据桥接创建,接下来将继续创建一条规则来指定需要写入的数据。
创建数据转发规则
- 转到 Dashboard 数据集成 -> 规则页面。此外在完成数据桥接的创建后,EMQX 会弹窗询问是否创建相应规则,您也可点击弹窗中的创建规则按钮前往规则页面。
- 点击页面右上角的创建。
- 输入规则 ID
cache_to_redis
,在 SQL 编辑器中输入规则,此处选择将t/#
主题的 MQTT 消息暂存至 Redis,请确保规则选择出来的字段(SELECT 部分)包含所有 Redis Command 模板中用到的变量,此处规则 SQL 如下:
SELECT
*
FROM
"t/#"
- 添加动作,在动作下拉框中选择 使用数据桥接转发,选择之前创建好的 Redis 数据桥接,并点击添加。
- 点击页面最下方的创建按钮完成规则创建。
至此您已经完成消息暂存的创建过程。
消息丢弃统计
本节我们将演示如何通过 Redis 统计 EMQX 消息丢弃情况。
注意:除 Redis Command 模板与规则外,其他操作步骤与消息暂存章节完全相同。
数据桥接的 Redis Command 模板
使用 HINCRBY (opens new window) 命令,对每个主题消息丢弃数量计数,模板如下:
# HINCRBY key field increment
HINCRBY emqx_message_dropped_count ${topic} 1
该命令每次执行时,对应计数器将 +1。
规则 SQL
EMQX 规则提供了 2 个消息丢弃事件,可以通过其触发规则并记录到 Redis 中:
事件名称 | 事件主题 | 事件参数 |
---|---|---|
消息在转发的过程中被丢弃 | $events/message_dropped | 点击查看 |
消息在投递的过程中被丢弃 | $events/delivery_dropped | 点击查看 |
对应规则 SQL 如下:
SELECT
*
FROM
"$events/message_dropped", "$events/delivery_dropped"
测试数据桥接与规则
使用 MQTTX 向 t/1
主题发布消息,此操作会触发消息暂存规则,如果 t/1
主题没有订阅者,消息将被丢弃并触发消息丢弃规则:
mqttx pub -i emqx_c -u emqx_u -t t/1 -m '{ "msg": "hello Redis" }'
分别查看两个数据桥接运行统计,命中、发送成功次数均 +1。
查看消息是否已经暂存:
127.0.0.1:6379> HGETALL emqx_messages:emqx_c
1) "username"
2) "emqx_u"
3) "payload"
4) "{ \"msg\": \"hello Redis\" }"
5) "timestamp"
6) "1675263885119"
再次执行消息发布测试并查看数据,timestamp
字段的值将更新。
查看消息丢失是否已经被计数:
127.0.0.1:6379> HGETALL emqx_message_dropped_count
1) "t/1"
2) "1"
多次执行消息发布测试并查看数据,t/1
字段的计数器将增长。