扩展插件 (Plugins)

EMQ 消息服务器通过模块注册和钩子(Hooks)机制,支持用户开发扩展插件定制服务器认证鉴权与业务功能。

EMQ 2.0 版本官方提供的插件包括:

插件

说明

emq_dashboard

Web 控制台插件(默认加载)

emq_auth_clientid

ClientId 认证插件

emq_auth_username

用户名、密码认证插件

emq_auth_ldap

LDAP 认证/访问控制

emq_auth_http

HTTP 认证/访问控制

emq_auth_mysql

MySQL 认证/访问控制

emq_auth_pgsql

PostgreSQ L认证/访问控制

emq_auth_redis

Redis 认证/访问控制

emq_web_hook

Web Hook 插件

emq_lua_hook

Lua Hook 插件

emq_auth_mongo

MongoDB 认证/访问控制

emq_modules

扩展模块插件

emq_retainer

Retain 消息存储模块

emq_coap

CoAP 协议支持

emq_sn

MQTT-SN 协议支持

emq_stomp

Stomp 协议支持

emq_sockjs

Stomp/SockJS 协议支持

emq_recon

Recon 性能调试

emq_reloader

Reloader 代码热加载插件

emq_plugin_template

插件开发模版

emq_retainer Retainer 模块插件

2.1-beta 版本将 emq_mod_retainer 模块更名为 emq_retainer 模块,Retainer 模块负责持久化 MQTT Retained 消息。

配置 Retainer 模块

etc/plugins/emq_retainer.conf:

  1. ## disc: disc_copies, ram: ram_copies
  2. ## Notice: retainer's storage_type on each node in a cluster must be the same!
  3. retainer.storage_type = disc
  4. ## Max number of retained messages
  5. retainer.max_message_num = 1000000
  6. ## Max Payload Size of retained message
  7. retainer.max_payload_size = 64KB
  8. ## Expiry interval. Never expired if 0
  9. ## h - hour
  10. ## m - minute
  11. ## s - second
  12. retainer.expiry_interval = 0

加载 Retainer 模块

Retainer 模块默认加载。

emq_auth_clientid - ClientID 认证插件

EMQ 2.0-rc.2 版本将 ClientId 认证模块改为独立插件: https://github.com/emqtt/emq_auth_clientid

ClientID 认证配置

etc/plugins/emq_auth_clientid.conf:

  1. ##auth.client.$N.clientid = clientid
  2. ##auth.client.$N.password = passwd
  3. ## Examples
  4. ##auth.client.1.clientid = id
  5. ##auth.client.1.password = passwd
  6. ##auth.client.2.clientid = dev:devid
  7. ##auth.client.2.password = passwd2
  8. ##auth.client.3.clientid = app:appid
  9. ##auth.client.3.password = passwd3

加载 ClientId 认证插件

  1. ./bin/emqttd_ctl plugins load emq_auth_clientid

emq_auth_username - 用户名密码认证插件

EMQ 2.0-rc.2 版本将用户名认证模块改为独立插件: https://github.com/emqtt/emq_auth_username

用户名认证配置

etc/plugins/emq_auth_username.conf:

  1. ##auth.user.$N.username = admin
  2. ##auth.user.$N.password = public
  3. ## Examples:
  4. ##auth.user.1.username = admin
  5. ##auth.user.1.password = public
  6. ##auth.user.2.username = feng@emqx.io
  7. ##auth.user.2.password = public

两种方式添加用户:

  1. 直接在 etc/plugins/emq_auth_username.conf 中明文配置默认用户例如:

    1. auth.username.test = public
  2. 通过 ‘./bin/emqttd_ctl’ 管理命令行添加用户:

    1. $ ./bin/emqttd_ctl users add <Username> <Password>

加载用户名认证插件

  1. ./bin/emqttd_ctl plugins load emq_auth_username

emq_plugin_template: 插件开发模版

EMQ 插件实际是一个普通的 Erlang 应用,插件配置文件: ‘etc/${PluginName}.conf|config”。

