DynamoDB 数据存储

本章节以在 CentOS 7.2 中的实际例子来说明如何通过 DynamoDB 来存储相关的信息。

Amazon DynamoDB 是一个完全托管的 NoSQL 数据库服务,支持键值和文档数据结构,可以提供快速的、可预期的性能,并且可以实现无缝扩展。

Amazon DynamoDB 由亚马逊作为 AWS 云产品组合的一部分提供。本处为了方便演示,利用 DynamoDB 可下载版本,在不访问 AWS DynamoDB 服务的情况下完成教程,实际生产环境仍推荐使用 AWS 相关服务。

安装 DynamoDB 服务器

读者可以参考 DynamoDB 官方文档Docker 来下载安装 DynamoDB,本文章使用 DynamoDB 1.11 版本。

配置 EMQ X 服务器

通过 RPM 方式安装的 EMQ DynamoDB 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_dynamo.conf,如果只是测 DynamoDB 持久化的功能,大部分配置不需要做更改。

本示例依赖 AWS CLI,安装方式请参照 使用捆绑安装程序安装 AWS CLI (Linux, macOS, or Unix)

选用本地测试只需配置 server 参数,使用 DynamoDB 服务则需配置 regionaccess_key_idsecret_access_key 以进行 AWS 服务认证:

  1. ## DynamoDB region
  2. backend.dynamo.region = us-west-2
  3. ## DynamoDB Server
  4. backend.dynamo.pool1.server = http://localhost:8000
  5. ## DynamoDB Pool Size
  6. backend.dynamo.pool1.pool_size = 8
  7. ## AWS ACCESS KEY ID
  8. backend.dynamo.pool1.aws_access_key_id = AKIAU5IM2XOC7AQWG7HK
  9. ## AWS SECRET ACCESS KEY
  10. backend.dynamo.pool1.aws_secret_access_key = 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
  11. ## 功能配置,按需注释即可屏蔽相关功能
  12. ## DynamoDB Backend Hooks
  13. backend.dynamo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
  14. backend.dynamo.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
  15. backend.dynamo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
  16. backend.dynamo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
  17. backend.dynamo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
  18. backend.dynamo.hook.session.unsubscribed.1= {"topic": "#", "action": {"function": "on_acked_delete"}, "pool": "pool1"}
  19. backend.dynamo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
  20. backend.dynamo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
  21. backend.dynamo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
  22. backend.dynamo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}

保持剩下部分的配置文件不变,然后需要启动该插件。启动插件的方式有 命令行控制台 两种方式,读者可以任选其一。

客户端在线状态存储

客户端上下线时,插件将更新在线状态、上下线时间、节点客户端列表到 DynamoDB 数据库。与 MongoDB 不同,DynamoDB 需要手动定义表结构指定主键为 clientid 如下:

  1. {
  2. "TableName": "mqtt_client",
  3. "KeySchema": [
  4. {
  5. "AttributeName": "clientid",
  6. "KeyType": "HASH"
  7. }
  8. ],
  9. "AttributeDefinitions": [
  10. {
  11. "AttributeName": "clientid",
  12. "AttributeType": "S"
  13. }
  14. ],
  15. "ProvisionedThroughput": {
  16. "ReadCapacityUnits": 5,
  17. "WriteCapacityUnits": 5
  18. }
  19. }

通过命令行创建数据表:

  1. aws dynamodb create-table \
  2. --table-name mqtt_client \
  3. --attribute-definitions \
  4. AttributeName=clientid,AttributeType=S \
  5. --key-schema AttributeName=clientid,KeyType=HASH \
  6. --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  7. --region us-west-2 --endpoint-url http://localhost:8000

使用 WebSocket 测试工具测试,设备上、下线时插件会往数据库写入/更新设备在线状态,通过以下命令查询:

  1. aws dynamodb scan --table-name mqtt_client --region us-west-2 --endpoint-url http://localhost:8000

查询结果如下,connect_state 字段为在线状态,1 表示在线, 0 表示离线:

  1. {
  2. "Count": 1,
  3. "Items": [
  4. {
  5. "node": {
  6. "S": "emqx@127.0.0.1"
  7. },
  8. "connect_state": {
  9. "N": "1"
  10. },
  11. "online_at": {
  12. "N": "1563765246"
  13. },
  14. "offline_at": {
  15. "N": "0"
  16. },
  17. "clientid": {
  18. "S": "mqttjs_34f653fdcc"
  19. }
  20. }
  21. ],
  22. "ScannedCount": 1,
  23. "ConsumedCapacity": null
  24. }

客户端 Retain 消息存储

客户端发布 Retain 消息时,将 Retain 消息存储到数据库,满足条件的 Topic 被订阅后, Retain 消息将自动发布到客户端。

使用示例

