DynamoDB 消息存储

提示

EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 DynamoDB规则引擎中创建 保存数据到 DynamoDB

配置 DyanmoDB 消息存储

配置文件: etc/plugins/emqx_backend_dynamo.conf

  1. ## DynamoDB Region
  2. backend.dynamo.region = us-west-2
  3. ## DynamoDB Server
  4. backend.dynamo.pool1.server = http://localhost:8000
  5. ## DynamoDB Pool Size
  6. backend.dynamo.pool1.pool_size = 8
  7. ## AWS ACCESS KEY ID
  8. backend.dynamo.pool1.aws_access_key_id = AKIAU5IM2XOC7AQWG7HK
  9. ## AWS SECRET ACCESS KEY
  10. backend.dynamo.pool1.aws_secret_access_key = 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
  11. ## DynamoDB Backend Hooks
  12. backend.dynamo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
  13. backend.dynamo.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
  14. backend.dynamo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
  15. backend.dynamo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
  16. backend.dynamo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
  17. backend.dynamo.hook.session.unsubscribed.1= {"topic": "#", "action": {"function": "on_acked_delete"}, "pool": "pool1"}
  18. backend.dynamo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
  19. backend.dynamo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
  20. backend.dynamo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
  21. backend.dynamo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
  22. # backend.dynamo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

backend 消息存储规则包括:

hooktopicaction说明
client.connectedon_client_connected存储客户端在线状态
client.connectedon_subscribe_lookup订阅主题
client.disconnectedon_client_disconnected存储客户端离线状态
session.subscribed#on_message_fetch_for_queue获取一对一离线消息
session.subscribed#on_retain_lookup获取retain消息
message.publish#on_message_publish存储发布消息
message.publish#on_message_retain存储retain消息
message.publish#on_retain_delete删除retain消息
message.acked#on_message_acked_for_queue一对一消息ACK处理

DynamoDB 数据库创建表

  1. ./test/dynamo_test.sh

DynamoDB 用户状态表(Client Table)