emq_plugin_template 是模版插件,编译发布在 lib/emq_plugin_template-2.0 目录,配置文件: etc/plugins/emq_plugin_templat.config

加载、卸载插件

管理命令行 ‘./bin/emqttd_ctl’ 加载卸载插件。

加载插件:

  1. ./bin/emqttd_ctl plugins load <PluginName>

卸载插件:

  1. ./bin/emqttd_ctl plugins unload <PluginName>

查询插件:

  1. ./bin/emqttd_ctl plugins list

emq_dashboard: Dashboard 插件

EMQ 消息服务器的 Web 管理控制台。插件项目地址: https://github.com/emqtt/emqttd_dashboard

EMQ 消息服务器默认加载 Dashboard 插件。URL 地址: http://localhost:18083 ,缺省用户名/密码: admin/public。

Dashboard 插件可查询 EMQ 消息服务器基本信息、统计数据、度量数据,查询系统客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)。

_images/dashboard.png

Dashboard 插件设置

etc/plugins/emq_dashboard.conf:

  1. ## HTTP Listener
  2. dashboard.listener.http = 18083
  3. dashboard.listener.http.acceptors = 2
  4. dashboard.listener.http.max_clients = 512
  5. ## HTTPS Listener
  6. ## dashboard.listener.https = 18084
  7. ## dashboard.listener.https.acceptors = 2
  8. ## dashboard.listener.https.max_clients = 512
  9. ## dashboard.listener.https.handshake_timeout = 15s
  10. ## dashboard.listener.https.certfile = etc/certs/cert.pem
  11. ## dashboard.listener.https.keyfile = etc/certs/key.pem
  12. ## dashboard.listener.https.cacertfile = etc/certs/cacert.pem
  13. ## dashboard.listener.https.verify = verify_peer
  14. ## dashboard.listener.https.fail_if_no_peer_cert = true

emq_auth_ldap: LDAP 认证插件

LDAP 认证插件: https://github.com/emqtt/emq_auth_ldap

注解

2.0-beta1 版本支持

LDAP 认证插件配置

etc/plugins/emq_auth_ldap.conf:

  1. auth.ldap.servers = 127.0.0.1
  2. auth.ldap.port = 389
  3. auth.ldap.timeout = 30
  4. auth.ldap.user_dn = uid=%u,ou=People,dc=example,dc=com
  5. auth.ldap.ssl = false

LDAP 认证插件加载

./bin/emqttd_ctl plugins load emq_auth_ldap

emq_auth_http: HTTP 认证/访问控制插件

HTTP 认证/访问控制插件: https://github.com/emqtt/emq_auth_http

注解

1.1版本支持

HTTP 认证插件配置

etc/plugins/emq_auth_http.conf:

  1. ## Variables: %u = username, %c = clientid, %a = ipaddress, %P = password, %t = topic
  2. auth.http.auth_req = http://127.0.0.1:8080/mqtt/auth
  3. auth.http.auth_req.method = post
  4. auth.http.auth_req.params = clientid=%c,username=%u,password=%P
  5. auth.http.super_req = http://127.0.0.1:8080/mqtt/superuser
  6. auth.http.super_req.method = post
  7. auth.http.super_req.params = clientid=%c,username=%u
  8. ## 'access' parameter: sub = 1, pub = 2
  9. auth.http.acl_req = http://127.0.0.1:8080/mqtt/acl
  10. auth.http.acl_req.method = get
  11. auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

HTTP 认证/鉴权 API

认证/ACL 成功,API 返回200

认证/ACL 失败,API 返回4xx

加载 HTTP 认证插件

./bin/emqttd_ctl plugins load emq_auth_http

emq_auth_mysql: MySQL 认证/访问控制插件

MySQL 认证/访问控制插件,基于 MySQL 库表认证鉴权: https://github.com/emqtt/emq-auth-mysql

