有类型Actor

有类型Actor是Active Objects 模式的一种实现。Smalltalk诞生之时,就已经缺省地将方法调用从同步操作换为异步派发。

有类型Actor由两 “部分” 组成, 一个公开的接口和一个实现, 如果你有“企业级”Java的开发经验, 则应该非常熟悉。 对普通actor来说,你拥有一个外部API(公开接口的实例)来将方法调用异步地委托给其实现的私有实例。

有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约,你不需要定义你自己的消息;它的劣势在于对你能做什么和不能做什么进行了一些限制,即你不能使用become/unbecome

有类型Actor是使用JDK Proxies实现的,JDK Proxies提供了非常简单的api来拦截方法调用。

注意

和普通Akka actor一样,有类型actor一次也只处理一个消息。

何时使用有类型actor

有类型actor是桥接actor系统(”内部”)和非actor代码 (”外部”)的良好方式,因为它们允许你在外部编写普通OO式代码。把它们看做大门:其实用性在于私有领域和公共接口之间,而你不想你的房子内部有太多的门,不是吗?更长的讨论请参见这篇博客

更多的背景:TypedActors可以很容易被滥用作RPC,它们都是一个抽象概念,众所周知是有缺陷的。因此当我们容易和正确的编写高度可扩展的并行软件时,TypedActors并非首选。他们有自己的定位,必要时才使用它们。

工具箱

在创建第一个有类型Actor之前,我们先了解一下我们手上可供使用的工具,它位于akka.actor.TypedActor中。

  1. import akka.actor.TypedActor
  2. //返回有类型actor扩展
  3. val extension = TypedActor(system) //system是一个Actor系统实例
  4. //判断一个引用是否是有类型actor代理
  5. TypedActor(system).isTypedActor(someReference)
  6. //返回一个外部有类型actor代理所代表的Akka actor
  7. TypedActor(system).getActorRefFor(someReference)
  8. //返回当前的ActorContext,
  9. // 此方法仅在一个TypedActor 实现的方法中有效
  10. val c: ActorContext = TypedActor.context
  11. //返回当前有类型actor的外部代理,
  12. // 此方法仅在一个TypedActor 实现的方法中有效
  13. val s: Squarer = TypedActor.self[Squarer]
  14. //返回一个有类型Actor扩展的上下文实例
  15. //这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor
  16. TypedActor(TypedActor.context)

警告

就象不应该暴露Akka actor的this一样,不要暴露有类型Actor的this,你应该传递其外部代理引用,它可以在你的有类型Actor中用TypedActor.self获得, 这是你的外部标识, 就象ActorRef是Akka actor的外部标识一样。

创建有类型Actor

要创建有类型Actor,需要一个或多个接口,和一个实现。

我们的示例接口:

  1. trait Squarer {
  2. def squareDontCare(i: Int): Unit //fire-forget
  3. def square(i: Int): Future[Int] //non-blocking send-request-reply
  4. def squareNowPlease(i: Int): Option[Int] //blocking send-request-reply
  5. def squareNow(i: Int): Int //blocking send-request-reply
  6. @throws(classOf[Exception]) //declare it or you will get an UndeclaredThrowableException
  7. def squareTry(i: Int): Int //blocking send-request-reply with possible exception
  8. }

好,现在我们有了一些可以调用的方法,但我们需要在SquarerImpl中实现。

  1. class SquarerImpl(val name: String) extends Squarer {
  2. def this() = this("default")
  3. def squareDontCare(i: Int): Unit = i * i //Nobody cares :(
  4. def square(i: Int): Future[Int] = Future.successful(i * i)
  5. def squareNowPlease(i: Int): Option[Int] = Some(i * i)
  6. def squareNow(i: Int): Int = i * i
  7. def squareTry(i: Int): Int = throw new Exception("Catch me!")
  8. }

太好了,我们现在有了接口,也有了对这个接口的实现,我们还知道如何从他们来创建一个有类型actor,现在我们来看看如何调用这些方法。

创建我们的Squarer的有类型actor实例的最简单方法是:

  1. val mySquarer: Squarer =
  2. TypedActor(system).typedActorOf(TypedProps[SquarerImpl]())

第一个类型是代理的类型,第二个类型是实现的类型。如果要调用某特定的构造方法要这样做:

  1. val otherSquarer: Squarer =
  2. TypedActor(system).typedActorOf(TypedProps(classOf[Squarer],
  3. new SquarerImpl("foo")), "name")

由于你提供了一个 Props, 你可以指定使用哪个派发器, 缺省的超时时间等。

方法派发语义

方法返回:

  • Unit 会以 fire-and-forget语义进行派发,与ActorRef.tell完全一致。
  • akka.dispatch.Future[_] 会以 send-request-reply语义进行派发,与 ActorRef.ask完全一致。
  • scala.Option[_]会以send-request-reply语义派发,但是阻塞等待应答, 如果在超时时限内没有应答则返回scala.None,否则返回包含结果的scala.Some[_]。在这个调用中发生的异常将被重新抛出。
  • 任何其它类型的值将以send-request-reply语义进行派发,但阻塞地等待应答, 如果超时会抛出java.util.concurrent.TimeoutException,如果发生异常则将异常重新抛出。

消息与不可变性

虽然Akka不能强制要求你传给有类型Actor方法的参数类型是不可变的, 我们强烈建议只传递不可变参数。

单向消息发送
  1. mySquarer.squareDontCare(10)

就是这么简单!方法会在另一个线程中异步地调用。

