PostgreSQL Data Storage

Configuration file: emqx_backend_pgsql.conf

Configure the PostgreSQL Server

Connection pools for multiple PostgreSQL servers can be configured:

    ## Pgsql server address
    backend.pgsql.pool1.server = 127.0.0.1:5432
    ## Pgsql connection pool size
    backend.pgsql.pool1.pool_size = 8
    ## Pgsql username
    backend.pgsql.pool1.username = root
    ## Pgsql password
    backend.pgsql.pool1.password = public
    ## Pgsql database name
    backend.pgsql.pool1.database = mqtt
    ## Pgsql SSL
    backend.pgsql.pool1.ssl = false
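
Before loading the plugin, it can help to confirm that the pool settings actually reach the database. The Python sketch below is not part of EMQ X: it reads the backend.pgsql.pool1.* keys and builds a libpq keyword/value connection string that can be handed to psql. The function names parse_pool_config and to_libpq_dsn are hypothetical, and mapping ssl = true/false to sslmode=require/disable is an assumption made for illustration.

```python
# Sample settings copied from the pool configuration above.
CONF = """
## Pgsql server address
backend.pgsql.pool1.server = 127.0.0.1:5432
backend.pgsql.pool1.pool_size = 8
backend.pgsql.pool1.username = root
backend.pgsql.pool1.password = public
backend.pgsql.pool1.database = mqtt
backend.pgsql.pool1.ssl = false
"""


def parse_pool_config(text, pool="pool1"):
    """Collect the backend.pgsql.<pool>.* settings from conf text into a dict."""
    prefix = f"backend.pgsql.{pool}."
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and ## comments
        key, sep, value = line.partition("=")
        key = key.strip()
        if sep and key.startswith(prefix):
            settings[key[len(prefix):]] = value.strip()
    return settings


def to_libpq_dsn(settings):
    """Build a libpq keyword/value connection string from the pool settings."""
    host, _, port = settings["server"].partition(":")
    # Assumption: ssl = true is treated as sslmode=require, anything else as disable.
    sslmode = "require" if settings.get("ssl") == "true" else "disable"
    return (
        f"host={host} port={port or 5432} dbname={settings['database']} "
        f"user={settings['username']} password={settings['password']} "
        f"sslmode={sslmode}"
    )


print(to_libpq_dsn(parse_pool_config(CONF)))
```

The printed string can be passed to psql as a single quoted argument to check that the server address, credentials, and database name are accepted.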

Configure PostgreSQL Storage Rules

    backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
    backend.pgsql.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
    backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
    backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
    backend.pgsql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
    backend.pgsql.hook.session.unsubscribed.1 = {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
    backend.pgsql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
    backend.pgsql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
    backend.pgsql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
    backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
    ## Fetch offline messages
    ## "offline_opts": options for fetching offline messages
    ## - max_returned_count: maximum number of offline messages fetched in a single pull
    ## - time_range: only fetch messages that fall within this time range
    ## backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
    ## To store QoS 0 messages as well, enable the following rule
    ## Warning: when the following rule is enabled, 'on_message_fetch' must be disabled, otherwise QoS 1 and QoS 2 messages will be stored twice
    ## backend.pgsql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
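
Each rule value is a JSON object, so a stray quote or brace is easy to miss. As a rough aid, the Python sketch below splits a rule line into its hook name, index, and parsed JSON body, and checks that the action carries either a "function" or an "sql" entry. The helper parse_hook_rule is hypothetical and only mirrors the shape of the rules shown above; it is not how the plugin itself loads its configuration.

```python
import json

HOOK_PREFIX = "backend.pgsql.hook."


def parse_hook_rule(line):
    """Split one rule line into (hook, index, rule_dict)."""
    # Split on the first '=' only: SQL inside the JSON value may contain '='.
    key, _, value = line.partition("=")
    key = key.strip()
    if not key.startswith(HOOK_PREFIX):
        raise ValueError(f"not a pgsql hook rule: {key}")
    # "session.subscribed.1" -> hook "session.subscribed", index "1"
    hook, _, index = key[len(HOOK_PREFIX):].rpartition(".")
    rule = json.loads(value)
    action = rule.get("action", {})
    if "function" not in action and "sql" not in action:
        raise ValueError("action needs a 'function' or an 'sql' entry")
    return hook, int(index), rule


# The offline-message rule from the listing above, with the ## prefix removed.
LINE = (
    'backend.pgsql.hook.session.subscribed.1 = {"topic": "#", '
    '"action": {"function": "on_message_fetch"}, '
    '"offline_opts": {"max_returned_count": 500, "time_range": "2h"}, '
    '"pool": "pool1"}'
)

hook, index, rule = parse_hook_rule(LINE)
print(hook, index, rule["offline_opts"])
```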

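PostgreSQL Storage Rule Reference

hook                | topic | action                 | Description
--------------------+-------+------------------------+-----------------------------
client.connected    |       | on_client_connected    | Store client online state
session.created     |       | on_subscribe_lookup    | Subscribe to topics
client.disconnected |       | on_client_disconnected | Store client offline state
session.subscribed  | #     | on_message_fetch       | Fetch offline messages
session.subscribed  | #     | on_retain_lookup       | Fetch retained messages
message.publish     | #     | on_message_publish     | Store published messages
message.publish     | #     | on_message_retain      | Store retained messages
message.publish     | #     | on_retain_delete       | Delete retained messages
message.acked       | #     | on_message_acked       | Process message ACKs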

SQL Statement Parameters

hook                 | Available parameters                 | Example (${name} in the SQL statement refers to an available parameter)
---------------------+--------------------------------------+-------------------------------------------------------------------------
client.connected     | clientid                             | insert into conn(clientid) values(${clientid})
client.disconnected  | clientid                             | insert into disconn(clientid) values(${clientid})
session.subscribed   | clientid, topic, qos                 | insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed | clientid, topic                      | delete from sub where topic = ${topic}
message.publish      | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked        | msgid, topic, clientid               | insert into ack(msgid, topic) values(${msgid}, ${topic})
message.deliver      | msgid, topic, clientid               | insert into deliver(msgid, topic) values(${msgid}, ${topic})
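
To make the placeholder rule concrete, the Python sketch below rewrites a ${name} template into a query with positional parameters ($1, $2, ...) plus the matching argument list, and rejects names the hook does not provide. It only illustrates how the parameters listed above relate to a statement; the plugin's own binding logic is not documented here, and the names HOOK_PARAMS and to_positional are hypothetical.

```python
import re

PLACEHOLDER = re.compile(r"\$\{(\w+)\}")

# Parameters available per hook, as listed in the table above.
HOOK_PARAMS = {
    "client.connected": {"clientid"},
    "client.disconnected": {"clientid"},
    "session.subscribed": {"clientid", "topic", "qos"},
    "session.unsubscribed": {"clientid", "topic"},
    "message.publish": {"msgid", "topic", "payload", "qos", "clientid"},
    "message.acked": {"msgid", "topic", "clientid"},
    "message.deliver": {"msgid", "topic", "clientid"},
}


def to_positional(hook, sql, values):
    """Return (sql_with_$n_placeholders, ordered_args) for one hook event."""
    allowed = HOOK_PARAMS[hook]
    args = []

    def replace(match):
        name = match.group(1)
        if name not in allowed:
            raise KeyError(f"${{{name}}} is not available in hook {hook}")
        args.append(values[name])
        return f"${len(args)}"  # $1, $2, ... in order of appearance

    return PLACEHOLDER.sub(replace, sql), args


event = {"msgid": "53F98F80F66017005000004A60003", "topic": "hello",
         "payload": "hello", "qos": 1, "clientid": "test"}
query, args = to_positional(
    "message.publish",
    "insert into msg(msgid, topic) values(${msgid}, ${topic})",
    event,
)
print(query)
print(args)
```

Passing values as separate arguments rather than pasting them into the SQL text is the usual way to keep payloads and topic names from being interpreted as SQL.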

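Configuring Actions with SQL Statements

PostgreSQL storage lets users configure an action with SQL statements, for example:

    ## After a client connects to the EMQ X server, execute an SQL statement (multiple SQL statements are supported)
    backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}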
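
Since the "sql" entry is a JSON array, several statements can be listed in one rule. Writing that JSON by hand is error-prone, so one option is to generate the line. The Python sketch below does this with a hypothetical helper, sql_action_rule; the second statement in the usage example is made up for illustration and is not taken from the plugin.

```python
import json


def sql_action_rule(hook, index, statements, pool="pool1", topic=None):
    """Render one backend.pgsql.hook.* line whose action is a list of SQL statements."""
    rule = {}
    if topic is not None:
        rule["topic"] = topic  # only message/session hooks in this document use a topic filter
    rule["action"] = {"sql": list(statements)}
    rule["pool"] = pool
    return f"backend.pgsql.hook.{hook}.{index} = {json.dumps(rule)}"


# Reproduces the single-statement rule shown above.
print(sql_action_rule(
    "client.connected", 3,
    ["insert into conn(clientid) values(${clientid})"],
))

# Two statements in one rule; the second statement is purely illustrative.
print(sql_action_rule(
    "client.connected", 4,
    [
        "insert into conn(clientid) values(${clientid})",
        "update mqtt_client set state = 1 where clientid = ${clientid}",
    ],
))
```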

Create the PostgreSQL Database

    createdb mqtt -E UTF8 -e

Import the PostgreSQL Table Schema

    \i etc/sql/emqx_backend_pgsql.sql

Tip

The database name can be customized.

PostgreSQL Device Online Status Table

mqtt_client stores the online status of devices:

    CREATE TABLE mqtt_client(
      id SERIAL8 primary key,
      clientid character varying(64),
      state integer,
      node character varying(64),
      online_at timestamp,
      offline_at timestamp,
      created timestamp without time zone,
      UNIQUE (clientid)
    );

Query the online status of a device:

    select * from mqtt_client where clientid = ${clientid};

For example, after the client with ClientId test comes online:

    select * from mqtt_client where clientid = 'test';
     id | clientid | state | node           | online_at           | offline_at          | created
    ----+----------+-------+----------------+---------------------+---------------------+---------------------
      1 | test     |     1 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL                | 2016-12-24 09:40:22
    (1 row)

For example, after the client with ClientId test goes offline:

    select * from mqtt_client where clientid = 'test';
     id | clientid | state | node           | online_at           | offline_at          | created
    ----+----------+-------+----------------+---------------------+---------------------+---------------------
      1 | test     |     0 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22
    (1 row)
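
Judging from the two sample rows above, state appears to be 1 while the client is online and 0 once it has gone offline, with offline_at filled in at that point. Under that reading, which is inferred from the examples rather than stated by the schema, a row can be turned into a status and a session length as in the Python sketch below. The helper describe_client and the dict-shaped row are hypothetical.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def describe_client(row):
    """Return (status, session_length) for one mqtt_client row given as a dict."""
    online_at = datetime.strptime(row["online_at"], TS_FORMAT)
    # Assumption: state 1 means online; an unset offline_at is treated the same way.
    if row["state"] == 1 or row["offline_at"] is None:
        return "online", None
    offline_at = datetime.strptime(row["offline_at"], TS_FORMAT)
    return "offline", offline_at - online_at


# Values taken from the offline sample row above.
row = {
    "clientid": "test",
    "state": 0,
    "online_at": "2016-11-15 09:40:40",
    "offline_at": "2016-11-15 09:46:10",
}
status, length = describe_client(row)
print(status, length)
```

For the offline sample row this gives a session of 5 minutes 30 seconds (09:40:40 to 09:46:10).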

PostgreSQL Proxy Subscription Table

mqtt_sub stores subscription relationships:

    CREATE TABLE mqtt_sub(
      id SERIAL8 primary key,
      clientid character varying(64),
      topic character varying(255),
      qos integer,
      created timestamp without time zone,
      UNIQUE (clientid, topic)
    );

For example, to subscribe the client with ClientId test to the topics test_topic1 and test_topic2:

    insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic1', 1);
    insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic2', 2);

Query the topics a client has subscribed to:

    select * from mqtt_sub where clientid = ${clientid};

Query the topics subscribed by the client with ClientId test:

    select * from mqtt_sub where clientid = 'test';
     id | clientid     | topic       | qos  | created
    ----+--------------+-------------+------+---------------------
      1 | test         | test_topic1 |    1 | 2016-12-24 17:09:05
      2 | test         | test_topic2 |    2 | 2016-12-24 17:12:51
    (2 rows)

PostgreSQL Message Storage Table

mqtt_msg stores MQTT messages:

    CREATE TABLE mqtt_msg (
      id SERIAL8 primary key,
      msgid character varying(64),
      sender character varying(64),
      topic character varying(255),
      qos integer,
      retain integer,
      payload text,
      arrived timestamp without time zone
    );

Query the messages published by a client:

    select * from mqtt_msg where sender = ${clientid};

Query the messages published by the client with ClientId test:

    select * from mqtt_msg where sender = 'test';
     id | msgid                         | topic    | sender | node | qos | retain | payload | arrived
    ----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------
      1 | 53F98F80F66017005000004A60003 | hello    | test   | NULL |   1 |      0 | hello   | 2016-12-24 17:25:12
      2 | 53F98F9FE42AD7005000004A60004 | world    | test   | NULL |   1 |      0 | world   | 2016-12-24 17:25:45
    (2 rows)

PostgreSQL Retained Message Table

mqtt_retain stores retained messages:

    CREATE TABLE mqtt_retain(
      id SERIAL8 primary key,
      topic character varying(255),
      msgid character varying(64),
      sender character varying(64),
      qos integer,
      payload text,
      arrived timestamp without time zone,
      UNIQUE (topic)
    );

Query a retained message:

    select * from mqtt_retain where topic = ${topic};

Query the retained message whose topic is retain:

    select * from mqtt_retain where topic = 'retain';
     id | topic    | msgid                         | sender  | node | qos  | payload | arrived
    ----+----------+-------------------------------+---------+------+------+---------+---------------------
      1 | retain   | 53F33F7E4741E7007000004B70001 | test    | NULL |    1 | www     | 2016-12-24 16:55:18
    (1 row)

PostgreSQL Message Acknowledgement Table

mqtt_acked stores client message acknowledgements:

    CREATE TABLE mqtt_acked (
      id SERIAL8 primary key,
      clientid character varying(64),
      topic character varying(64),
      mid integer,
      created timestamp without time zone,
      UNIQUE (clientid, topic)
    );

Enable the PostgreSQL Data Storage Plugin

    ./bin/emqx_ctl plugins load emqx_backend_pgsql