初始化 mqtt_retain 表,用于存储订阅关系:

  1. {
  2. "TableName": "mqtt_retain",
  3. "KeySchema": [
  4. { "AttributeName": "topic", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "topic", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }

通过命令行创建数据表:

  1. aws dynamodb create-table \
  2. --table-name mqtt_retain \
  3. --attribute-definitions \
  4. AttributeName=topic,AttributeType=S \
  5. --key-schema \
  6. AttributeName=topic,KeyType=HASH \
  7. --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  8. --region us-west-2 --endpoint-url http://localhost:8000

使用 WebSocket 测试工具测试,消息发布时勾选 保留 选项, 设备发布消息、订阅信息变更时,插件会从数据库写入/查询 Retain 消息并发布到订阅主题,通过以下命令查询当前 Retain 列表:

  1. aws dynamodb scan --table-name mqtt_retain --region us-west-2 --endpoint-url http://localhost:8000

image-20190722112649536

数据库中存储 Retain 消息如下:

  1. {
  2. "Count": 1,
  3. "Items": [
  4. {
  5. "qos": {
  6. "N": "0"
  7. },
  8. "sender": {
  9. "S": "mqttjs_17137f2af0"
  10. },
  11. "msgid": {
  12. "S": "Mjg4NDYzOTExMTcxNzQ5MTQwMjU1NzgyMDgxNzU1ODczMjJ"
  13. },
  14. "topic": {
  15. "S": "testtopic"
  16. },
  17. "arrived": {
  18. "N": "1563765995"
  19. },
  20. "retain": {
  21. "N": "1"
  22. },
  23. "payload": {
  24. "S": "{ \"msg\": \"Hello, World!\" }"
  25. }
  26. }
  27. ],
  28. "ScannedCount": 1,
  29. "ConsumedCapacity": null
  30. }

客户端订阅关系存储

客户端订阅/取消订阅时,向数据库插入/删除当前订阅关系。

使用示例

初始化 mqtt_sub 表,用于存储设备订阅关系:

  1. {
  2. "TableName": "mqtt_sub",
  3. "KeySchema": [
  4. {
  5. "AttributeName": "clientid",
  6. "KeyType": "HASH"
  7. },
  8. {
  9. "AttributeName": "topic",
  10. "KeyType": "RANGE"
  11. }
  12. ],
  13. "AttributeDefinitions": [
  14. {
  15. "AttributeName": "clientid",
  16. "AttributeType": "S"
  17. },
  18. {
  19. "AttributeName": "topic",
  20. "AttributeType": "S"
  21. }
  22. ],
  23. "ProvisionedThroughput": {
  24. "ReadCapacityUnits": 5,
  25. "WriteCapacityUnits": 5
  26. }
  27. }

通过命令行创建数据表:

  1. aws dynamodb create-table \
  2. --table-name mqtt_sub \
  3. --attribute-definitions \
  4. AttributeName=clientid,AttributeType=S AttributeName=topic,AttributeType=S \
  5. --key-schema \
  6. AttributeName=clientid,KeyType=HASH AttributeName=topic,KeyType=RANGE \
  7. --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  8. --region us-west-2 --endpoint-url http://localhost:8000

使用 WebSocket 测试工具测试,设备订阅信息变更时,插件会往数据库写入/更新设备订阅关系,通过以下命令查询:

  1. aws dynamodb scan --table-name mqtt_sub --region us-west-2 --endpoint-url http://localhost:8000

订阅 testtopic 后,查询数据库中存储信息如下:

  1. {
  2. "Items": [
  3. {
  4. "qos": {
  5. "N": "0"
  6. },
  7. "topic": {
  8. "S": "testtopic"
  9. },
  10. "clientid": {
  11. "S": "mqttjs_17137f2af0"
  12. }
  13. }
  14. ],
  15. "Count": 1,
  16. "ScannedCount": 1,
  17. "ConsumedCapacity": null
  18. }

客户端发布消息存储

客户端发布消息时,将消息及消息映射关系存储到数据库。

使用示例

初始化 mqtt_msg 表,用于存储 MQTT 消息:

  1. {
  2. "TableName": "mqtt_msg",
  3. "KeySchema": [
  4. { "AttributeName": "msgid", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "msgid", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }

通过命令行创建数据表:

  1. aws dynamodb create-table \
  2. --table-name mqtt_msg \
  3. --attribute-definitions \
  4. AttributeName=msgid,AttributeType=S \
  5. --key-schema \
  6. AttributeName=msgid,KeyType=HASH \
  7. --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  8. --region us-west-2 --endpoint-url http://localhost:8000

初始化 mqtt_topic_msg_map 表,用于存储主题和消息的映射关系:

  1. {
  2. "TableName": "mqtt_topic_msg_map",
  3. "KeySchema": [
  4. { "AttributeName": "topic", "KeyType": "HASH" }
  5. ],
  6. "AttributeDefinitions": [
  7. { "AttributeName": "topic", "AttributeType": "S" }
  8. ],
  9. "ProvisionedThroughput": {
  10. "ReadCapacityUnits": 5,
  11. "WriteCapacityUnits": 5
  12. }
  13. }

通过命令行创建数据表:

  1. aws dynamodb create-table \
  2. --table-name mqtt_topic_msg_map \
  3. --attribute-definitions \
  4. AttributeName=topic,AttributeType=S \
  5. --key-schema \
  6. AttributeName=topic,KeyType=HASH \
  7. --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  8. --region us-west-2 --endpoint-url http://localhost:8000

使用 WebSocket 测试工具测试,消息发布时将写入到以上两个数据集合,查询信息如下:

  1. aws dynamodb scan --table-name mqtt_msg --region us-west-2 --endpoint-url http://localhost:8000
  2. aws dynamodb scan --table-name mqtt_topic_msg_map --region us-west-2 --endpoint-url http://localhost:8000

数据库中存储消息数据如下:

  1. {
  2. "Count": 1,
  3. "Items": [
  4. {
  5. "qos": {
  6. "N": "1"
  7. },
  8. "sender": {
  9. "S": "mqttjs_17137f2af0"
  10. },
  11. "msgid": {
  12. "S": "Mjg4NDY0MDMyMDA2MzI0MjA4MTcxNDYwNjk0MjQ5MzA4MTJ"
  13. },
  14. "topic": {
  15. "S": "testtopic"
  16. },
  17. "arrived": {
  18. "N": "1563766650"
  19. },
  20. "retain": {
  21. "N": "0"
  22. },
  23. "payload": {
  24. "S": "{ \"msg\": \"Hello, World!\" }"
  25. }
  26. }
  27. ],
  28. "ScannedCount": 1,
  29. "ConsumedCapacity": null
  30. }

数据库中存储消息、订阅关系如下(通过 Message ID 关联主题):

  1. {
  2. "Count": 1,
  3. "Items": [
  4. {
  5. "topic": {
  6. "S": "testtopic"
  7. },
  8. "MsgId": {
  9. "SS": [
  10. "Mjg4NDY0MDMyMDA2MzI0MjA4MTcxNDYwNjk0MjQ5MzA4MTJ"
  11. ]
  12. }
  13. }
  14. ],
  15. "ScannedCount": 1,
  16. "ConsumedCapacity": null
  17. }

客户端接收消息 ACK 存储

客户端接收 QoS > 0 的消息时,将消息存储到数据库。

使用示例

初始化 mqtt_acked 表,用于存储确认的 MQTT 消息:

  1. {
  2. "TableName": "mqtt_acked",
  3. "KeySchema": [
  4. { "AttributeName": "topic", "KeyType": "HASH" },
  5. { "AttributeName": "clientid", "KeyType": "RANGE" }
  6. ],
  7. "AttributeDefinitions": [
  8. { "AttributeName": "topic", "AttributeType": "S" },
  9. { "AttributeName": "clientid", "AttributeType": "S" }
  10. ],
  11. "ProvisionedThroughput": {
  12. "ReadCapacityUnits": 5,
  13. "WriteCapacityUnits": 5
  14. }
  15. }

通过命令行创建数据表:

  1. aws dynamodb create-table \
  2. --table-name mqtt_acked \
  3. --attribute-definitions \
  4. AttributeName=topic,AttributeType=S AttributeName=clientid,AttributeType=S \
  5. --key-schema \
  6. AttributeName=topic,KeyType=HASH AttributeName=clientid,KeyType=RANGE \
  7. --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  8. --region us-west-2 --endpoint-url http://localhost:8000

使用 WebSocket 测试工具测试订阅并发送 QoS 1 的消息,消息发布时将写入到数据库,查询信息如下:

  1. aws dynamodb scan --table-name mqtt_acked --region us-west-2 --endpoint-url http://localhost:8000

数据库中存储消息数据如下:

  1. {
  2. "Count": 1,
  3. "Items": [
  4. {
  5. "topic": {
  6. "S": "testtopic"
  7. },
  8. "msgid": {
  9. "S": "Mjg4NDY1NDAxMjczMzcyMjUwMjExOTc1MzY4MDY4MzAwODB"
  10. },
  11. "clientid": {
  12. "S": "mqttjs_673dd52f4f"
  13. }
  14. }
  15. ],
  16. "ScannedCount": 1,
  17. "ConsumedCapacity": null
  18. }

总结

读者在理解了 DynamoDB 中所存储的数据结构,各个阶段消息存储方式,可以结合 DynamoDB 拓展相关应用。