请求-响应消息发送
  1. val oSquare = mySquarer.squareNowPlease(10) //Option[Int]

如果需要,这会阻塞到有类型actor的Props中设置的超时时限。如果超时,会返回None

  1. val iSquare = mySquarer.squareNow(10) //Int

如果需要,这会阻塞到有类型actor的Props中设置的超时时限。如果超时,会抛出java.util.concurrent.TimeoutException

请求-以future作为响应的消息发送
  1. val fSquare = mySquarer.square(10) //A Future[Int]

这个调用是异步的,返回的Future可以用作异步组合。

终止有类型Actor

由于有类型actor底层还是Akka actor,所以在不需要的时候要终止它。

  1. TypedActor(system).stop(mySquarer)

这将会尽快地异步终止与指定的代理关联的有类型Actor。

  1. TypedActor(system).poisonPill(otherSquarer)

这将会在有类型actor完成所有入队的调用后异步地终止它。

有类型Actor监管树

你可以通过传入一个ActorContext来获得有类型Actor上下文,所以你可以对它调用typedActorOf(..)来创建有类型子actor。

  1. //Inside your Typed Actor
  2. val childSquarer: Squarer =
  3. TypedActor(TypedActor.context).typedActorOf(TypedProps[SquarerImpl]())
  4. //Use "childSquarer" as a Squarer

通过将ActorContext作为参数传给TypedActor.get(…),也可以为普通的Akka actor创建有类型子actor。

监管策略

通过让你的有类型Actor的具体实现类实现TypedActor.Supervisor方法,你可以定义用来监管子actor的策略,就像监管与监控容错(Scala)所描述的。

生命周期回调

通过使你的有类型actor实现类实现以下方法:

  • TypedActor.PreStart
  • TypedActor.PostStop
  • TypedActor.PreRestart
  • TypedActor.PostRestart

你可以hook进有类型actor的整个生命周期。

接收任意消息

如果你的有类型actor的实现类扩展了akka.actor.TypedActor.Receiver,所有非方法调用MethodCall的消息会被传给onReceive方法.

这使你能够对DeathWatch的Terminated消息及其它类型的消息进行处理,例如,与无类型actor进行交互的场合。

代理

你可以使用带TypedProps和ActorRef参数的typedActorOf来将指定的Actor引用代理成一个有类型Actor。这在你需要与远程主机上的有类型Actor通信时会有用, 只要将ActorRef传递给 typedActorOf即可。

注意

目标Actor引用需要能处理MethodCall消息.

查找与远程处理

因为TypedActor底层还是Akka Actors,你可以使用typedActorOf来代理可能在远程节点上的ActorRefs

  1. val typedActor: Foo with Bar =
  2. TypedActor(system).
  3. typedActorOf(
  4. TypedProps[FooBar],
  5. actorRefToRemoteActor)
  6. //Use "typedActor" as a FooBar

功能扩充

以下是使用traits来为你的有类型actor混入行为的示例:

  1. trait Foo {
  2. def doFoo(times: Int): Unit = println("doFoo(" + times + ")")
  3. }
  4. trait Bar {
  5. def doBar(str: String): Future[String] =
  6. Future.successful(str.toUpperCase)
  7. }
  8. class FooBar extends Foo with Bar
  1. val awesomeFooBar: Foo with Bar =
  2. TypedActor(system).typedActorOf(TypedProps[FooBar]())
  3. awesomeFooBar.doFoo(10)
  4. val f = awesomeFooBar.doBar("yes")
  5. TypedActor(system).poisonPill(awesomeFooBar)

有类型路由器模式

有时你想要传播多个actor之间的消息。在Akka中实现这一目标的最简单方法是使用一个路由器,可以实现特定的路由逻辑,例如最小邮箱smallest-mailbox或一致性哈希consistent-hashing等。

路由器不能直接提供给有类型actor,但可以很容易的利用非类型化的路由器,并在其使用一个有类型代理即可。为了展示,让我们创建有类型actor并分配它们一些随机id,所以我们知道事实上,路由器已向消息发送给不同的actor:

  1. trait HasName {
  2. def name(): String
  3. }
  4. class Named extends HasName {
  5. import scala.util.Random
  6. private val id = Random.nextInt(1024)
  7. def name(): String = "name-" + id
  8. }

为了在此类actor的几个实例中轮询访问(round robin),你可以简单地创建一个普通的非类型化路由器,然后像下面的示例所示把它包装为一个TypedActor。这之所以能够正确工作,是因为有类型actor与普通actor使用相同的机制通讯,其方法调用最终都被转换为MethodCall消息的发送。

  1. def namedActor(): HasName = TypedActor(system).typedActorOf(TypedProps[Named]())
  2. // prepare routees
  3. val routees: List[HasName] = List.fill(5) { namedActor() }
  4. val routeePaths = routees map { r =>
  5. TypedActor(system).getActorRefFor(r).path.toStringWithoutAddress
  6. }
  7. // prepare untyped router
  8. val router: ActorRef = system.actorOf(RoundRobinGroup(routeePaths).props())
  9. // prepare typed proxy, forwarding MethodCall messages to `router`
  10. val typedRouter: HasName =
  11. TypedActor(system).typedActorOf(TypedProps[Named](), actorRef = router)
  12. println("actor was: " + typedRouter.name()) // name-184
  13. println("actor was: " + typedRouter.name()) // name-753
  14. println("actor was: " + typedRouter.name()) // name-320
  15. println("actor was: " + typedRouter.name()) // name-164