4-GenEvent

事件管理器
注册表进程的事件
事件流

注:Elixir v1.1 发布后本章内容被从官方入门手册中拿掉了。
这里留存,如果仍需使用GenEvent,可以查阅。大家可以暂时跳过这一章。

本章探索GenEvent,Elixir和OTP提供的又一个行为抽象。它允许我们派生一个事件管理器,用来向多个处理者发布事件消息。

我们会激发两种事件:一个是每次bucket被加到注册表,另一个是从注册表中移除。

4.1-事件管理器

打开一个新iex -S mix对话,玩弄一下GenEvent的API:

  1. iex> {:ok, manager} = GenEvent.start_link
  2. {:ok, #PID<0.83.0>}
  3. iex> GenEvent.sync_notify(manager, :hello)
  4. :ok
  5. iex> GenEvent.notify(manager, :world)
  6. :ok

函数GenEvent.start_link/0启动了一个新的事件管理器。不需额外的参数。
管理器创建好后,我们就可以调用GenEvent.notify/2函数和GenEvent.sync_notify/2函数来发送通知。

但是,当前还没有任何消息处理者绑定到该管理器,因此不管它发啥通知,叫破喉咙都不会有事儿发生。

现在就在iex对话里创建第一个事件处理器:

  1. iex> defmodule Forwarder do
  2. ...> use GenEvent
  3. ...> def handle_event(event, parent) do
  4. ...> send parent, event
  5. ...> {:ok, parent}
  6. ...> end
  7. ...> end
  8. iex> GenEvent.add_handler(manager, Forwarder, self())
  9. :ok
  10. iex> GenEvent.sync_notify(manager, {:hello, :world})
  11. :ok
  12. iex> flush
  13. {:hello, :world}
  14. :ok

我们创建了一个处理器(handler),并通过函数GenEvent.add_handler/3把它“绑定”到事件管理器上,传递的三个参数是:

  1. 刚启动的那个时间管理器
  2. 定义事件处理者的模块(如这里的Forwarder
  3. 事件处理者的状态:在这里,使用当前进程的id

加上这个处理器之后,可以看到,调用了sync_notify/2之后,Forwarder处理器成功地把事件转给了它的父进程(IEx),因此那个消息进入了我们的收件箱。

这里有几点需要注意:

  1. 事件处理器运行在事件管理器的同一个进程里
  2. sync_notify/2同步地运行事件处理器处理请求
  3. notify/2使事件处理器异步处理请求

这里sync_notify/2notify/2类似于GenServer里面的call/2cast/2。推荐使用sync_notify/2
它以反向压力的机制工作,减少了“发消息速度快过消息被成功分发的速度”的可能性。

记得去GenEvent的模块文档阅读其它函数。
目前我们的程序就用提到的这些知识就可以了。

4.2-注册表进程的事件

为了能发出事件消息,我们要稍微修改一下我们的注册表进程,使之与一个事件管理器进行协作。
我们需要在注册表进程启动的时候,事件管理器也能自动启动。
比如在init/1回调里面,最好能传递事件处理器的pid或名字什么的作为参数来start_link,以此将启动事件管理器与注册表进程分解开。

但是,首先让我们修改测试中注册表进程的行为。打开test/kv/registry_text.exs,修改目前的setup回调,然后再加上新的测试:

  1. defmodule Forwarder do
  2. use GenEvent
  3. def handle_event(event, parent) do
  4. send parent, event
  5. {:ok, parent}
  6. end
  7. end
  8. setup do
  9. {:ok, manager} = GenEvent.start_link
  10. {:ok, registry} = KV.Registry.start_link(manager)
  11. GenEvent.add_mon_handler(manager, Forwarder, self())
  12. {:ok, registry: registry}
  13. end
  14. test "sends events on create and crash", %{registry: registry} do
  15. KV.Registry.create(registry, "shopping")
  16. {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  17. assert_receive {:create, "shopping", ^bucket}
  18. Agent.stop(bucket)
  19. assert_receive {:exit, "shopping", ^bucket}
  20. end

为了测试我们即将添加的功能,我们首先定义了一个Forwarder事件处理器,类似刚才在IEx中创建的那样。
Setup中,我们启动了事件管理器,把它作为参数传递给了注册表进程,并且向该管理器添加了我们定义的Forwarder处理器。
至此,事件可以发向待测进程了。

在测试中,我们创建、停止了一个bucket进程,并且使用assert_receive断言来检查是否收到了:create:exit事件消息。
断言assert_receive默认是500毫秒超时时间,这对于测试足够了。
同样要指出的是,assert_receive期待接收一个模式,而不是一个值。
这就是为啥我们用^bucket来匹配bucket的pid(参考《入门》关于变量的匹配内容)。

最终,注意我们调用了GenEvent.add_mon_handler/3来代替GenEvent.add_handler/3。该函数不但可以添加一个处理器,它还告诉事件管理器来监视当前进程。如果当前进程挂了,事件处理器也一并抹去。
这个很有道理,因为对于这里的Forwarder,如果消息的接收方(self()/测试进程)终止,我们理所应当停止转发消息。

好了,现在来修改注册表进程代码来让测试pass。打开lib/kv/registry.ex,输入以下新的内容(一些关键语句的解释写在注释里):

  1. defmodule KV.Registry do
  2. use GenServer
  3. ## Client API
  4. @doc """
  5. Starts the registry.
  6. """
  7. def start_link(event_manager, opts \\ []) do
  8. # 1. start_link now expects the event manager as argument
  9. GenServer.start_link(__MODULE__, event_manager, opts)
  10. end
  11. @doc """
  12. Looks up the bucket pid for `name` stored in `server`.
  13. Returns `{:ok, pid}` in case a bucket exists, `:error` otherwise.
  14. """
  15. def lookup(server, name) do
  16. GenServer.call(server, {:lookup, name})
  17. end
  18. @doc """
  19. Ensures there is a bucket associated with the given `name` in `server`.
  20. """
  21. def create(server, name) do
  22. GenServer.cast(server, {:create, name})
  23. end
  24. ## Server callbacks
  25. def init(events) do
  26. # 2. The init callback now receives the event manager.
  27. # We have also changed the manager state from a tuple
  28. # to a map, allowing us to add new fields in the future
  29. # without needing to rewrite all callbacks.
  30. names = HashDict.new
  31. refs = HashDict.new
  32. {:ok, %{names: names, refs: refs, events: events}}
  33. end
  34. def handle_call({:lookup, name}, _from, state) do
  35. {:reply, HashDict.fetch(state.names, name), state}
  36. end
  37. def handle_cast({:create, name}, state) do
  38. if HashDict.get(state.names, name) do
  39. {:noreply, state}
  40. else
  41. {:ok, pid} = KV.Bucket.start_link()
  42. ref = Process.monitor(pid)
  43. refs = HashDict.put(state.refs, ref, name)
  44. names = HashDict.put(state.names, name, pid)
  45. # 3. Push a notification to the event manager on create
  46. GenEvent.sync_notify(state.events, {:create, name, pid})
  47. {:noreply, %{state | names: names, refs: refs}}
  48. end
  49. end
  50. def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
  51. {name, refs} = HashDict.pop(state.refs, ref)
  52. names = HashDict.delete(state.names, name)
  53. # 4. Push a notification to the event manager on exit
  54. GenEvent.sync_notify(state.events, {:exit, name, pid})
  55. {:noreply, %{state | names: names, refs: refs}}
  56. end
  57. def handle_info(_msg, state) do
  58. {:noreply, state}
  59. end
  60. end

这些改变很直观。我们给GenServer初始化过程传递一个事件管理器,该管理器是我们用start_link启动进程时作为参数收到的。
我们还改了cast和info两个回调,在里面调用了GenEvent.sync_notify/2
最后,我们借这个机会还把服务器的状态改成了一个图,方便我们以后改进注册表进程。

执行测试,都是绿的。

4.3-事件流

最后一个值得探索的GenEvent的功能点是像处理流一样处理事件:

  1. iex> {:ok, manager} = GenEvent.start_link
  2. {:ok, #PID<0.83.0>}
  3. iex> spawn_link fn ->
  4. ...> for x <- GenEvent.stream(manager), do: IO.inspect(x)
  5. ...> end
  6. :ok
  7. iex> GenEvent.notify(manager, {:hello, :world})
  8. {:hello, :world}
  9. :ok

上面的例子中,我们创建了一个GenEvent.stream(manager),返回一个事件的流(即一个enumerable),并随即处理了它。
处理事件是一个阻塞的行为,我们派生新进程来处理事件消息,把消息打印在终端上。这一系列的操作,就像看到的那样,如实地执行了。
每次调用sync_notify/2或者notify/2,事件都被打印在终端上,后面跟着一个:ok(IEx输出语句的执行结果)。

通常事件流提供了足够多的内置功能来处理事件,使我们不必实现我们自己的处理器。
但是,若是需要某些自定义的功能,或是在测试时,定义自己的事件处理器回调才是正道。

至此,我们有了一个事件处理器,一个注册表进程以及可能会同时执行的许多bucket进程,是时候开始担心这些进程会不会挂掉了。