离线消息保存到 PostgreSQL

搭建 PostgreSQL 数据库,以 MacOS X 为例:

  1. $ brew install postgresql
  2. $ brew services start postgresql

创建 mqtt 数据库:

  1. # 使用用户名 postgres 创建名为 'mqtt' 的数据库
  2. $ createdb -U postgres mqtt
  3. $ psql -U postgres mqtt
  4. mqtt=> \dn;
  5. List of schemas
  6. Name | Owner
  7. --------+-------
  8. public | postgres
  9. (1 row)

创建 mqtt_msg 表:

  1. $ psql -U postgres mqtt
  2. CREATE TABLE mqtt_msg (
  3. id SERIAL8 primary key,
  4. msgid character varying(64),
  5. sender character varying(64),
  6. topic character varying(255),
  7. qos integer,
  8. retain integer,
  9. payload text,
  10. arrived timestamp without time zone
  11. );
  12. CREATE TABLE mqtt_acked (
  13. id SERIAL8 primary key,
  14. clientid character varying(64),
  15. topic character varying(64),
  16. mid integer,
  17. created timestamp without time zone,
  18. UNIQUE (clientid, topic)
  19. );

WARNING

消息表结构不能修改,请使用上面SQL语句创建

创建规则:

打开 EMQ X Dashboard离线消息保存到 PostgreSQL - 图1 (opens new window),选择左侧的“规则”选项卡。

然后填写规则 SQL:

FROM说明

t/#: 发布者发布消息触发保存离线消息到PostgreSQL

$events/session_subscribed: 订阅者订阅主题触发获取离线消息

$events/message_acked: 订阅者回复消息ACK后触发删除已经被接收的离线消息

  1. SELECT * FROM "t/#", "$events/session_subscribed", "$events/message_acked" WHERE topic =~ 't/#'

离线消息保存到 PostgreSQL - 图2

关联动作:

在“响应动作”界面选择“添加动作”,然后在“动作”下拉框里选择“离线消息保存到 PostgreSQL”。

离线消息保存到 PostgreSQL - 图3

现在资源下拉框为空,可以点击右上角的 “新建” 来创建一个 PostgreSQL 资源:

离线消息保存到 PostgreSQL - 图4

弹出一个“创建资源”对话框

离线消息保存到 PostgreSQL - 图5

填写资源配置:

填写真实的 PostgreSQL 服务器地址,其他配置填写相应的值,然后点击 “测试连接” 按钮,确保连接测试成功。

最后点击 “确定” 按钮。

离线消息保存到 PostgreSQL - 图6

返回响应动作界面,点击 “确认”。

离线消息保存到 PostgreSQL - 图7

返回规则创建界面,点击 “创建”。

离线消息保存到 PostgreSQL - 图8

规则已经创建完成,通过 Dashboard 的 WebSocket 客户端发一条数据**(发布消息的QoS必须大于0)**:

离线消息保存到 PostgreSQL - 图9

消息发送后,通过 psql 查看到消息被保存到 PostgreSQL 里面:

离线消息保存到 PostgreSQL - 图10

使用另外一个客户端,订阅主题 “t/1” (订阅主题的QoS必须大于0,否则消息会被重复接收):

离线消息保存到 PostgreSQL - 图11

订阅后马上接收到了保存到 PostgreSQL 里面的离线消息:

离线消息保存到 PostgreSQL - 图12

离线消息被接收后会在 PostgreSQL 中删除:

离线消息保存到 PostgreSQL - 图13