- Redis Backend
- Configure the Redis Server
- Configure Persistence Hooks
- Description of Persistence Hooks
- Redis Command Line Parameters
- Configure ‘action’ with Redis Commands
- Using Redis Hash for Devices’ Connection State
- Using Redis Hash for Retained Messages
- Using Redis Hash for messages
- Using Redis Set for Message Acknowledgements
- Using Redis Hash for Subscription
- Redis SUB/UNSUB Publish
- Enable Redis Backend
Redis Backend
TIP
After EMQX version 3.1, a powerful rule engine is introduced to replace plug-ins. It is recommended that you use it. See Save data to Redis to setup Save data to Redis in rule engine.
Config file: emqx_backend_redis.conf
Configure the Redis Server
Config Connection Pool of Multiple Redis Servers:
## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379
## Redis Sentinel
## backend.redis.server = 127.0.0.1:26378
##Redis Sentinel Cluster name
## backend.redis.sentinel = mymaster
## Redis Pool Size
backend.redis.pool1.pool_size = 8
## Redis database
backend.redis.pool1.database = 1
## Redis subscribe channel
backend.redis.pool1.channel = mqtt_channel
Configure Persistence Hooks
## Expired after seconds, if =< 0 take the default value
backend.redis.msg.expired_after = 3600
## Client Connected Record
backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message for one QOS > 0
backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
## Lookup Unread Message for many QOS > 0
backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
## Lookup Retain Message
backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store Retain Message
backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack for one
backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
## Store Ack for many
backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
Description of Persistence Hooks
hook | topic | action/function | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribe to topics | |
client.disconnected | on_client_disconnected | Store the client disconnected state | |
session.subscribed | queue/# | on_message_fetch_for_queue | Fetch one to one offline message |
session.subscribed | pubsub/# | on_message_fetch_for_pubsub | Fetch one to many offline message |
session.subscribed | # | on_retain_lookup | Lookup retained message |
message.publish | # | on_message_publish | Store the published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | queue/# | on_message_acked_for_queue | Process ACK of one to one messages |
message.acked | pubsub/# | on_message_acked_for_pubsub | Process ACK of one to many messages |
Redis Command Line Parameters
hook | Parameter | Example (Fields separated exactly by one space) |
---|---|---|
client.connected | clientid | SET conn:${clientid} clientid |
client.disconnected | clientid | SET disconn:${clientid} clientid |
session.subscribed | clientid, topic, qos | HSET sub:${clientid} topic qos |
session.unsubscribed | clientid, topic | SET unsub:${clientid} topic |
message.publish | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} msgid |
message.acked | msgid, topic, clientid | HSET ack:${clientid} topic msgid |
message.delivered | msgid, topic, clientid | HSET delivered:${clientid} topic msgid |
Configure ‘action’ with Redis Commands
Redis backend supports raw ‘commands’ in ‘action’, e.g.:
## After a client connected to the EMQX server, it executes a redis command (multiple redis commands also supported)
backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} clientid"]}, "pool": "pool1"}
Using Redis Hash for Devices’ Connection State
mqtt:client Hash for devices’ connection state:
hmset
key = mqtt:client:${clientid}
value = {state:int, online_at:timestamp, offline_at:timestamp}
hset
key = mqtt:node:${node}
field = ${clientid}
value = ${ts}
Lookup devices’ connection state:
HGETALL "mqtt:client:${clientId}"
E.g.: Client with ClientId ‘test’ goes online:
HGETALL mqtt:client:test
1) "state"
2) "1"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "undefined"
Client with ClientId ‘test’ goes offline:
HGETALL mqtt:client:test
1) "state"
2) "0"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "1481685924"
Using Redis Hash for Retained Messages
mqtt:retain Hash for retained messages:
hmset
key = mqtt:retain:${topic}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
Lookup retained message:
HGETALL "mqtt:retain:${topic}"
Lookup retained messages with a topic of ‘retain’:
HGETALL mqtt:retain:topic
1) "id"
> - 2) "6P9NLcJ65VXBbC22sYb4"
> 3) "from"
> - 4) "test"
> 5) "qos"
> 6) "1"
> 7) "topic"
> 8) "topic"
> 9) "retain"
> - 10\) "true"
> 11) "payload"
> 12) "Hello world\!"
> 13) "ts"
> 14) "1481690659"
Using Redis Hash for messages
mqtt:msg Hash for MQTT messages:
hmset
key = mqtt:msg:${msgid}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
zadd
key = mqtt:msg:${topic}
field = 1
value = ${msgid}
Using Redis Set for Message Acknowledgements
mqtt:acked SET stores acknowledgements from the clients:
set
key = mqtt:acked:${clientid}:${topic}
value = ${msgid}
Using Redis Hash for Subscription
mqtt:sub Hash for Subscriptions:
hset
key = mqtt:sub:${clientid}
field = ${topic}
value = ${qos}
A client subscribes to a topic:
HSET mqtt:sub:${clientid} ${topic} ${qos}
A client with ClientId of ‘test’ subscribes to topic1 and topic2:
HSET "mqtt:sub:test" "topic1" 1
HSET "mqtt:sub:test" "topic2" 2
Lookup the subscribed topics of client with ClientId of ‘test’:
HGETALL mqtt:sub:test
1) "topic1"
2) "1"
3) "topic2"
4) "2"
Redis SUB/UNSUB Publish
When a device subscribes / unsubscribes to topics, EMQX broker publish an event to the Redis:
PUBLISH
channel = "mqtt_channel"
message = {type: string , topic: string, clientid: string, qos: int}
\*type: [subscribe/unsubscribe]
client with ClientID ‘test’ subscribe to ‘topic0’:
PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"
Client with ClientId ‘test’ unsubscribes to ‘test_topic0’:
PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"
Enable Redis Backend
./bin/emqx_ctl plugins load emqx_backend_redis