MQTT 用户表

  1. CREATE TABLE `mqtt_user` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `username` varchar(100) DEFAULT NULL,
  4. `password` varchar(100) DEFAULT NULL,
  5. `salt` varchar(35) DEFAULT NULL,
  6. `is_superuser` tinyint(1) DEFAULT 0,
  7. `created` datetime DEFAULT NULL,
  8. PRIMARY KEY (`id`),
  9. UNIQUE KEY `mqtt_username` (`username`)
  10. ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

注解

MySQL 插件可使用系统自有的用户表,通过 ‘authquery’ 配置查询语句。

MQTT 访问控制表

  1. CREATE TABLE `mqtt_acl` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
  4. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
  5. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  6. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
  7. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  8. `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  9. PRIMARY KEY (`id`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  11. INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
  12. VALUES
  13. (1,1,NULL,'$all',NULL,2,'#'),
  14. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
  15. (3,0,NULL,'$all',NULL,1,'eq #'),
  16. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
  17. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
  18. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

配置 MySQL 认证鉴权插件

etc/plugins/emq_auth_mysql.conf:

  1. ## Mysql Server
  2. auth.mysql.server = 127.0.0.1:3306
  3. ## Mysql Pool Size
  4. auth.mysql.pool = 8
  5. ## Mysql Username
  6. ## auth.mysql.username =
  7. ## Mysql Password
  8. ## auth.mysql.password =
  9. ## Mysql Database
  10. auth.mysql.database = mqtt
  11. ## Variables: %u = username, %c = clientid
  12. ## Authentication Query: select password only
  13. auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
  14. ## Password hash: plain, md5, sha, sha256, pbkdf2
  15. auth.mysql.password_hash = sha256
  16. ## %% Superuser Query
  17. auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
  18. ## ACL Query Command
  19. auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

加载 MySQL 认证鉴权插件

./bin/emqttd_ctl plugins load emq_auth_mysql

emq_auth_pgsql: Postgre 认证/访问控制插件

Postgre 认证/访问控制插件,基于 PostgreSQL 库表认证鉴权: https://github.com/emqtt/emqttd_plugin_pgsql

Postgre MQTT 用户表

  1. CREATE TABLE mqtt_user (
  2. id SERIAL primary key,
  3. is_superuser boolean,
  4. username character varying(100),
  5. password character varying(100),
  6. salt character varying(40)
  7. );

Postgre MQTT 访问控制表

  1. CREATE TABLE mqtt_acl (
  2. id SERIAL primary key,
  3. allow integer,
  4. ipaddr character varying(60),
  5. username character varying(100),
  6. clientid character varying(100),
  7. access integer,
  8. topic character varying(100)
  9. );
  10. INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
  11. VALUES
  12. (1,1,NULL,'$all',NULL,2,'#'),
  13. (2,0,NULL,'$all',NULL,1,'$SYS/#'),
  14. (3,0,NULL,'$all',NULL,1,'eq #'),
  15. (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
  16. (6,1,'127.0.0.1',NULL,NULL,2,'#'),
  17. (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

配置 Postgre 认证鉴权插件

etc/plugins/emq_auth_pgsql.conf:

  1. ## Postgre Server
  2. auth.pgsql.server = 127.0.0.1:5432
  3. auth.pgsql.pool = 8
  4. auth.pgsql.username = root
  5. #auth.pgsql.password =
  6. auth.pgsql.database = mqtt
  7. auth.pgsql.encoding = utf8
  8. auth.pgsql.ssl = false
  9. ## Variables: %u = username, %c = clientid, %a = ipaddress
  10. ## Authentication Query: select password only
  11. auth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1
  12. ## Password hash: plain, md5, sha, sha256, pbkdf2
  13. auth.pgsql.password_hash = sha256
  14. ## sha256 with salt prefix
  15. ## auth.pgsql.password_hash = salt sha256
  16. ## sha256 with salt suffix
  17. ## auth.pgsql.password_hash = sha256 salt
  18. ## Superuser Query
  19. auth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
  20. ## ACL Query. Comment this query, the acl will be disabled.
  21. auth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

加载 Postgre 认证鉴权插件

  1. ./bin/emqttd_ctl plugins load emq_auth_pgsql

emq_auth_redis: Redis 认证/访问控制插件

基于 Redis 认证/访问控制: https://github.com/emqtt/emqttd_plugin_redis

配置 Redis 认证鉴权插件

etc/plugins/emq_auth_redis.conf:

  1. ## Redis Server
  2. auth.redis.server = 127.0.0.1:6379
  3. ## Redis Pool Size
  4. auth.redis.pool = 8
  5. ## Redis Database
  6. auth.redis.database = 0
  7. ## Redis Password
  8. ## auth.redis.password =
  9. ## Variables: %u = username, %c = clientid
  10. ## Authentication Query Command
  11. auth.redis.auth_cmd = HGET mqtt_user:%u password
  12. ## Password hash: plain, md5, sha, sha256, pbkdf2
  13. auth.redis.password_hash = sha256
  14. ## Superuser Query Command
  15. auth.redis.super_cmd = HGET mqtt_user:%u is_superuser
  16. ## ACL Query Command
  17. auth.redis.acl_cmd = HGETALL mqtt_acl:%u

Redis 用户 Hash

默认基于用户 Hash 认证:

  1. HSET mqtt_user:<username> is_superuser 1
  2. HSET mqtt_user:<username> password "passwd"

Redis ACL 规则 Hash

默认采用 Hash 存储 ACL 规则:

  1. HSET mqtt_acl:<username> topic1 1
  2. HSET mqtt_acl:<username> topic2 2
  3. HSET mqtt_acl:<username> topic3 3

注解

1: subscribe, 2: publish, 3: pubsub

Redis 订阅 Hash

插件还支持 Redis 中创建 MQTT 订阅。当 MQTT 客户端连接成功,会自动从 Redis 加载订阅:

  1. HSET mqtt_sub:<username> topic1 0
  2. HSET mqtt_sub:<username> topic2 1
  3. HSET mqtt_sub:<username> topic3 2

警告

2.0-rc.2 版本已将订阅加载迁移至 EMQX 产品的emqx_backend_redis插件。

加载 Redis 认证鉴权插件

  1. ./bin/emqttd_ctl plugins load emq_auth_redis

emq_auth_mongo: MongoDB 认证/访问控制插件

基于 MongoDB 认证/访问控制: https://github.com/emqtt/emqttd_plugin_mongo

配置 MongoDB 认证鉴权插件

etc/plugins/emq_auth_mongo.conf:

  1. ## Mongo Server
  2. auth.mongo.server = 127.0.0.1:27017
  3. ## Mongo Pool Size
  4. auth.mongo.pool = 8
  5. ## Mongo User
  6. ## auth.mongo.user =
  7. ## Mongo Password
  8. ## auth.mongo.password =
  9. ## Mongo Database
  10. auth.mongo.database = mqtt
  11. ## auth_query
  12. auth.mongo.auth_query.collection = mqtt_user
  13. auth.mongo.auth_query.password_field = password
  14. auth.mongo.auth_query.password_hash = sha256
  15. auth.mongo.auth_query.selector = username=%u
  16. ## super_query
  17. auth.mongo.super_query.collection = mqtt_user
  18. auth.mongo.super_query.super_field = is_superuser
  19. auth.mongo.super_query.selector = username=%u
  20. ## acl_query
  21. auth.mongo.acl_query.collection = mqtt_user
  22. auth.mongo.acl_query.selector = username=%u

MongoDB 数据库

  1. use mqtt
  2. db.createCollection("mqtt_user")
  3. db.createCollection("mqtt_acl")
  4. db.mqtt_user.ensureIndex({"username":1})

注解

数据库、集合名称可自定义

MongoDB 用户集合(User Collection)

  1. {
  2. username: "user",
  3. password: "password hash",
  4. is_superuser: boolean (true, false),
  5. created: "datetime"
  6. }

示例:

  1. db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
  2. db.mqtt_user:insert({username: "root", is_superuser: true})

MongoDB ACL 集合(ACL Collection)

  1. {
  2. username: "username",
  3. clientid: "clientid",
  4. publish: ["topic1", "topic2", ...],
  5. subscribe: ["subtop1", "subtop2", ...],
  6. pubsub: ["topic/#", "topic1", ...]
  7. }

示例:

  1. db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
  2. db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})

加载 Mognodb 认证插件

  1. ./bin/emqttd_ctl plugins load emq_auth_mongo

emq_modules 扩展模块插件

2.1 版本将全部扩展模块项目(emq_mod_presence, emq_mod_subscription, emq_mod_rewrite)合并为一个 emq_modules 项目。

配置 Modules 插件

  1. ##--------------------------------------------------------------------
  2. ## Presence Module
  3. ##--------------------------------------------------------------------
  4. ## Enable Presence, Values: on | off
  5. module.presence = on
  6. module.presence.qos = 1
  7. ##--------------------------------------------------------------------
  8. ## Subscription Module
  9. ##--------------------------------------------------------------------
  10. ## Enable Subscription, Values: on | off
  11. module.subscription = on
  12. ## Subscribe the Topics automatically when client connected
  13. module.subscription.1.topic = $client/%c
  14. ## Qos of the subscription: 0 | 1 | 2
  15. module.subscription.1.qos = 1
  16. ## module.subscription.2.topic = $user/%u
  17. ## module.subscription.2.qos = 1
  18. ##--------------------------------------------------------------------
  19. ## Rewrite Module
  20. ##--------------------------------------------------------------------
  21. ## Enable Rewrite, Values: on | off
  22. module.rewrite = off
  23. ## {rewrite, Topic, Re, Dest}
  24. ## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
  25. ## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2

加载 Modules 插件

Modules 插件默认加载。

emq_mod_presence Presence 模块插件

2.0-rc.3 版本将 Presence 模块改为独立插件,Presence 模块会向 $SYS 主题(Topic)发布客户端上下线消息。

警告

2.1 版本该插件已并入 emq_modules 项目

配置 Presence 模块

etc/plugins/emq_mod_presence.conf:

  1. ## Enable presence module
  2. ## Values: on | off
  3. module.presence = on
  4. module.presence.qos = 0

加载 Presence 模块

Presence 模块默认加载。

emq_mod_subscription 自动订阅模块插件

2.0-rc.3 版本将 Subscription 模块改为独立插件,Subscription 扩展模块支持客户端上线时,自动订阅或恢复订阅某些主题(Topic)。

警告

2.1 版本该插件已并入 emq_modules 项目

配置 Subscription 模块

etc/plugins/emq_mod_subscription.conf:

警告

2.1 版本该插件已并入 emq_modules 项目

  1. ## Subscribe the Topics automatically when client connected
  2. module.subscription.1.topic = $client/%c
  3. ## Qos of the subscription: 0 | 1 | 2
  4. module.subscription.1.qos = 1
  5. ##module.subscription.2.topic = $user/%u
  6. ##module.subscription.2.qos = 1
  7. ## Load static subscriptions from backend storage
  8. ## Values: on | off
  9. module.subscription.backend = on

加载 Subscription 模块

Subscription 模块默认加载。

emq_mod_rewrite 主题重写插件

2.0-rc.2 版本将 rewrite 模块改为独立插件,rewrite 插件支持重写发布订阅的主题(Topic)。

警告

2.1版本该插件已并入 emq_modules 项目

配置 Rewrite 插件

etc/plugins/emq_mod_rewrite.conf:

  1. [
  2. {emq_mod_rewrite, [
  3. {rules, [
  4. %% {rewrite, Topic, Re, Dest}
  5. %% Example: x/y/ -> z/y/
  6. %% {rewrite, "x/#", "^x/y/(.+)$", "z/y/$1"},
  7. %% {rewrite, "y/+/z/#", "^y/(.+)/z/(.+)$", "y/z/$2"}
  8. ]}
  9. ]}
  10. ].

加载 Rewrite 插件

  1. ./bin/emqttd_ctl plugins load emq_mod_rewrite

emq_coap: CoAP 协议插件

CoAP 协议插件,支持 RFC 7252 规范。

配置 CoAP 协议插件

  1. coap.server = 5683
  2. coap.prefix.mqtt = mqtt
  3. coap.handler.mqtt = emq_coap_gateway

加载 CoAP 协议插件

  1. ./bin/emqttd_ctl plugins load emq_coap

libcoap 客户端

  1. yum install libcoap
  2. % coap client publish message
  3. coap-client -m post -e "qos=0&retain=0&message=payload&topic=hello" coap://localhost/mqtt

emq_sn: MQTT-SN 协议插件

MQTT-SN 协议插件,支持 MQTT-SN 网关模式。

配置 MQTT-SN 协议插件

注解

默认 MQTT-SN 协议 UDP 端口: 1884

etc/plugins/emq_sn.conf:

  1. mqtt.sn.port = 1884

加载 MQTT-SN 协议插件

  1. ./bin/emqttd_ctl plugins load emq_sn

emq_stomp: Stomp 协议插件

Stomp 协议插件。支持 STOMP 1.0/1.1/1.2 协议客户端连接 EMQ,发布订阅 MQTT 消息。

配置插件

注解

Stomp 协议端口: 61613

etc/plugins/emq_stomp.conf:

  1. stomp.default_user.login = guest
  2. stomp.default_user.passcode = guest
  3. stomp.allow_anonymous = true
  4. stomp.frame.max_headers = 10
  5. stomp.frame.max_header_length = 1024
  6. stomp.frame.max_body_length = 8192
  7. stomp.listener = 61613
  8. stomp.listener.acceptors = 4
  9. stomp.listener.max_clients = 512

加载 Stomp 插件

  1. ./bin/emqttd_ctl plugins load emq_stomp

emq_sockjs: Stomp/Sockjs 插件

警告

2.0 版本不再维护 SockJS 插件

配置 SockJS 插件

etc/plugins/emq_sockjs.config:

注解

缺省端口: 61616

  1. [
  2. {emq_sockjs, [
  3. {sockjs, []},
  4. {cowboy_listener, {stomp_sockjs, 61616, 4}},
  5. %% TODO: unused...
  6. {stomp, [
  7. {frame, [
  8. {max_headers, 10},
  9. {max_header_length, 1024},
  10. {max_body_length, 8192}
  11. ]}
  12. ]}
  13. ]}
  14. ].

加载 SockJS 插件

  1. ./bin/emqttd_ctl plugins load emq_sockjs

插件演示页面

http://localhost:61616/index.html

emq_recon: Recon 性能调试插件

emq_recon 插件集成 recon 性能调测库,’./bin/emqttd_ctl’ 命令行注册 recon 命令。

配置 Recon 插件

etc/plugins/emq_recon.conf:

  1. %% Garbage Collection: 10 minutes
  2. recon.gc_interval = 600

加载 Recon 插件

  1. ./bin/emqttd_ctl plugins load emq_recon

recon 插件命令

  1. ./bin/emqttd_ctl recon
  2. recon memory #recon_alloc:memory/2
  3. recon allocated #recon_alloc:memory(allocated_types, current|max)
  4. recon bin_leak #recon:bin_leak(100)
  5. recon node_stats #recon:node_stats(10, 1000)
  6. recon remote_load Mod #recon:remote_load(Mod)

emq_reloader: 代码热加载插件

用于开发调试的代码热升级插件。加载该插件后,EMQ 会自动热升级更新代码。

注解

产品部署环境不建议使用该插件

配置 Reloader 插件

etc/plugins/emq_reloader.conf:

  1. reloader.interval = 60
  2. reloader.logfile = log/reloader.log

加载 Reloader 插件

  1. ./bin/emqttd_ctl plugins load emq_reloader

Reloader 插件命令

  1. ./bin/emqttd_ctl reload
  2. reload <Module> # Reload a Module

EMQ 2.0 插件开发

创建插件项目

参考 emq_plugin_template 插件模版创建新的插件项目。

注册认证/访问控制模块

认证演示模块 - emq_auth_demo.erl

  1. -module(emq_auth_demo).
  2. -behaviour(emqttd_auth_mod).
  3. -include_lib("emqttd/include/emqttd.hrl").
  4. -export([init/1, check/3, description/0]).
  5. init(Opts) -> {ok, Opts}.
  6. check(#mqtt_client{client_id = ClientId, username = Username}, Password, _Opts) ->
  7. io:format("Auth Demo: clientId=~p, username=~p, password=~p~n",
  8. [ClientId, Username, Password]),
  9. ok.
  10. description() -> "Demo Auth Module".

访问控制演示模块 - emqttd_acl_demo.erl

  1. -module(emq_acl_demo).
  2. -include_lib("emqttd/include/emqttd.hrl").
  3. %% ACL callbacks
  4. -export([init/1, check_acl/2, reload_acl/1, description/0]).
  5. init(Opts) ->
  6. {ok, Opts}.
  7. check_acl({Client, PubSub, Topic}, Opts) ->
  8. io:format("ACL Demo: ~p ~p ~p~n", [Client, PubSub, Topic]),
  9. allow.
  10. reload_acl(_Opts) ->
  11. ok.
  12. description() -> "ACL Module Demo".

注册认证、访问控制模块 - emq_plugin_template_app.erl

  1. ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []),
  2. ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []),

注册扩展钩子(Hooks)

通过钩子(Hook)处理客户端上下线、主题订阅、消息收发。

emq_plugin_template.erl:

  1. %% Called when the plugin application start
  2. load(Env) ->
  3. emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
  4. emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
  5. emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
  6. emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
  7. emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
  8. emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribe/4, [Env]),
  9. emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
  10. emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
  11. emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

扩展钩子(Hook):

钩子

说明

client.connected

客户端上线

client.subscribe

客户端订阅主题前

session.subscribed

客户端订阅主题后

client.unsubscribe

客户端取消订阅主题

session.unsubscribed

客户端取消订阅主题后

message.publish

MQTT 消息发布

message.delivered

MQTT 消息送达

message.acked

MQTT 消息回执

client.disconnected

客户端连接断开

注册扩展命令行

扩展命令行演示模块 - emq_cli_demo.erl

  1. -module(emq_cli_demo).
  2. -include_lib("emqttd/include/emqttd_cli.hrl").
  3. -export([cmd/1]).
  4. cmd(["arg1", "arg2"]) ->
  5. ?PRINT_MSG("ok");
  6. cmd(_) ->
  7. ?USAGE([{"cmd arg1 arg2", "cmd demo"}]).

注册命令行模块 - emq_plugin_template_app.erl

  1. emqttd_ctl:register_cmd(cmd, {emq_cli_demo, cmd}, []).

插件加载后,’./bin/emqttd_ctl’新增命令行:

  1. ./bin/emqttd_ctl cmd arg1 arg2

插件配置文件

插件自带配置文件放置在 etc/${plugin_name}.conf|config,EMQ 支持两种插件配置格式:

  1. ${plugin_name}.config,Erlang 原生配置文件格式:
  1. [
  2. {plugin_name, [
  3. {key, value}
  4. ]}
  5. ].
  1. ${plugin_name}.conf, sysctl 的 k = v 通用格式:
  1. plugin_name.key = value

注解

k = v 格式配置需要插件开发者创建 priv/plugin_name.schema 映射文件。

编译发布插件

  1. clone emq-relx 项目:
  1. git clone https://github.com/emqtt/emq-relx.git
  1. Makefile 增加 DEPS:
  1. DEPS += plugin_name
  2. dep_plugin_name = git url_of_plugin
  1. relx.config 中 release 段落添加:
  1. {plugin_name, load},