mqtt_client 表定义(存储设备在线状态):

  1. {
  2. "TableName": "mqtt_client",
  3. "KeySchema": [
  4. { "AttributeName": "clientid", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "clientid", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }

查询设备在线状态:

  1. aws dynamodb scan --table-name mqtt_client --region us-west-2 --endpoint-url http://localhost:8000
  2. {
  3. "Items": [
  4. {
  5. "offline_at": { "N": "0" },
  6. "node": { "S": "emqx@127.0.0.1" },
  7. "clientid": { "S": "mqttjs_384b9c73a9" },
  8. "connect_state": { "N": "1" },
  9. "online_at": { "N": "1562224940" }
  10. }
  11. ],
  12. "Count": 1,
  13. "ScannedCount": 1,
  14. "ConsumedCapacity": null
  15. }

DynamoDB 用户订阅主题(Subscription Table)

mqtt_sub 表定义(存储订阅关系):

  1. {
  2. "TableName": "mqtt_sub",
  3. "KeySchema": [
  4. { "AttributeName": "clientid", "KeyType": "HASH" },
  5. { "AttributeName": "topic", "KeyType": "RANGE" }
  6. ],
  7. "AttributeDefinitions": [
  8. { "AttributeName": "clientid", "AttributeType": "S" },
  9. { "AttributeName": "topic", "AttributeType": "S" }
  10. ],
  11. "ProvisionedThroughput": {
  12. "ReadCapacityUnits": 5,
  13. "WriteCapacityUnits": 5
  14. }
  15. }

查询 ClientId 为 “test-dynamo” 的客户端已订阅主题:

  1. aws dynamodb scan --table-name mqtt_sub --region us-west-2 --endpoint-url http://localhost:8000
  2. {
  3. "Items": [{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub" }, "clientid": { "S": "test-dynamo" }},
  4. {"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-1"}, "clientid": { "S": "test-dynamo" }},
  5. {"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-2"}, "clientid": { "S": "test-dynamo" }}],
  6. "Count": 3,
  7. "ScannedCount": 3,
  8. "ConsumedCapacity": null
  9. }

DynamoDB 发布消息(Message Table)

mqtt_msg 表定义(存储 MQTT 消息):

  1. {
  2. "TableName": "mqtt_msg",
  3. "KeySchema": [
  4. { "AttributeName": "msgid", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "msgid", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }

mqtt_topic_msg_map 表定义(存储主题和消息的映射关系):

  1. {
  2. "TableName": "mqtt_topic_msg_map",
  3. "KeySchema": [
  4. { "AttributeName": "topic", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "topic", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }

某个客户端向主题 test 发布消息后,查询 mqtt_msg 表和 mqtt_topic_msg_map 表:

查询 mqtt_msg 表:

  1. aws dynamodb scan --table-name mqtt_msg --region us-west-2 --endpoint-url http://localhost:8000
  2. > - {
  3. > - "Items": \[
  4. > - {
  5. > "arrived": { "N": "1562308553" }, "qos": { "N": "1" },
  6. > "sender": { "S": "mqttjs_231b962d5c" }, "payload": { "S":
  7. > "{ "msg": "Hello, World\!" }"}, "retain": { "N": "0" },
  8. > "msgid": { "S":
  9. > "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
  10. > "topic": { "S": "test" }
  11. > }
  12. > \], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null
  13. > }

查询 mqtt_topic_msg_map 表:

  1. aws dynamodb scan --table-name mqtt_topic_msg_map --region us-west-2 --endpoint-url http://localhost:8000
  2. > - {
  3. > - "Items": \[
  4. > - {
  5. > "topic": { "S": "test" }, "MsgId": { "SS": \[
  6. > "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" \]}
  7. > }
  8. > \], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null
  9. > }

DynamoDB 保留消息(Retain Message Table)

mqtt_retain 表定义(存储 retain 消息):

  1. {
  2. "TableName": "mqtt_retain",
  3. "KeySchema": [
  4. { "AttributeName": "topic", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "topic", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }

某个客户端向主题 test 发布消息后,查询 mqtt_retain 表:

  1. {
  2. "Items": [
  3. {
  4. "arrived": { "N": "1562312113" },
  5. "qos": { "N": "1" },
  6. "sender": { "S": "mqttjs_d0513acfce" },
  7. "payload": { "S": "test" },
  8. "retain": { "N": "1" },
  9. "msgid": { "S": "Mjg4MTk1NzE3MTY4MjYxMjA5MDExMDg0NTk5ODgzMjAyNTH" },
  10. "topic": { "S": "testtopic" }
  11. }
  12. ],
  13. "Count": 1,
  14. "ScannedCount": 1,
  15. "ConsumedCapacity": null
  16. }

DynamoDB 接收消息 ack (Message Acked Table)

mqtt_acked 表定义(存储确认的消息):

  1. {
  2. "TableName": "mqtt_acked",
  3. "KeySchema": [
  4. { "AttributeName": "topic", "KeyType": "HASH" },
  5. { "AttributeName": "clientid", "KeyType": "RANGE" }
  6. ],
  7. "AttributeDefinitions": [
  8. { "AttributeName": "topic", "AttributeType": "S" },
  9. { "AttributeName": "clientid", "AttributeType": "S" }
  10. ],
  11. "ProvisionedThroughput": {
  12. "ReadCapacityUnits": 5,
  13. "WriteCapacityUnits": 5
  14. }
  15. }

某个客户端向主题 test 发布消息后,查询 mqtt_acked 表:

  1. {
  2. "Items": [
  3. {
  4. "topic": { "S": "test" },
  5. "msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
  6. "clientid": { "S": "mqttjs_861e582a70" }
  7. }
  8. ],
  9. "Count": 1,
  10. "ScannedCount": 1,
  11. "ConsumedCapacity": null
  12. }

启用 DynamoDB 消息存储:

  1. ./bin/emqx_ctl plugins load emqx_backend_dynamo