钩子(Hook)设计
钩子(Hook)定义
EMQ X 消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):
钩子 | 说明 |
---|---|
client.authenticate | 客户端认证 |
client.check_acl | 客户端 ACL 检查 |
client.connected | 客户端上线 |
client.subscribe | 客户端订阅主题前 |
client.unsubscribe | 客户端取消订阅主题 |
session.subscribed | 客户端订阅主题后 |
session.unsubscribed | 客户端取消订阅主题后 |
message.publish | MQTT 消息发布 |
message.deliver | MQTT 消息投递前 |
message.acked | MQTT 消息回执 |
client.disconnected | 客户端连接断开 |
钩子(Hook) 采用职责链设计模式( Chain-of-responsibility_pattern (opens new window) ),扩展模块或插件向钩子注册回调函数,系统在客户端上下线、主题订阅或消息发布确认时,触发钩子顺序执行回调函数:
不同钩子的回调函数输入参数不同,用户可参考插件模版的 emqx_plugin_template (opens new window) 模块,每个回调函数应该返回:
返回 | 说明 |
---|---|
ok | 继续执行 |
{ok, NewAcc} | 返回累积参数继续执行 |
stop | 停止执行 |
{stop, NewAcc} | 返回累积参数停止执行 |
钩子(Hook)实现
emqx 模块封装了 Hook 接口:
-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}).
hook(HookPoint, Action) ->
emqx_hooks:add(HookPoint, Action).
-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer())
-> ok | {error, already_exists}).
hook(HookPoint, Action, Priority) when is_integer(Priority) ->
emqx_hooks:add(HookPoint, Action, Priority);
hook(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
emqx_hooks:add(HookPoint, Action, Filter);
hook(HookPoint, Action, InitArgs) when is_list(InitArgs) ->
emqx_hooks:add(HookPoint, Action, InitArgs).
-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer())
-> ok | {error, already_exists}).
hook(HookPoint, Action, Filter, Priority) ->
emqx_hooks:add(HookPoint, Action, Filter, Priority).
-spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok).
unhook(HookPoint, Action) ->
emqx_hooks:del(HookPoint, Action).
-spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
run_hook(HookPoint, Args) ->
emqx_hooks:run(HookPoint, Args).
-spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
run_fold_hook(HookPoint, Args, Acc) ->
emqx_hooks:run_fold(HookPoint, Args, Acc).
钩子(Hook)使用
emqx_plugin_template (opens new window) 提供了全部钩子的使用示例,例如端到端的消息处理回调:
-module(emqx_plugin_template).
-export([load/1, unload/0]).
-export([on_message_publish/2, on_message_deliver/3, on_message_acked/3]).
load(Env) ->
emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqx:hook('message.deliver', fun ?MODULE:on_message_deliver/3, [Env]),
emqx:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).
on_message_publish(Message, _Env) ->
io:format("publish ~s~n", [emqx_message:format(Message)]),
{ok, Message}.
on_message_deliver(Credentials, Message, _Env) ->
io:format("deliver to client ~s: ~s~n", [Credentials, emqx_message:format(Message)]),
{ok, Message}.
on_message_acked(Credentials, Message, _Env) ->
io:format("client ~s acked: ~s~n", [Credentials, emqx_message:format(Message)]),
{ok, Message}.
unload() ->
emqx:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqx:unhook('message.acked', fun ?MODULE:on_message_acked/3),
emqx:unhook('message.deliver', fun ?MODULE:on_message_deliver/3).