从 Redis 中获取订阅关系

搭建 Redis 环境,以 MaxOS X 为例:

  1. $ wget http://download.redis.io/releases/redis-4.0.14.tar.gz
  2. $ tar xzf redis-4.0.14.tar.gz
  3. $ cd redis-4.0.14
  4. $ make && make install
  5. # 启动 redis
  6. $ redis-server

创建规则:

打开 EMQ X Dashboard,选择左侧的 “规则” 选项卡。

然后填写规则 SQL:

  1. SELECT * FROM "$events/client_connected"

从 MySQL 中获取订阅关系 - 图1

关联动作:

在 “响应动作” 界面选择 “添加动作”,然后在 “动作” 下拉框里选择 “从 Redis 中获取订阅关系”。

从 MySQL 中获取订阅关系 - 图2

填写动作参数:

“离线消息保存到 Redis 动作需要一个参数:

1). 关联资源。现在资源下拉框为空,可以点击右上角的 “新建资源” 来创建一个 Redis 资源:

从 MySQL 中获取订阅关系 - 图3

选择 Redis 单节点模式资源”。

从 MySQL 中获取订阅关系 - 图4

填写资源配置:

填写真实的 Redis 服务器地址,其他配置保持默认值,然后点击 “测试连接” 按钮,确保连接测试成功。

最后点击 “新建” 按钮。

从 MySQL 中获取订阅关系 - 图5

返回响应动作界面,点击 “确认”。

从 MySQL 中获取订阅关系 - 图6

返回规则创建界面,点击 “新建”。

从 MySQL 中获取订阅关系 - 图7

规则已经创建完成,通过 Redis CLI 往Redis插入一条订阅关系:

  1. HSET mqtt:sub:test t1 1

从 MySQL 中获取订阅关系 - 图8

通过 Dashboard 登录 clientid 为 test 的设备:

从 MySQL 中获取订阅关系 - 图9

查看订阅列表,可以看到 test 设备已经订阅了 t1 主题:

从 MySQL 中获取订阅关系 - 图10

从 MySQL 中获取订阅关系

搭建 MySQL 数据库,并设置用户名密码为 root/public,以 MacOS X 为例:

  1. $ brew install mysql
  2. $ brew services start mysql
  3. $ mysql -u root -h localhost -p
  4. ALTER USER 'root'@'localhost' IDENTIFIED BY 'public';

初始化 MySQL 数据库:

  1. $ mysql -u root -h localhost -ppublic
  2. create database mqtt;

