Redis 数据存储

配置文件: emqx_backend_redis.conf

配置 Redis 服务器

支持配置多台 Redis 服务器连接池:

  1. ## Redis 服务集群类型: single | sentinel | cluster
  2. backend.redis.pool1.type = single
  3. ## Redis 服务器地址列表
  4. backend.redis.pool1.server = 127.0.0.1:6379
  5. ## Redis sentinel 模式下的 sentinel 名称
  6. ## backend.redis.pool1.sentinel = mymaster
  7. ## Redis 连接池大小
  8. backend.redis.pool1.pool_size = 8
  9. ## Redis 数据库名称
  10. backend.redis.pool1.database = 0
  11. ## Redis 密码
  12. ## backend.redis.pool1.password =
  13. ## 订阅的 Redis channel 名称
  14. backend.redis.pool1.channel = mqtt_channel

配置 Redis 存储规则

  1. backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
  2. backend.redis.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
  3. backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
  4. backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
  5. backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
  6. backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
  7. backend.redis.hook.session.unsubscribed.1= {"topic": "#", "action": {"commands": ["DEL mqtt:acked:${clientid}:${topic}"]}, "pool": "pool1"}
  8. backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "expired_time" : 3600, "pool": "pool1"}
  9. backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "expired_time" : 3600, "pool": "pool1"}
  10. backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
  11. backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
  12. backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
  13. ## backend.redis.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_keep_latest"}, "pool": "pool1"}
  14. ## backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_store_keep_latest"}, "expired_time" : 3600, "pool": "pool1"}
  15. ## backend.redis.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_keep_latest"}, "pool": "pool1"}

Redis 存储规则说明

hooktopicaction/function说明
client.connectedon_client_connected存储客户端在线状态
session.createdon_subscribe_lookup订阅主题
client.disconnectedon_client_disconnected存储客户端离线状态
session.subscribedqueue/#on_message_fetch_for_queue获取一对一离线消息
session.subscribedpubsub/#on_message_fetch_for_pubsub获取一对多离线消息
session.subscribed#on_retain_lookup获取 retain 消息
session.unsubscribed#删除 acked 消息
message.publish#on_message_publish存储发布消息
message.publish#on_message_retain存储 retain 消息
message.publish#on_retain_delete删除 retain 消息
message.ackedqueue/#on_message_acked_for_queue一对一消息 ACK 处理
message.ackedpubsub/#on_message_acked_for_pubsub一对多消息 ACK 处理

Redis 命令行参数说明

hook可用参数示例(每个字段分隔,必须是一个空格)
client.connectedclientidSET conn:${clientid} ${clientid}
client.disconnectedclientidSET disconn:${clientid} ${clientid}
session.subscribedclientid, topic, qosHSET sub:${clientid} ${topic} ${qos}
session.unsubscribedclientid, topicSET unsub:${clientid} ${topic}
message.publishmessage, msgid, topic, payload, qos, clientidRPUSH pub:${topic} ${msgid}
message.ackedmsgid, topic, clientidHSET ack:${clientid} ${topic} ${msgid}
message.delivermsgid, topic, clientidHSET deliver:${clientid} ${topic} ${msgid}

Redis 命令行配置 Action

Redis 存储支持用户采用 Redis Commands 语句配置 Action,例如:

  1. ## 在客户端连接到 EMQ X 服务器后,执行一条 redis
  2. backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} ${clientid}"]}, "pool": "pool1"}

Redis 设备在线状态 Hash

mqtt:client Hash 存储设备在线状态:

  1. hmset
  2. key = mqtt:client:${clientid}
  3. value = {state:int, online_at:timestamp, offline_at:timestamp}
  4. hset
  5. key = mqtt:node:${node}
  6. field = ${clientid}
  7. value = ${ts}

查询设备在线状态:

  1. HGETALL "mqtt:client:${clientId}"

例如 ClientId 为 test 客户端上线:

  1. HGETALL mqtt:client:test
  2. 1) "state"
  3. 2) "1"
  4. 3) "online_at"
  5. 4) "1481685802"
  6. 5) "offline_at"
  7. 6) "undefined"

例如 ClientId 为 test 客户端下线:

  1. HGETALL mqtt:client:test
  2. 1) "state"
  3. 2) "0"
  4. 3) "online_at"
  5. 4) "1481685802"
  6. 5) "offline_at"
  7. 6) "1481685924"

Redis 保留消息 Hash

mqtt:retain Hash 存储 Retain 消息:

  1. hmset
  2. key = mqtt:retain:${topic}
  3. value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

查询 retain 消息:

  1. HGETALL "mqtt:retain:${topic}"

例如查看 topic 为 topic 的 retain 消息:

  1. HGETALL mqtt:retain:topic
  2. 1) "id"
  3. 2) "6P9NLcJ65VXBbC22sYb4"
  4. 3) "from"
  5. 4) "test"
  6. 5) "qos"
  7. 6) "1"
  8. 7) "topic"
  9. 8) "topic"
  10. 9) "retain"
  11. 10) "true"
  12. 11) "payload"
  13. 12) "Hello world!"
  14. 13) "ts"
  15. 14) "1481690659"

Redis 消息存储 Hash

mqtt:msg Hash 存储 MQTT 消息:

  1. hmset
  2. key = mqtt:msg:${msgid}
  3. value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
  4. zadd
  5. key = mqtt:msg:${topic}
  6. field = 1
  7. value = ${msgid}

Redis 消息确认 SET

mqtt:acked SET 存储客户端消息确认:

  1. set
  2. key = mqtt:acked:${clientid}:${topic}
  3. value = ${msgid}

Redis 订阅存储 Hash

mqtt:sub Hash 存储订阅关系:

  1. hset
  2. key = mqtt:sub:${clientid}
  3. field = ${topic}
  4. value = ${qos}

某个客户端订阅主题:

  1. HSET mqtt:sub:${clientid} ${topic} ${qos}

例如为 ClientId 为 test 的客户端订阅主题 topic1, topic2 :

  1. HSET "mqtt:sub:test" "topic1" 1
  2. HSET "mqtt:sub:test" "topic2" 2

查询 ClientId 为 test 的客户端已订阅主题:

  1. HGETALL mqtt:sub:test
  2. 1) "topic1"
  3. 2) "1"
  4. 3) "topic2"
  5. 4) "2"

Redis SUB/UNSUB 事件发布

设备需要订阅/取消订阅主题时,业务服务器向 Redis 发布事件消息:

  1. PUBLISH
  2. channel = "mqtt_channel"
  3. message = {type: string , topic: string, clientid: string, qos: int}
  4. \*type: [subscribe/unsubscribe]

例如 ClientId 为 test 客户端订阅主题 topic0 :

  1. PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"

例如 ClientId 为 test 客户端取消订阅主题:

  1. PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"

Tip

Redis Cluster 无法使用 Redis PUB/SUB 功能。

启用 Redis 数据存储插件

  1. ./bin/emqx_ctl plugins load emqx_backend_redis