Redis

通过 Redis 数据桥接可以通过执行自定义 Redis 数据操作命令的方式,将 MQTT 消息和客户端事件存储到 Redis 中,借助 Redis 高性能与灵活数据结构,实现诸如消息暂存,发布订阅和消息丢弃行为的计数与统计等业务。

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用 Redis - 图1 (opens new window)

前置准备

功能清单

快速开始

在本示例中,我们将通过 Redis 实现:

  1. 暂存每个客户端最后一条消息;
  2. 每个主题消息丢弃计数。

安装 Redis

通过 Docker 安装并启动 Redis:

  1. # 启动一个 Redis 容器并设置密码为 public
  2. docker run --name redis -p 6379:6379 -d redis --requirepass "public"
  3. # 进入容器
  4. docker exec -it redis bash
  5. # 在容器中连接到 Redis 服务器,需要通过 AUTH 命令认证
  6. redis-cli
  7. 127.0.0.1:6379> AUTH public
  8. OK
  9. # 验证安装结果
  10. 127.0.0.1:6379> set emqx "Hello World"
  11. OK
  12. 127.0.0.1:6379> get emqx
  13. "Hello World"

至此,您已经完成 Redis 的安装并使用 SET GET 命令验证了安装结果,更多 Redis 命令请参考 Redis Commands Redis - 图2 (opens new window)

创建 Redis 数据桥接

需要创建两个 Redis 数据桥接分别完成预设场景,它们连接配置方式是相同的:

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。
  2. 点击页面右上角的创建
  3. 数据桥接类型中选择 Redis,点击下一步
  4. 输入数据桥接名称,要求是大小写英文字母和数字的组合。
  5. 部署模式根据情况选择,此处选择 single
  6. 输入 Redis 连接信息,服务器地址填写 127.0.0.1:6379密码填写 public数据库 ID 填写 0

至此,我们已经完成了到 Redis 的连接设置,Redis Command 模版会根据我们希望实现的功能场景略有不同,具体请看下文。

消息暂存

本节我们将演示如何通过 Redis 暂存每个客户端的最后一条消息。

  1. 完成上述 连接到 Redis 的配置。
  2. 配置 Redis Command 模板:使用 Redis HSET Redis - 图3 (opens new window) 命令与 hash 数据结构存储消息,数据格式以 clientid 为 key,存储 usernamepayloadtimestamp 等字段。为了便于与 Redis 中其他 key 区分,我们使用 emqx_messages 前缀作为 key 的命名空间并用 : 分割
  1. # HSET key filed value [field value...]
  2. HSET emqx_messages:${clientid} username ${username} payload ${payload} timestamp ${timestamp}
  1. 高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数,详细请参考配置参数
  2. 点击创建按钮完成数据桥接创建。

至此我们已经完成了数据桥接创建,接下来将继续创建一条规则来指定需要写入的数据。

创建数据转发规则

  1. 转到 Dashboard 数据集成 -> 规则页面。此外在完成数据桥接的创建后,EMQX 会弹窗询问是否创建相应规则,您也可点击弹窗中的创建规则按钮前往规则页面。
  2. 点击页面右上角的创建
  3. 输入规则 ID cache_to_redis,在 SQL 编辑器中输入规则,此处选择将 t/# 主题的 MQTT 消息暂存至 Redis,请确保规则选择出来的字段(SELECT 部分)包含所有 Redis Command 模板中用到的变量,此处规则 SQL 如下:
  1. SELECT
  2. *
  3. FROM
  4. "t/#"
  1. 添加动作,在动作下拉框中选择 使用数据桥接转发,选择之前创建好的 Redis 数据桥接,并点击添加
  2. 点击页面最下方的创建按钮完成规则创建。

至此您已经完成消息暂存的创建过程。

消息丢弃统计

本节我们将演示如何通过 Redis 统计 EMQX 消息丢弃情况。

注意:除 Redis Command 模板与规则外,其他操作步骤与消息暂存章节完全相同。

数据桥接的 Redis Command 模板

使用 HINCRBY Redis - 图4 (opens new window) 命令,对每个主题消息丢弃数量计数,模板如下:

  1. # HINCRBY key field increment
  2. HINCRBY emqx_message_dropped_count ${topic} 1

该命令每次执行时,对应计数器将 +1。

规则 SQL

EMQX 规则提供了 2 个消息丢弃事件,可以通过其触发规则并记录到 Redis 中:

事件名称事件主题事件参数
消息在转发的过程中被丢弃$events/message_dropped点击查看
消息在投递的过程中被丢弃$events/delivery_dropped点击查看

对应规则 SQL 如下:

  1. SELECT
  2. *
  3. FROM
  4. "$events/message_dropped", "$events/delivery_dropped"

测试数据桥接与规则

使用 MQTTX 向 t/1 主题发布消息,此操作会触发消息暂存规则,如果 t/1 主题没有订阅者,消息将被丢弃并触发消息丢弃规则:

  1. mqttx pub -i emqx_c -u emqx_u -t t/1 -m '{ "msg": "hello Redis" }'

分别查看两个数据桥接运行统计,命中、发送成功次数均 +1。

查看消息是否已经暂存:

  1. 127.0.0.1:6379> HGETALL emqx_messages:emqx_c
  2. 1) "username"
  3. 2) "emqx_u"
  4. 3) "payload"
  5. 4) "{ \"msg\": \"hello Redis\" }"
  6. 5) "timestamp"
  7. 6) "1675263885119"

再次执行消息发布测试并查看数据,timestamp 字段的值将更新。

查看消息丢失是否已经被计数:

  1. 127.0.0.1:6379> HGETALL emqx_message_dropped_count
  2. 1) "t/1"
  3. 2) "1"

多次执行消息发布测试并查看数据,t/1 字段的计数器将增长。