进程间通信

在Erlang中进行进程间通信的唯一方法就是消息传递。一个消息通过原语!send)发送给另一个进程:

  1. Pid ! Message

Pid是要向其发送消息的进程的标识符。任何合法的Erlang表达式都可以作为一个消息发送。send是一个会对其参数进行求值的原语。它的返回值是发送的消息。因此:

  1. foo(12) ! bar(baz)

会分别对foo(12)bar(baz)进行求值得到进程标识符和要发送的消息。如同其他的Erlang函数一样,send对其参数的求值顺序是不确定的。它会将消息参数求值的结果作为返回值返回。发送消息是一个异步操作,因此send既不会等待消息送达目的地也不会等待对方收到消息。就算发送消息的目标进程已经退出了,系统也不会通知发送者。这是为了保持消息传递的异步性──应用程序必须自己来实现各种形式的检查(见下文)。消息一定会被传递到接受者那里,并且保证是按照其发送的顺序进行传递的。

原语receive被用于接收消息。它的语法如下:

  1. receive
  2. Message1 [when Guard1] ->
  3. Actions1 ;
  4. Message2 [when Guard2] ->
  5. Actions2 ;
  6. ...
  7. end

每个进程都有一个邮箱,所有发送到该进程的消息都被按照它们到达的顺序依次存储在邮箱里。在上面的例子中,Message1Message2是用于匹配进程邮箱中的消息的模式。当找到一个匹配的消息并且对应的保护式(Guard)满足的时候,这个消息就被选中,并从邮箱中删除,同时对应的ActionsN会被执行。receive会返回ActionosN中最后一个表达式求值的结果。就如同Erlang里其他形式的模式匹配一样,消息模式中未绑定(unbound)量会被绑定(bound)。未被receive选中的消息会按照原来的顺序继续留在邮箱中,用于下一次recieve的匹配。调用receive的进程会一直阻塞,直到有匹配的消息为止。

Erlang有一种选择性接收消息的机制,因此意外发送到一个进程的消息不会阻塞其它正常的消息。不过,由于所有未匹配的消息会被留在邮箱中,保证系统不要完全被这样的无关消息填满就变成了程序员的责任。

消息接收的顺序

receive尝试寻找一个匹配的消息的时候,它会依次对邮箱中的每一个消息尝试用给定的每个模式去进行匹配。我们用下面的例子来解释其工作原理。

图5.2(a)给出了一个进程的邮箱,邮箱里面有四个消息,依次是msg_1msg_2msg_3msg_4。运行

  1. receive
  2. msg_3 ->
  3. ...
  4. end

_images/5.2.png图5.2

会匹配到邮箱中的msg_3并导致它被从邮箱中删除。然后邮箱的状态会变成如图5.2(b)所示。当我们再运行

  1. receive
  2. msg_4 ->
  3. ...
  4. msg_2 ->
  5. ...
  6. end

的时候,receive会依次对邮箱中的每一个消息,首先尝试与msg_4匹配,然后尝试与msg_2匹配。结果是msg_2匹配成功并被从邮箱中删除,邮箱的状态变成图5.2(c)那样。最后,运行

  1. receive
  2. AnyMessage ->
  3. ...
  4. end

其中AnyMessage是一个未绑定(unbound)的变量,receive会匹配到邮箱里的msg_1并将其删除,邮箱中最终只剩下msg_4,如图5.2(d)所示。

这说明receive里的模式的顺序并不能直接用来实现消息的优先级,不过这可以通过超时的机制来实现,详见第??小节。

只接收来自某个特定进程的消息

有时候我们会只希望接收来自某一个特定进程的消息。要实现这个机制,消息发送者必须显式地在消息中包含自己的进程标识符:

  1. Pid | {self(),abc}

BIF self()返回当前进程的标识符。这样的消息可以通过如下方式来接收:

  1. receive
  2. {Pid,Msg} ->
  3. ...
  4. end

如果Pid已经预先绑定(bound)到发送者的进程标识符上了,那么如上所示的receive就能实现只接收来自该进程[2]的消息了。

一些例子

程序5.1中的模块实现了一个简单的计数器,可以用来创建一个包含计数器的进程并对计数器进行递增操作。

程序 5.1

  1. -module(counter).
  2. -export([start/0,loop/1]).
  3.  
  4. start() ->
  5. spawn(counter, loop, [0]).
  6.  
  7. loop(Val) ->
  8. receive
  9. increment ->
  10. loop(Val + 1)
  11. end.

