- 扩展插件 (Plugins)
- emq_retainer Retainer 模块插件
- emq_auth_clientid - ClientID 认证插件
- emq_auth_username - 用户名密码认证插件
- emq_plugin_template: 插件开发模版
- emq_dashboard: Dashboard 插件
- emq_auth_ldap: LDAP 认证插件
- emq_auth_http: HTTP 认证/访问控制插件
- emq_auth_mysql: MySQL 认证/访问控制插件
- emq_auth_pgsql: Postgre 认证/访问控制插件
- emq_auth_redis: Redis 认证/访问控制插件
- emq_auth_mongo: MongoDB 认证/访问控制插件
- emq_modules 扩展模块插件
- emq_mod_presence Presence 模块插件
- emq_mod_subscription 自动订阅模块插件
- emq_mod_rewrite 主题重写插件
- emq_coap: CoAP 协议插件
- emq_sn: MQTT-SN 协议插件
- emq_stomp: Stomp 协议插件
- emq_sockjs: Stomp/Sockjs 插件
- emq_recon: Recon 性能调试插件
- emq_reloader: 代码热加载插件
- EMQ 2.0 插件开发
扩展插件 (Plugins)
EMQ 消息服务器通过模块注册和钩子(Hooks)机制,支持用户开发扩展插件定制服务器认证鉴权与业务功能。
EMQ 2.0 版本官方提供的插件包括:
插件 | 说明 |
---|---|
Web 控制台插件(默认加载) | |
ClientId 认证插件 | |
用户名、密码认证插件 | |
LDAP 认证/访问控制 | |
HTTP 认证/访问控制 | |
MySQL 认证/访问控制 | |
PostgreSQ L认证/访问控制 | |
Redis 认证/访问控制 | |
Web Hook 插件 | |
Lua Hook 插件 | |
MongoDB 认证/访问控制 | |
扩展模块插件 | |
Retain 消息存储模块 | |
CoAP 协议支持 | |
MQTT-SN 协议支持 | |
Stomp 协议支持 | |
Stomp/SockJS 协议支持 | |
Recon 性能调试 | |
Reloader 代码热加载插件 | |
插件开发模版 |
emq_retainer Retainer 模块插件
2.1-beta 版本将 emq_mod_retainer 模块更名为 emq_retainer 模块,Retainer 模块负责持久化 MQTT Retained 消息。
配置 Retainer 模块
etc/plugins/emq_retainer.conf:
## disc: disc_copies, ram: ram_copies
## Notice: retainer's storage_type on each node in a cluster must be the same!
retainer.storage_type = disc
## Max number of retained messages
retainer.max_message_num = 1000000
## Max Payload Size of retained message
retainer.max_payload_size = 64KB
## Expiry interval. Never expired if 0
## h - hour
## m - minute
## s - second
retainer.expiry_interval = 0
加载 Retainer 模块
Retainer 模块默认加载。
emq_auth_clientid - ClientID 认证插件
EMQ 2.0-rc.2 版本将 ClientId 认证模块改为独立插件: https://github.com/emqtt/emq_auth_clientid
ClientID 认证配置
etc/plugins/emq_auth_clientid.conf:
##auth.client.$N.clientid = clientid
##auth.client.$N.password = passwd
## Examples
##auth.client.1.clientid = id
##auth.client.1.password = passwd
##auth.client.2.clientid = dev:devid
##auth.client.2.password = passwd2
##auth.client.3.clientid = app:appid
##auth.client.3.password = passwd3
加载 ClientId 认证插件
./bin/emqttd_ctl plugins load emq_auth_clientid
emq_auth_username - 用户名密码认证插件
EMQ 2.0-rc.2 版本将用户名认证模块改为独立插件: https://github.com/emqtt/emq_auth_username
用户名认证配置
etc/plugins/emq_auth_username.conf:
##auth.user.$N.username = admin
##auth.user.$N.password = public
## Examples:
##auth.user.1.username = admin
##auth.user.1.password = public
##auth.user.2.username = feng@emqx.io
##auth.user.2.password = public
两种方式添加用户:
直接在 etc/plugins/emq_auth_username.conf 中明文配置默认用户例如:
auth.username.test = public
通过 ‘./bin/emqttd_ctl’ 管理命令行添加用户:
$ ./bin/emqttd_ctl users add <Username> <Password>
加载用户名认证插件
./bin/emqttd_ctl plugins load emq_auth_username
emq_plugin_template: 插件开发模版
EMQ 插件实际是一个普通的 Erlang 应用,插件配置文件: ‘etc/${PluginName}.conf|config”。
emq_plugin_template 是模版插件,编译发布在 lib/emq_plugin_template-2.0 目录,配置文件: etc/plugins/emq_plugin_templat.config
加载、卸载插件
管理命令行 ‘./bin/emqttd_ctl’ 加载卸载插件。
加载插件:
./bin/emqttd_ctl plugins load <PluginName>
卸载插件:
./bin/emqttd_ctl plugins unload <PluginName>
查询插件:
./bin/emqttd_ctl plugins list
emq_dashboard: Dashboard 插件
EMQ 消息服务器的 Web 管理控制台。插件项目地址: https://github.com/emqtt/emqttd_dashboard
EMQ 消息服务器默认加载 Dashboard 插件。URL 地址: http://localhost:18083 ,缺省用户名/密码: admin/public。
Dashboard 插件可查询 EMQ 消息服务器基本信息、统计数据、度量数据,查询系统客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)。
Dashboard 插件设置
etc/plugins/emq_dashboard.conf:
## HTTP Listener
dashboard.listener.http = 18083
dashboard.listener.http.acceptors = 2
dashboard.listener.http.max_clients = 512
## HTTPS Listener
## dashboard.listener.https = 18084
## dashboard.listener.https.acceptors = 2
## dashboard.listener.https.max_clients = 512
## dashboard.listener.https.handshake_timeout = 15s
## dashboard.listener.https.certfile = etc/certs/cert.pem
## dashboard.listener.https.keyfile = etc/certs/key.pem
## dashboard.listener.https.cacertfile = etc/certs/cacert.pem
## dashboard.listener.https.verify = verify_peer
## dashboard.listener.https.fail_if_no_peer_cert = true
emq_auth_ldap: LDAP 认证插件
LDAP 认证插件: https://github.com/emqtt/emq_auth_ldap
注解
2.0-beta1 版本支持
LDAP 认证插件配置
etc/plugins/emq_auth_ldap.conf:
auth.ldap.servers = 127.0.0.1
auth.ldap.port = 389
auth.ldap.timeout = 30
auth.ldap.user_dn = uid=%u,ou=People,dc=example,dc=com
auth.ldap.ssl = false
LDAP 认证插件加载
./bin/emqttd_ctl plugins load emq_auth_ldap
emq_auth_http: HTTP 认证/访问控制插件
HTTP 认证/访问控制插件: https://github.com/emqtt/emq_auth_http
注解
1.1版本支持
HTTP 认证插件配置
etc/plugins/emq_auth_http.conf:
## Variables: %u = username, %c = clientid, %a = ipaddress, %P = password, %t = topic
auth.http.auth_req = http://127.0.0.1:8080/mqtt/auth
auth.http.auth_req.method = post
auth.http.auth_req.params = clientid=%c,username=%u,password=%P
auth.http.super_req = http://127.0.0.1:8080/mqtt/superuser
auth.http.super_req.method = post
auth.http.super_req.params = clientid=%c,username=%u
## 'access' parameter: sub = 1, pub = 2
auth.http.acl_req = http://127.0.0.1:8080/mqtt/acl
auth.http.acl_req.method = get
auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
HTTP 认证/鉴权 API
认证/ACL 成功,API 返回200
认证/ACL 失败,API 返回4xx
加载 HTTP 认证插件
./bin/emqttd_ctl plugins load emq_auth_http
emq_auth_mysql: MySQL 认证/访问控制插件
MySQL 认证/访问控制插件,基于 MySQL 库表认证鉴权: https://github.com/emqtt/emq-auth-mysql
MQTT 用户表
CREATE TABLE `mqtt_user` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`username` varchar(100) DEFAULT NULL,
`password` varchar(100) DEFAULT NULL,
`salt` varchar(35) DEFAULT NULL,
`is_superuser` tinyint(1) DEFAULT 0,
`created` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
注解
MySQL 插件可使用系统自有的用户表,通过 ‘authquery’ 配置查询语句。
MQTT 访问控制表
CREATE TABLE `mqtt_acl` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
`ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
`username` varchar(100) DEFAULT NULL COMMENT 'Username',
`clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
`access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
`topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
VALUES
(1,1,NULL,'$all',NULL,2,'#'),
(2,0,NULL,'$all',NULL,1,'$SYS/#'),
(3,0,NULL,'$all',NULL,1,'eq #'),
(5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
(6,1,'127.0.0.1',NULL,NULL,2,'#'),
(7,1,NULL,'dashboard',NULL,1,'$SYS/#');
配置 MySQL 认证鉴权插件
etc/plugins/emq_auth_mysql.conf:
## Mysql Server
auth.mysql.server = 127.0.0.1:3306
## Mysql Pool Size
auth.mysql.pool = 8
## Mysql Username
## auth.mysql.username =
## Mysql Password
## auth.mysql.password =
## Mysql Database
auth.mysql.database = mqtt
## Variables: %u = username, %c = clientid
## Authentication Query: select password only
auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
## Password hash: plain, md5, sha, sha256, pbkdf2
auth.mysql.password_hash = sha256
## %% Superuser Query
auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
## ACL Query Command
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'
加载 MySQL 认证鉴权插件
./bin/emqttd_ctl plugins load emq_auth_mysql
emq_auth_pgsql: Postgre 认证/访问控制插件
Postgre 认证/访问控制插件,基于 PostgreSQL 库表认证鉴权: https://github.com/emqtt/emqttd_plugin_pgsql
Postgre MQTT 用户表
CREATE TABLE mqtt_user (
id SERIAL primary key,
is_superuser boolean,
username character varying(100),
password character varying(100),
salt character varying(40)
);
Postgre MQTT 访问控制表
CREATE TABLE mqtt_acl (
id SERIAL primary key,
allow integer,
ipaddr character varying(60),
username character varying(100),
clientid character varying(100),
access integer,
topic character varying(100)
);
INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
VALUES
(1,1,NULL,'$all',NULL,2,'#'),
(2,0,NULL,'$all',NULL,1,'$SYS/#'),
(3,0,NULL,'$all',NULL,1,'eq #'),
(5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
(6,1,'127.0.0.1',NULL,NULL,2,'#'),
(7,1,NULL,'dashboard',NULL,1,'$SYS/#');
配置 Postgre 认证鉴权插件
etc/plugins/emq_auth_pgsql.conf:
## Postgre Server
auth.pgsql.server = 127.0.0.1:5432
auth.pgsql.pool = 8
auth.pgsql.username = root
#auth.pgsql.password =
auth.pgsql.database = mqtt
auth.pgsql.encoding = utf8
auth.pgsql.ssl = false
## Variables: %u = username, %c = clientid, %a = ipaddress
## Authentication Query: select password only
auth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1
## Password hash: plain, md5, sha, sha256, pbkdf2
auth.pgsql.password_hash = sha256
## sha256 with salt prefix
## auth.pgsql.password_hash = salt sha256
## sha256 with salt suffix
## auth.pgsql.password_hash = sha256 salt
## Superuser Query
auth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1
## ACL Query. Comment this query, the acl will be disabled.
auth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'
加载 Postgre 认证鉴权插件
./bin/emqttd_ctl plugins load emq_auth_pgsql
emq_auth_redis: Redis 认证/访问控制插件
基于 Redis 认证/访问控制: https://github.com/emqtt/emqttd_plugin_redis
配置 Redis 认证鉴权插件
etc/plugins/emq_auth_redis.conf:
## Redis Server
auth.redis.server = 127.0.0.1:6379
## Redis Pool Size
auth.redis.pool = 8
## Redis Database
auth.redis.database = 0
## Redis Password
## auth.redis.password =
## Variables: %u = username, %c = clientid
## Authentication Query Command
auth.redis.auth_cmd = HGET mqtt_user:%u password
## Password hash: plain, md5, sha, sha256, pbkdf2
auth.redis.password_hash = sha256
## Superuser Query Command
auth.redis.super_cmd = HGET mqtt_user:%u is_superuser
## ACL Query Command
auth.redis.acl_cmd = HGETALL mqtt_acl:%u
Redis 用户 Hash
默认基于用户 Hash 认证:
HSET mqtt_user:<username> is_superuser 1
HSET mqtt_user:<username> password "passwd"
Redis ACL 规则 Hash
默认采用 Hash 存储 ACL 规则:
HSET mqtt_acl:<username> topic1 1
HSET mqtt_acl:<username> topic2 2
HSET mqtt_acl:<username> topic3 3
注解
1: subscribe, 2: publish, 3: pubsub
Redis 订阅 Hash
插件还支持 Redis 中创建 MQTT 订阅。当 MQTT 客户端连接成功,会自动从 Redis 加载订阅:
HSET mqtt_sub:<username> topic1 0
HSET mqtt_sub:<username> topic2 1
HSET mqtt_sub:<username> topic3 2
警告
2.0-rc.2 版本已将订阅加载迁移至 EMQX 产品的emqx_backend_redis插件。
加载 Redis 认证鉴权插件
./bin/emqttd_ctl plugins load emq_auth_redis
emq_auth_mongo: MongoDB 认证/访问控制插件
基于 MongoDB 认证/访问控制: https://github.com/emqtt/emqttd_plugin_mongo
配置 MongoDB 认证鉴权插件
etc/plugins/emq_auth_mongo.conf:
## Mongo Server
auth.mongo.server = 127.0.0.1:27017
## Mongo Pool Size
auth.mongo.pool = 8
## Mongo User
## auth.mongo.user =
## Mongo Password
## auth.mongo.password =
## Mongo Database
auth.mongo.database = mqtt
## auth_query
auth.mongo.auth_query.collection = mqtt_user
auth.mongo.auth_query.password_field = password
auth.mongo.auth_query.password_hash = sha256
auth.mongo.auth_query.selector = username=%u
## super_query
auth.mongo.super_query.collection = mqtt_user
auth.mongo.super_query.super_field = is_superuser
auth.mongo.super_query.selector = username=%u
## acl_query
auth.mongo.acl_query.collection = mqtt_user
auth.mongo.acl_query.selector = username=%u
MongoDB 数据库
use mqtt
db.createCollection("mqtt_user")
db.createCollection("mqtt_acl")
db.mqtt_user.ensureIndex({"username":1})
注解
数据库、集合名称可自定义
MongoDB 用户集合(User Collection)
{
username: "user",
password: "password hash",
is_superuser: boolean (true, false),
created: "datetime"
}
示例:
db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
db.mqtt_user:insert({username: "root", is_superuser: true})
MongoDB ACL 集合(ACL Collection)
{
username: "username",
clientid: "clientid",
publish: ["topic1", "topic2", ...],
subscribe: ["subtop1", "subtop2", ...],
pubsub: ["topic/#", "topic1", ...]
}
示例:
db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})
加载 Mognodb 认证插件
./bin/emqttd_ctl plugins load emq_auth_mongo
emq_modules 扩展模块插件
2.1 版本将全部扩展模块项目(emq_mod_presence, emq_mod_subscription, emq_mod_rewrite)合并为一个 emq_modules 项目。
配置 Modules 插件
##--------------------------------------------------------------------
## Presence Module
##--------------------------------------------------------------------
## Enable Presence, Values: on | off
module.presence = on
module.presence.qos = 1
##--------------------------------------------------------------------
## Subscription Module
##--------------------------------------------------------------------
## Enable Subscription, Values: on | off
module.subscription = on
## Subscribe the Topics automatically when client connected
module.subscription.1.topic = $client/%c
## Qos of the subscription: 0 | 1 | 2
module.subscription.1.qos = 1
## module.subscription.2.topic = $user/%u
## module.subscription.2.qos = 1
##--------------------------------------------------------------------
## Rewrite Module
##--------------------------------------------------------------------
## Enable Rewrite, Values: on | off
module.rewrite = off
## {rewrite, Topic, Re, Dest}
## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
加载 Modules 插件
Modules 插件默认加载。
emq_mod_presence Presence 模块插件
2.0-rc.3 版本将 Presence 模块改为独立插件,Presence 模块会向 $SYS 主题(Topic)发布客户端上下线消息。
警告
2.1 版本该插件已并入 emq_modules 项目
配置 Presence 模块
etc/plugins/emq_mod_presence.conf:
## Enable presence module
## Values: on | off
module.presence = on
module.presence.qos = 0
加载 Presence 模块
Presence 模块默认加载。
emq_mod_subscription 自动订阅模块插件
2.0-rc.3 版本将 Subscription 模块改为独立插件,Subscription 扩展模块支持客户端上线时,自动订阅或恢复订阅某些主题(Topic)。
警告
2.1 版本该插件已并入 emq_modules 项目
配置 Subscription 模块
etc/plugins/emq_mod_subscription.conf:
警告
2.1 版本该插件已并入 emq_modules 项目
## Subscribe the Topics automatically when client connected
module.subscription.1.topic = $client/%c
## Qos of the subscription: 0 | 1 | 2
module.subscription.1.qos = 1
##module.subscription.2.topic = $user/%u
##module.subscription.2.qos = 1
## Load static subscriptions from backend storage
## Values: on | off
module.subscription.backend = on
加载 Subscription 模块
Subscription 模块默认加载。
emq_mod_rewrite 主题重写插件
2.0-rc.2 版本将 rewrite 模块改为独立插件,rewrite 插件支持重写发布订阅的主题(Topic)。
警告
2.1版本该插件已并入 emq_modules 项目
配置 Rewrite 插件
etc/plugins/emq_mod_rewrite.conf:
[
{emq_mod_rewrite, [
{rules, [
%% {rewrite, Topic, Re, Dest}
%% Example: x/y/ -> z/y/
%% {rewrite, "x/#", "^x/y/(.+)$", "z/y/$1"},
%% {rewrite, "y/+/z/#", "^y/(.+)/z/(.+)$", "y/z/$2"}
]}
]}
].
加载 Rewrite 插件
./bin/emqttd_ctl plugins load emq_mod_rewrite
emq_coap: CoAP 协议插件
CoAP 协议插件,支持 RFC 7252 规范。
配置 CoAP 协议插件
coap.server = 5683
coap.prefix.mqtt = mqtt
coap.handler.mqtt = emq_coap_gateway
加载 CoAP 协议插件
./bin/emqttd_ctl plugins load emq_coap
libcoap 客户端
yum install libcoap
% coap client publish message
coap-client -m post -e "qos=0&retain=0&message=payload&topic=hello" coap://localhost/mqtt
emq_sn: MQTT-SN 协议插件
MQTT-SN 协议插件,支持 MQTT-SN 网关模式。
配置 MQTT-SN 协议插件
注解
默认 MQTT-SN 协议 UDP 端口: 1884
etc/plugins/emq_sn.conf:
mqtt.sn.port = 1884
加载 MQTT-SN 协议插件
./bin/emqttd_ctl plugins load emq_sn
emq_stomp: Stomp 协议插件
Stomp 协议插件。支持 STOMP 1.0/1.1/1.2 协议客户端连接 EMQ,发布订阅 MQTT 消息。
配置插件
注解
Stomp 协议端口: 61613
etc/plugins/emq_stomp.conf:
stomp.default_user.login = guest
stomp.default_user.passcode = guest
stomp.allow_anonymous = true
stomp.frame.max_headers = 10
stomp.frame.max_header_length = 1024
stomp.frame.max_body_length = 8192
stomp.listener = 61613
stomp.listener.acceptors = 4
stomp.listener.max_clients = 512
加载 Stomp 插件
./bin/emqttd_ctl plugins load emq_stomp
emq_sockjs: Stomp/Sockjs 插件
警告
2.0 版本不再维护 SockJS 插件
配置 SockJS 插件
etc/plugins/emq_sockjs.config:
注解
缺省端口: 61616
[
{emq_sockjs, [
{sockjs, []},
{cowboy_listener, {stomp_sockjs, 61616, 4}},
%% TODO: unused...
{stomp, [
{frame, [
{max_headers, 10},
{max_header_length, 1024},
{max_body_length, 8192}
]}
]}
]}
].
加载 SockJS 插件
./bin/emqttd_ctl plugins load emq_sockjs
插件演示页面
emq_recon: Recon 性能调试插件
emq_recon 插件集成 recon 性能调测库,’./bin/emqttd_ctl’ 命令行注册 recon 命令。
配置 Recon 插件
etc/plugins/emq_recon.conf:
%% Garbage Collection: 10 minutes
recon.gc_interval = 600
加载 Recon 插件
./bin/emqttd_ctl plugins load emq_recon
recon 插件命令
./bin/emqttd_ctl recon
recon memory #recon_alloc:memory/2
recon allocated #recon_alloc:memory(allocated_types, current|max)
recon bin_leak #recon:bin_leak(100)
recon node_stats #recon:node_stats(10, 1000)
recon remote_load Mod #recon:remote_load(Mod)
emq_reloader: 代码热加载插件
用于开发调试的代码热升级插件。加载该插件后,EMQ 会自动热升级更新代码。
注解
产品部署环境不建议使用该插件
配置 Reloader 插件
etc/plugins/emq_reloader.conf:
reloader.interval = 60
reloader.logfile = log/reloader.log
加载 Reloader 插件
./bin/emqttd_ctl plugins load emq_reloader
Reloader 插件命令
./bin/emqttd_ctl reload
reload <Module> # Reload a Module
EMQ 2.0 插件开发
创建插件项目
参考 emq_plugin_template 插件模版创建新的插件项目。
注册认证/访问控制模块
认证演示模块 - emq_auth_demo.erl
-module(emq_auth_demo).
-behaviour(emqttd_auth_mod).
-include_lib("emqttd/include/emqttd.hrl").
-export([init/1, check/3, description/0]).
init(Opts) -> {ok, Opts}.
check(#mqtt_client{client_id = ClientId, username = Username}, Password, _Opts) ->
io:format("Auth Demo: clientId=~p, username=~p, password=~p~n",
[ClientId, Username, Password]),
ok.
description() -> "Demo Auth Module".
访问控制演示模块 - emqttd_acl_demo.erl
-module(emq_acl_demo).
-include_lib("emqttd/include/emqttd.hrl").
%% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).
init(Opts) ->
{ok, Opts}.
check_acl({Client, PubSub, Topic}, Opts) ->
io:format("ACL Demo: ~p ~p ~p~n", [Client, PubSub, Topic]),
allow.
reload_acl(_Opts) ->
ok.
description() -> "ACL Module Demo".
注册认证、访问控制模块 - emq_plugin_template_app.erl
ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []),
ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []),
注册扩展钩子(Hooks)
通过钩子(Hook)处理客户端上下线、主题订阅、消息收发。
emq_plugin_template.erl:
%% Called when the plugin application start
load(Env) ->
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribe/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
扩展钩子(Hook):
钩子 | 说明 |
---|---|
client.connected | 客户端上线 |
client.subscribe | 客户端订阅主题前 |
session.subscribed | 客户端订阅主题后 |
client.unsubscribe | 客户端取消订阅主题 |
session.unsubscribed | 客户端取消订阅主题后 |
message.publish | MQTT 消息发布 |
message.delivered | MQTT 消息送达 |
message.acked | MQTT 消息回执 |
client.disconnected | 客户端连接断开 |
注册扩展命令行
扩展命令行演示模块 - emq_cli_demo.erl
-module(emq_cli_demo).
-include_lib("emqttd/include/emqttd_cli.hrl").
-export([cmd/1]).
cmd(["arg1", "arg2"]) ->
?PRINT_MSG("ok");
cmd(_) ->
?USAGE([{"cmd arg1 arg2", "cmd demo"}]).
注册命令行模块 - emq_plugin_template_app.erl
emqttd_ctl:register_cmd(cmd, {emq_cli_demo, cmd}, []).
插件加载后,’./bin/emqttd_ctl’新增命令行:
./bin/emqttd_ctl cmd arg1 arg2
插件配置文件
插件自带配置文件放置在 etc/${plugin_name}.conf|config,EMQ 支持两种插件配置格式:
- ${plugin_name}.config,Erlang 原生配置文件格式:
[
{plugin_name, [
{key, value}
]}
].
- ${plugin_name}.conf, sysctl 的 k = v 通用格式:
plugin_name.key = value
注解
k = v 格式配置需要插件开发者创建 priv/plugin_name.schema 映射文件。
编译发布插件
- clone emq-relx 项目:
git clone https://github.com/emqtt/emq-relx.git
- Makefile 增加 DEPS:
DEPS += plugin_name
dep_plugin_name = git url_of_plugin
- relx.config 中 release 段落添加:
{plugin_name, load},