创建 mqtt_sub 表:

  1. DROP TABLE IF EXISTS `mqtt_sub`;
  2. CREATE TABLE `mqtt_sub` (
  3. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  4. `clientid` varchar(64) DEFAULT NULL,
  5. `topic` varchar(180) DEFAULT NULL,
  6. `qos` tinyint(1) DEFAULT NULL,
  7. PRIMARY KEY (`id`),
  8. KEY `mqtt_sub_idx` (`clientid`,`topic`,`qos`),
  9. UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`),
  10. INDEX topic_index(`id`, `topic`)
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8MB4;

订阅关系表结构不能修改,请使用上面SQL语句创建

创建规则:

打开 EMQ X Dashboard,选择左侧的“规则”选项卡。

然后填写规则 SQL:

  1. SELECT * FROM "$events/client_connected"

从 MySQL 中获取订阅关系 - 图11

关联动作:

在“响应动作”界面选择“添加动作”,然后在“新增动作”下拉框里选择“从MySQL中获取订阅列表”

从 MySQL 中获取订阅关系 - 图12

填写动作参数:

“从 MySQL 中获取订阅列表”动作需要一个参数:

1). 关联资源。现在资源下拉框为空,可以点击右上角的 “新建” 来创建一个 MySQL 资源:

从 MySQL 中获取订阅关系 - 图13

弹出“创建资源”对话框

从 MySQL 中获取订阅关系 - 图14

填写资源配置:

填写真实的 MySQL 服务器地址,其他配置相应的值,然后点击 “测试连接” 按钮,确保连接测试成功。

最后点击 “确定” 按钮。

从 MySQL 中获取订阅关系 - 图15

返回响应动作界面,点击 “确认”。

从 MySQL 中获取订阅关系 - 图16

返回规则创建界面,点击 “创建”。

从 MySQL 中获取订阅关系 - 图17

规则已经创建完成,通过 “mysql” 往MySQL插入一条订阅关系:

  1. insert into mqtt_sub(clientid, topic, qos) values("test", "t1", 1);

从 MySQL 中获取订阅关系 - 图18

通过 Dashboard 登录 clientid 为 test 的设备:

从 MySQL 中获取订阅关系 - 图19

查看“订阅”列表,可以看到 Broker 从 MySQL 里面获取到订阅关系,并代理设备订阅:

从 MySQL 中获取订阅关系 - 图20

从 PostgreSQL 中获取订阅关系

搭建 PostgreSQL 数据库,以 MacOS X 为例:

  1. $ brew install postgresql
  2. $ brew services start postgresql

创建 mqtt 数据库:

  1. # 使用用户名 postgres 创建名为 'mqtt' 的数据库
  2. $ createdb -U postgres mqtt
  3. $ psql -U postgres mqtt
  4. mqtt=> \dn;
  5. List of schemas
  6. Name | Owner
  7. --------+-------
  8. public | postgres
  9. (1 row)

创建 mqtt_sub 表:

  1. $ psql -U postgres mqtt
  2. CREATE TABLE mqtt_sub(
  3. id SERIAL8 primary key,
  4. clientid character varying(64),
  5. topic character varying(255),
  6. qos integer,
  7. UNIQUE (clientid, topic)
  8. );

订阅关系表结构不能修改,请使用上面SQL语句创建

创建规则:

打开 EMQ X Dashboard,选择左侧的“规则”选项卡。

然后填写规则 SQL:

  1. SELECT * FROM "$events/client_connected"

从 MySQL 中获取订阅关系 - 图21

关联动作:

在“响应动作”界面选择“添加动作”,然后在“新增动作”下拉框里选择“从PostgreSQL中获取订阅列表”

从 MySQL 中获取订阅关系 - 图22

填写动作参数:

“从PostgreSQL中获取订阅列表“ 动作需要一个参数:

1). 关联资源。现在资源下拉框为空,可以点击右上角的 “新建” 来创建一个 PostgreSQL 资源:

从 MySQL 中获取订阅关系 - 图23

弹出“创建资源”对话框

从 MySQL 中获取订阅关系 - 图24

填写资源配置:

填写真实的 PostgreSQL 服务器地址,其他配置相应的值,然后点击 “测试连接” 按钮,确保连接测试成功。

最后点击 “确定” 按钮。

从 MySQL 中获取订阅关系 - 图25

返回响应动作界面,点击 “确认”。

从 MySQL 中获取订阅关系 - 图26

返回规则创建界面,点击 “创建”。

从 MySQL 中获取订阅关系 - 图27

规则已经创建完成,通过 “psql” 往PostgreSQL插入一条订阅关系

  1. insert into mqtt_sub(clientid, topic, qos) values('test', 't1', 1)'

从 MySQL 中获取订阅关系 - 图28

通过 Dashboard 登录 clientid 为 test 的设备:

从 MySQL 中获取订阅关系 - 图29

查看“订阅”列表,可以看到 Broker 从 PostgreSQL 里面获取到订阅关系,并代理设备订阅:

从 MySQL 中获取订阅关系 - 图30

从 Cassandra 中获取订阅关系

搭建 Cassandra 数据库,并设置用户名密码为 root/public,以 MacOS X 为例:

  1. $ brew install cassandra
  2. ## 修改配置,关闭匿名认证
  3. $ vim /usr/local/etc/cassandra/cassandra.yaml
  4. authenticator: PasswordAuthenticator
  5. authorizer: CassandraAuthorizer
  6. $ brew services start cassandra
  7. ## 创建 root 用户
  8. $ cqlsh -ucassandra -pcassandra
  9. create user root with password 'public' superuser;

创建 “mqtt” 表空间:

  1. $ cqlsh -uroot -ppublic
  2. CREATE KEYSPACE mqtt WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;

创建 mqtt_sub 表:

  1. CREATE TABLE mqtt_sub (
  2. clientid text,
  3. topic text,
  4. qos int,
  5. PRIMARY KEY (clientid, topic)
  6. ) WITH CLUSTERING ORDER BY (topic ASC)
  7. AND bloom_filter_fp_chance = 0.01
  8. AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
  9. AND comment = ''
  10. AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
  11. AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
  12. AND crc_check_chance = 1.0
  13. AND dclocal_read_repair_chance = 0.1
  14. AND default_time_to_live = 0
  15. AND gc_grace_seconds = 864000
  16. AND max_index_interval = 2048
  17. AND memtable_flush_period_in_ms = 0
  18. AND min_index_interval = 128
  19. AND read_repair_chance = 0.0
  20. AND speculative_retry = '99PERCENTILE';

订阅关系表结构不能修改,请使用上面SQL语句创建

创建规则:

打开 EMQ X Dashboard,选择左侧的“规则”选项卡。

然后填写规则 SQL:

  1. SELECT * FROM "$events/client_connected"

从 MySQL 中获取订阅关系 - 图31

关联动作:

在“响应动作”界面选择“添加动作”,然后在“新增动作”下拉框里选择“从 Cassandra 中获取订阅列表”

从 MySQL 中获取订阅关系 - 图32

填写动作参数:

“从 Cassandra 中获取订阅列表”动作需要一个参数:

1). 关联资源。现在资源下拉框为空,可以点击右上角的 “新建” 来创建一个 Cassandra 资源:

从 MySQL 中获取订阅关系 - 图33

弹出“创建资源”对话框

从 MySQL 中获取订阅关系 - 图34

填写资源配置:

填写真实的 Cassandra 服务器地址,其他配置相应的值,然后点击 “测试连接” 按钮,确保连接测试成功。

最后点击 “确定” 按钮。

从 MySQL 中获取订阅关系 - 图35

返回响应动作界面,点击 “确认”。

从 MySQL 中获取订阅关系 - 图36

返回规则创建界面,点击 “创建”。

从 MySQL 中获取订阅关系 - 图37

规则已经创建完成,通过 “cqlsh” 往 Cassandra 插入一条订阅关系:

  1. insert into mqtt_sub(clientid, topic, qos) values('test', 't1', 1);

从 MySQL 中获取订阅关系 - 图38

通过 Dashboard 登录 clientid 为 test 的设备:

从 MySQL 中获取订阅关系 - 图39

查看“订阅”列表,可以看到 Broker 从 Cassandra 里面获取到订阅关系,并代理设备订阅:

从 MySQL 中获取订阅关系 - 图40

从 MongoDB 中获取订阅关系

搭建 MongoDB 数据库,并设置用户名密码为 root/public,以 MacOS X 为例:

  1. $ brew install mongodb
  2. $ brew services start mongodb
  3. ## 新增 root/public 用户
  4. $ use mqtt;
  5. $ db.createUser({user: "root", pwd: "public", roles: [{role: "readWrite", db: "mqtt"}]});
  6. ## 修改配置,关闭匿名认证
  7. $ vi /usr/local/etc/mongod.conf
  8. security:
  9. authorization: enabled
  10. $ brew services restart mongodb

创建 mqtt_sub 表:

  1. $ mongo 127.0.0.1/mqtt -uroot -ppublic
  2. db.createCollection("mqtt_sub");

创建规则:

打开 EMQ X Dashboard,选择左侧的“规则”选项卡。

然后填写规则 SQL:

  1. SELECT * FROM "$events/client_connected"

从 MySQL 中获取订阅关系 - 图41

关联动作:

在“响应动作”界面选择“添加动作”,然后在“新增动作”下拉框里选择“从MongoDB中获取订阅列表”

从 MySQL 中获取订阅关系 - 图42

填写动作参数:

“从 MongoDB 中获取订阅列表”动作需要一个参数:

1). 关联资源。现在资源下拉框为空,可以点击右上角的 “新建” 来创建一个 MongoDB 资源:

从 MySQL 中获取订阅关系 - 图43

弹出“创建资源”对话框

从 MySQL 中获取订阅关系 - 图44

填写资源配置:

填写真实的 MongoDB 服务器地址,其他配置相应的值,然后点击 “测试连接” 按钮,确保连接测试成功。

最后点击 “确定” 按钮。

从 MySQL 中获取订阅关系 - 图45

返回响应动作界面,点击 “确认”。

从 MySQL 中获取订阅关系 - 图46

返回规则创建界面,点击 “创建”。

从 MySQL 中获取订阅关系 - 图47

规则已经创建完成,通过 “mongo” 往MongoDB插入一条订阅关系

  1. db.mqtt_sub.insert({clientid: "test", topic: "t1", qos: 1})

从 MySQL 中获取订阅关系 - 图48

通过 Dashboard 登录 clientid 为 test 的设备:

从 MySQL 中获取订阅关系 - 图49

查看“订阅”列表,可以看到 Broker 从 MongoDB 里面获取到订阅关系,并代理设备订阅:

从 MySQL 中获取订阅关系 - 图50