这个例子展示了一些基本概念:

  • 每个新的计数器进程都通过调用counter:start/0来创建。每个进程都会以调用counter:loop(0)启动。
  • 用于实现一个永久的进程的递归函数调用在等待输入的时候会被挂起。loop是一个尾递归函数,这让计数器进程所占用的空间保持为一个常数。
  • 选择性的消息接收,在这个例子中,仅接收increment消息。不过,在这过例子中也有不少缺陷,比如:

  • 由于计数器的值是一个进程的局部变量,只能被自己访问到,却其他进程没法获取这个值。

  • 消息协议是显式的,其他进程需要显式地发送increment消息给计数器进程。

程序5.2

  1. -module(counter).
  2. -export([start/0,loop/1,increment/1,value/1,stop/1]).
  3.  
  4. %% First the interface functions.
  5. start() ->
  6. spawn(counter, loop, [0]).
  7.  
  8. increment(Counter) ->
  9. Counter ! increment.
  10.  
  11. value(Counter) ->
  12. Counter ! {self(),value}
  13. receive
  14. {Counter,Value} ->
  15. Value
  16. end.
  17.  
  18. stop(Counter) ->
  19. Counter ! stop.
  20.  
  21. %% The counter loop.
  22. loop(Val) ->
  23. receive
  24. increment ->
  25. loop(Val + 1);
  26. {From,value} ->
  27. From ! {self(),Val},
  28. loop(Val);
  29. stop -> % No recursive call here
  30. true;
  31. Other -> % All other messages
  32. loop(Val)
  33. end.

下一个例子展示了如何修正这些缺陷。程序5.2是counter模块的改进版,允许对计数器进行递增、访问计数器的值以及停止计数器。

同前一个例子中一样,在这里一个新的计数器进程通过调用counter::start()启动起来,返回值是这个计数器的进程标识符。为了隐藏消息传递的协议,我们提供了接口函数incrementvaluestop来操纵计数器。

计数器进程使用选择性接收的机制来处理发送过来的请求。它同时展示了一种处理未知消息的方法。通过在receive的最后一个子句中使用未绑定(unbound)的变量Other作为模式,任何未被之前的模式匹配到的消息都会被匹配到,此时我们直接忽略这样的未知消息并继续等待下一条消息。这是处理未知消息的标准方法:通过receive把它们从邮箱中删除掉。

为了访问计数器的值,我们必须将自己的Pid作为消息的一部分发送给计数器进程,这样它才能将回复发送回来。回复的消息中也包含了发送方的进程标识符(在这里也就是计数器进程的Pid),这使得接收进程可以只接收包含回复的这个消息。简单地等待一个包含未知值(在这个例子中是一个数字)的消息是不安全的做法,任何不相关的碰巧发送到该进程的消息都会被匹配到。因此,在进程之间发送的消息通常都会包含某种标识自己的机制,一种方法是通过内容进行标识,就像发送给计数器进程的请求消息一样,另一种方法是通过在消息中包含某种“唯一”并且可以很容易识别的标识符,就如同计数器进程发回的包含计数器值的回复消息一样。

_images/5.3.png图5.3

现在我们再来考虑对一个有穷自动机(FSM)进行建模。图5.3展示了一个4状态的简单FSM以及可能的状态转移和相应的触发事件。一种编写这样的“状态-事件”机器的方法如程序5.3所示。在这段代码中,我们只专注于如何表示状态以及管理状态之间的转移。每个状态由一个单独的函数表示,而事件则表示为消息。

程序 5.2

  1. s1() ->
  2. receive
  3. msg_a ->
  4. s2();
  5. msg_c ->
  6. s3()
  7. end.
  8.  
  9. s2() ->
  10. receive
  11. msg_x ->
  12. s3();
  13. msg_h ->
  14. s4()
  15. end.
  16.  
  17. s3() ->
  18. receive
  19. msg_b ->
  20. s1();
  21. msg_y ->
  22. s2()
  23. end.
  24.  
  25. s4() ->
  26. receive
  27. msg_i ->
  28. s3()
  29. end.

转台函数通过receive来等待事件所对应的消息。当收到消息时,FSM通过调用相应的状态函数转移到指定的状态。通过保证每次对于新状态的函数的调用都是最后一个语句(参见第??小节),FSM进程可以在一个常数大小的空间中进行求值。

状态数据可以通过为状态函数添加参数的方式来处理。需要在进入状态的时候执行的动作在调用receive之前完成,而需要在离开状态时执行的动作可以放在对应的receive子句中调用新的状态函数之前。