路由

消息可以通过路由器发送,以便有效地将它们路由到目的actor,称为其routee。一个Router可以在actor内部或外部使用,并且你可以自己管理routee或使用有配置功能的自我包含的路由actor。

根据你应用程序的需求,可以使用不同的路由策略。Akka附带了几个有用的路由策略,开箱即用。但是正如将在这一章中看到地,你也可以创建自己的路由

一个简单的路由器

下面的示例阐释如何使用Router和在actor内管理routee。

  1. import akka.routing.ActorRefRoutee
  2. import akka.routing.Router
  3. import akka.routing.RoundRobinRoutingLogic
  4. class Master extends Actor {
  5. var router = {
  6. val routees = Vector.fill(5) {
  7. val r = context.actorOf(Props[Worker])
  8. context watch r
  9. ActorRefRoutee(r)
  10. }
  11. Router(RoundRobinRoutingLogic(), routees)
  12. }
  13. def receive = {
  14. case w: Work =>
  15. router.route(w, sender())
  16. case Terminated(a) =>
  17. router = router.removeRoutee(a)
  18. val r = context.actorOf(Props[Worker])
  19. context watch r
  20. router = router.addRoutee(r)
  21. }
  22. }

我们创建一个Router,并指定当路由消息到routee时,它应该使用RoundRobinRoutingLogic

Akka自带的路由逻辑如下:

  • akka.routing.RoundRobinRoutingLogic
  • akka.routing.RandomRoutingLogic
  • akka.routing.SmallestMailboxRoutingLogic
  • akka.routing.BroadcastRoutingLogic
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic
  • akka.routing.TailChoppingRoutingLogic
  • akka.routing.ConsistentHashingRoutingLogic

我们像在ActorRefRoutee包装下创建普通子actor一样创建routee。我们监控routee从而能够在它们被终止的情况下取代他们。

通过路由器发送消息是用route方法完成的,像上面例子中的Work消息一样。

Router是不可变的,而RoutingLogic是线程安全的;意味着他们也可以在actor外部使用。

注意

一般情况下,任何发送到路由器的消息将被向前发送到它的routee,但有一个例外。特别地广播消息将发送到路由器下所有的routee

一个路由actor

一个路由器也可以被创建为一个自包含的actor,来管理routee,载入路由逻辑和其他配置设置。

这种类型的路由actor有两种不同的模式:

  • 池——路由器创建routee作为子actor,并在该子actor终止时将它从路由器中移除。
  • 群组——routee actor在路由器外部创建,而路由器将通过使用actor选择将消息发送到指定路径,而不监控其终止。

路由actor可以通过在配置中或以编程方式被定义。虽然路由actor可以在配置文件中定义,但仍然必须以编程方式创建,即你不能只通过外部配置创建路由器。如果你在配置文件中定义路由actor,则实际将使用这些设置,而不是以编程方式提供的参数。

你可以通过路由actor向routee 发送消息,就像向普通actor发消息一样,即通过其ActorRef。路由actor转发消息给routee时不会更改原始发件人。当routee答复路由消息时,回复将发送到原始发件人,而不是路由actor。

注意

一般地,任何发送到路由器的消息将被向前发送到它的routee,但也有几个例外。这些记录在下面特殊处理消息一节中。

下面的代码和配置片段展示了如何创建一个将消息转发给五个Worker routee的轮循(round-robin)路由器。Routees 将被创建为路由器的子actor。

  1. akka.actor.deployment {
  2. /parent/router1 {
  3. router = round-robin-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router1: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router1")

这里是相同的例子,但路由配置是以编程方式而不是从配置获取。

  1. val router2: ActorRef =
  2. context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")
远程部署的Routee

除了能够创建本地actor作为routee,你可以指示路由器来部署其子actor到一系列远程主机上。Routee将以轮循方式被部署。若要远程部署routee,将路由配置包在一个RemoteRouterConfig下,附加要部署到的节点的远程地址。远程部署要求akka-remote模块被包含在类路径中。

  1. import akka.actor.{ Address, AddressFromURIString }
  2. import akka.remote.routing.RemoteRouterConfig
  3. val addresses = Seq(
  4. Address("akka.tcp", "remotesys", "otherhost", 1234),
  5. AddressFromURIString("akka.tcp://othersys@anotherhost:1234"))
  6. val routerRemote = system.actorOf(
  7. RemoteRouterConfig(RoundRobinPool(5), addresses).props(Props[Echo]))
发送者

默认情况下,当一个routee发送一条消息,它将隐式地设置自身为发送者

  1. sender() ! x // replies will go to this actor

然而,通常为routee将路由器设置为发送者更有用。例如,你可能想要将路由器设置为发件人,如果你想要隐藏在路由器后面routee的细节。下面的代码片段演示如何设置父路由器作为发送者。

  1. sender().tell("reply", context.parent) // replies will go back to parent
  2. sender().!("reply")(context.parent) // alternative syntax (beware of the parens!)
监管

由池路由器创建的routee将成为路由器的孩子。因此,路由器也是子actor的监管者。

可以用该池的supervisorStrategy属性配置路由器actor的监管策略。如果没有提供配置,路由器的默认策略是”总是上溯”。这意味着错误都会向上传递给路由器的监管者进行处理。路由器的监管者将决定对任何错误该做什么。

请注意路由器的监管者将把错误视为路由器本身的错误。因此一个指令,用于停止或重新启动将导致路由器本身以停止或重新启动。此路由器,相应地,将导致它的孩子停止并重新启动。

应该提到的是路由器重新启动行为已被重写,以便重新启动时,仍重新创建这些孩子,并会在池中保留相同数量的actor。

这意味着如果你还没有指定路由器或其父节点中的supervisorStrategy,routee的失败会上升到路由器,并将在默认情况下重新启动路由器,从而将重新启动所有的routee(它使用上升,并在重新启动过程中不停止routee)。原因是为了使类似在子actor定义中添加.withRouter这样的默认行为,不会更改应用于子actor的监管策略。你可以在定义路由器时指定策略来避免低效。

设置策略是很容易完成的:

  1. val escalator = OneForOneStrategy() {
  2. case e testActor ! e; SupervisorStrategy.Escalate
  3. }
  4. val router = system.actorOf(RoundRobinPool(1, supervisorStrategy = escalator).props(
  5. routeeProps = Props[TestActor]))

注意

如果路由器池的子actor终止,池路由器不会自动产生一个新的actor。在池路由器所有子actor都终止的事件中,路由器将终止本身,除非它是一个动态的路由器,例如使用了大小调整。

群组

有时候,相比于由路由actor创建其routee,我们更希望单独创建 routee,并提供路由器供其使用。你可以通过将 routee路径传递给路由器的配置来实现。消息将通过ActorSelection发送到这些路径。

下面的示例演示如何通过提供三个routee actor的路径字符串来创建一个路由器。

  1. akka.actor.deployment {
  2. /parent/router3 {
  3. router = round-robin-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router3: ActorRef =
  2. context.actorOf(FromConfig.props(), "router3")

这里是相同的例子,但路由配置是以编程方式而不是从配置获取。

  1. val router4: ActorRef =
  2. context.actorOf(RoundRobinGroup(paths).props(), "router4")

Routee actor将在路由器外部被创建:

  1. system.actorOf(Props[Workers], "workers")
  1. class Workers extends Actor {
  2. context.actorOf(Props[Worker], name = "w1")
  3. context.actorOf(Props[Worker], name = "w2")
  4. context.actorOf(Props[Worker], name = "w3")
  5. // ...

在路径中可能包含为actor在远程主机上运行的协议和地址信息。远程部署要求akka-remote模块被包含在类路径中。

路由器使用

在本节中,我们将描述如何创建不同类型的路由actor。

本节中的路由actor将由名为parent的顶级actor创建。请注意在配置中的部署路径以/parent/开头,并紧接着路由actor的名字。

  1. system.actorOf(Props[Parent], "parent")

RoundRobinPool 和 RoundRobinGroup

对其routee使用轮循机制(round-robin)轮询。

在配置中定义的RoundRobinPool:

  1. akka.actor.deployment {
  2. /parent/router1 {
  3. router = round-robin-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router1: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router1")

在代码中定义的RoundRobinPool:

  1. val router2: ActorRef =
  2. context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")

在配置中定义的RoundRobinGroup:

  1. akka.actor.deployment {
  2. /parent/router3 {
  3. router = round-robin-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router3: ActorRef =
  2. context.actorOf(FromConfig.props(), "router3")

在代码中定义的RoundRobinGroup:

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router4: ActorRef =
  3. context.actorOf(RoundRobinGroup(paths).props(), "router4")
RandomPool 和 RandomGroup

该路由器类型会对每一条消息随机选择其routee。

在配置中定义的RandomPool:

  1. akka.actor.deployment {
  2. /parent/router5 {
  3. router = random-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router5: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router5")

在代码中定义的RandomPool:

  1. val router6: ActorRef =
  2. context.actorOf(RandomPool(5).props(Props[Worker]), "router6")

在配置中定义的RandomGroup:

  1. akka.actor.deployment {
  2. /parent/router7 {
  3. router = random-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router7: ActorRef =
  2. context.actorOf(FromConfig.props(), "router7")

在代码中定义的RandomGroup:

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router8: ActorRef =
  3. context.actorOf(RandomGroup(paths).props(), "router8")

BalancingPool

将尝试重新从繁忙routee分配任务到空闲routee的路由器。所有routee都共享同一个邮箱。

在配置中定义的BalancingPool:

  1. akka.actor.deployment {
  2. /parent/router9 {
  3. router = balancing-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router9: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router9")

在代码中定义的BalancingPool:

  1. val router10: ActorRef =
  2. context.actorOf(BalancingPool(5).props(Props[Worker]), "router10")

平衡调度器的额外配置,被池使用,可以通过路由部署配置的pool-dispatcher节进行设定。

  1. akka.actor.deployment {
  2. /parent/router9b {
  3. router = balancing-pool
  4. nr-of-instances = 5
  5. pool-dispatcher {
  6. attempt-teamwork = off
  7. }
  8. }
  9. }

对BalancingPool没有群组变体。

SmallestMailboxPool

试图向邮箱中有最少消息的非暂停子routee发送消息的路由器。按此顺序进行选择:

  • 挑选有空邮箱的空闲routee(即没有处理消息)
  • 选择任一空邮箱routee
  • 选择邮箱中有最少挂起消息的routee
  • 选择任一远程routee,远程actor考虑优先级最低,因为其邮箱大小未知

在配置中定义的SmallestMailboxPool:

  1. akka.actor.deployment {
  2. /parent/router11 {
  3. router = smallest-mailbox-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router11: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router11")

在代码中定义的SmallestMailboxPool:

  1. val router12: ActorRef =
  2. context.actorOf(SmallestMailboxPool(5).props(Props[Worker]), "router12")

SmallestMailboxPool没有群组变体,因为邮箱大小和actor的内部调度状态从routee的路径看实际上是不可用的。

BroadcastPool 和 BroadcastGroup

广播的路由器将接收到的消息转发到它所有的routee。

在配置中定义的BroadcastPool:

  1. akka.actor.deployment {
  2. /parent/router13 {
  3. router = broadcast-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router13: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router13")

在代码中定义的BroadcastPool:

  1. val router14: ActorRef =
  2. context.actorOf(BroadcastPool(5).props(Props[Worker]), "router14")

在配置中定义的BroadcastGroup:

  1. akka.actor.deployment {
  2. /parent/router15 {
  3. router = broadcast-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router15: ActorRef =
  2. context.actorOf(FromConfig.props(), "router15")

在代码中定义的BroadcastGroup:

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router16: ActorRef =
  3. context.actorOf(BroadcastGroup(paths).props(), "router16")

注意

Broadcast路由器总是向其routee广播每一条消息。如果你不想播出每条消息,则你可以使用非广播路由器并使用所需的广播消息

ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup

ScatterGatherFirstCompletedRouter将会把消息发送到它所有的routee。然后它等待直到收到第一个答复。该结果将发送回原始发送者。其他的答复将被丢弃。

在配置的时间内,它期待至少一个答复,否则它将回复一个包含akka.pattern.AskTimeoutExceptionakka.actor.Status.Failure

在配置中定义的ScatterGatherFirstCompletedPool:

  1. akka.actor.deployment {
  2. /parent/router17 {
  3. router = scatter-gather-pool
  4. nr-of-instances = 5
  5. within = 10 seconds
  6. }
  7. }
  1. val router17: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router17")

在代码中定义的ScatterGatherFirstCompletedPool:

  1. val router18: ActorRef =
  2. context.actorOf(ScatterGatherFirstCompletedPool(5, within = 10.seconds).
  3. props(Props[Worker]), "router18")

在配置中定义的ScatterGatherFirstCompletedGroup:

  1. akka.actor.deployment {
  2. /parent/router19 {
  3. router = scatter-gather-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. within = 10 seconds
  6. }
  7. }
  1. val router19: ActorRef =
  2. context.actorOf(FromConfig.props(), "router19")

在代码中定义的ScatterGatherFirstCompletedGroup:

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router20: ActorRef =
  3. context.actorOf(ScatterGatherFirstCompletedGroup(paths,
  4. within = 10.seconds).props(), "router20")
TailChoppingPool 和 TailChoppingGroup

TailChoppingRouter 将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。

此路由器的目标是通过查询到多个routee来减少延迟,假设其他的actor之一仍可能比第一个actor更快响应。

Peter Bailis很好地在一篇博客文章中描述了这个优化: 做冗余的工作,以加快分布式查询

在配置中定义的TailChoppingPool:

  1. akka.actor.deployment {
  2. /parent/router21 {
  3. router = tail-chopping-pool
  4. nr-of-instances = 5
  5. within = 10 seconds
  6. tail-chopping-router.interval = 20 milliseconds
  7. }
  8. }
  1. val router21: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router21")

在代码中定义的TailChoppingPool:

  1. val router22: ActorRef =
  2. context.actorOf(TailChoppingPool(5, within = 10.seconds, interval = 20.millis).
  3. props(Props[Worker]), "router22")

在配置中定义的TailChoppingGroup:

  1. akka.actor.deployment {
  2. /parent/router23 {
  3. router = tail-chopping-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. within = 10 seconds
  6. tail-chopping-router.interval = 20 milliseconds
  7. }
  8. }
  1. val router23: ActorRef =
  2. context.actorOf(FromConfig.props(), "router23")

在代码中定义的TailChoppingGroup:

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router24: ActorRef =
  3. context.actorOf(TailChoppingGroup(paths,
  4. within = 10.seconds, interval = 20.millis).props(), "router24")
ConsistentHashingPool 和 ConsistentHashingGroup

ConsistentHashingPool基于已发送的消息使用一致性哈希(consistent hashing)选择routee。这篇文章给出了如何实现一致性哈希非常好的见解。

有三种方式来定义使用哪些数据作为一致的散列键。

  • 你可以定义路由的hashMapping,将传入的消息映射到它们一致散列键。这使决策对发送者透明。
  • 这些消息可能会实现akka.routing.ConsistentHashingRouter.ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。
  • 消息可以被包装在一个akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。

这些定义一致性哈希键的方法,可以同时对一个路由器在一起使用。hashMapping被第一个尝试。

代码示例:

  1. import akka.actor.Actor
  2. import akka.routing.ConsistentHashingRouter.ConsistentHashable
  3. class Cache extends Actor {
  4. var cache = Map.empty[String, String]
  5. def receive = {
  6. case Entry(key, value) => cache += (key -> value)
  7. case Get(key) => sender() ! cache.get(key)
  8. case Evict(key) => cache -= key
  9. }
  10. }
  11. case class Evict(key: String)
  12. case class Get(key: String) extends ConsistentHashable {
  13. override def consistentHashKey: Any = key
  14. }
  15. case class Entry(key: String, value: String)
  1. import akka.actor.Props
  2. import akka.routing.ConsistentHashingPool
  3. import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
  4. import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
  5. def hashMapping: ConsistentHashMapping = {
  6. case Evict(key) => key
  7. }
  8. val cache: ActorRef =
  9. context.actorOf(ConsistentHashingPool(10, hashMapping = hashMapping).
  10. props(Props[Cache]), name = "cache")
  11. cache ! ConsistentHashableEnvelope(
  12. message = Entry("hello", "HELLO"), hashKey = "hello")
  13. cache ! ConsistentHashableEnvelope(
  14. message = Entry("hi", "HI"), hashKey = "hi")
  15. cache ! Get("hello")
  16. expectMsg(Some("HELLO"))
  17. cache ! Get("hi")
  18. expectMsg(Some("HI"))
  19. cache ! Evict("hi")
  20. cache ! Get("hi")
  21. expectMsg(None)

在上面的例子中可以看到Get消息自己实现了ConsistentHashable,而Entry消息包裹在一个ConsistentHashableEnvelope中。Evict消息由hashMapping偏函数处理。

在配置中定义的ConsistentHashingPool:

  1. akka.actor.deployment {
  2. /parent/router25 {
  3. router = consistent-hashing-pool
  4. nr-of-instances = 5
  5. virtual-nodes-factor = 10
  6. }
  7. }
  1. val router25: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router25")

在代码中定义的ConsistentHashingPool:

  1. val router26: ActorRef =
  2. context.actorOf(ConsistentHashingPool(5).props(Props[Worker]),
  3. "router26")

在配置中定义的ConsistentHashingGroup:

  1. akka.actor.deployment {
  2. /parent/router27 {
  3. router = consistent-hashing-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. virtual-nodes-factor = 10
  6. }
  7. }
  1. val router27: ActorRef =
  2. context.actorOf(FromConfig.props(), "router27")

在代码中定义的ConsistentHashingGroup:

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router28: ActorRef =
  3. context.actorOf(ConsistentHashingGroup(paths).props(), "router28")

virtual-nodes-factor(虚拟节点因子)是每个routee的虚拟节点数,用来在一致性哈希节点环中使用,使分布更加均匀。

特殊处理的消息

发送到路由actor的大多数消息将根据路由器的路由逻辑进行转发。然而,有几种具有特殊行为的消息类型。

请注意这些特别的消息,除了Broadcast消息之外,只被自我包含的路由actor处理,而不是简单的路由器中描述的akka.routing.Router组件。

广播消息

Broadcast消息可用于向路由器的所有routee发送一条消息。当路由器接收一个Broadcast消息时,它将把消息的有效载荷发给所有的routee,不管该路由器通常如何路由其消息。

下面的示例演示了如何使用Broadcast消息向路由器下的每个routee发送一个非常重要的信息。

  1. import akka.routing.Broadcast
  2. router ! Broadcast("Watch out for Davy Jones' locker")

在此示例中路由器接收的广播消息,提取其有效载荷("Watch out for Davy Jones' locker"),然后发送到有效载荷路由器的所有routee。它是由每个 routee actor来处理接收到的有效负载消息。

PoisonPill消息

PoisonPill消息对所有actor,包括路由actor,都有特殊的处理。当任何一个actor收到PoisonPill消息时,该actor将被停止。有关详细信息请参见PoisonPill

  1. import akka.actor.PoisonPill
  2. router ! PoisonPill

路由器通常将消息传递给routee,但对于PoisonPill消息,需要认识到的很重要一点是它只被路由器处理。发送到路由器的PoisonPill消息将不会发送到routee。

然而,发送到路由器的PoisonPill消息可能仍会影响其routee,因为路由器停止时它也会停止其子actor。停止子actor是普通的actor行为。路由器将会停止其作为子actor创建的各个routee。每个孩子将处理其当前的消息,然后停止。这可能会导致一些消息未被处理。停止actor的详细信息,请参阅文档

如果你希望停止路由器及其routee,但你希望routee在停止前先处理目前在其邮箱中的所有消息,则你不应该发送PoisonPill消息。相反,你应该将PoisonPill包装在一个Broadcast,以便每个routee都能收到PoisonPill消息。请注意这将停止所有的routee,即使routee不是路由器的孩子,也就是即使是通过编程方式提供给router的routee。

  1. import akka.actor.PoisonPill
  2. import akka.routing.Broadcast
  3. router ! Broadcast(PoisonPill)

如上代码所示,每个routee将收到一个PoisonPill消息。每个routee会继续如常处理其邮件,最终处理PoisonPill。这将导致routee停止。所有routee停止后,路由器将自动停止自己,除非它是一个动态的路由器,例如尺寸调整器。

注意

Brendan W McAdams的优秀博客文章“分布化Akka工作负载——以及完成后的关闭”更详细地讨论了如何使用PoisonPill消息来关闭路由器和routee。

Kill消息

Kill消息是另一种需要特殊处理的消息类型。请参阅“如何杀掉一个actor”来获取actor如何处理Kill消息的一般信息。

Kill消息被发送到路由器,路由器将内部处理该消息,并且不会将它发送到其routee。路由器将抛出ActorKilledException并失败。然后它将被恢复、 重新启动或终止,取决于它如何被监督。

路由器的子routee亦将暂停,并将受应用在路由器上的监管指令影响。对不是路由器的孩子的Routee,即那些在路由器外部被创建的,将不受影响。

  1. import akka.actor.Kill
  2. router ! Kill

相比于PoisonPill消息,杀死一个路由器,间接杀死其子(即那些routee),和直接杀死routee(其中有些未必是其孩子)之间是有明显区别的。要直接杀死routee,路由器应发送包裹着Kill消息的Broadcast消息。

  1. import akka.actor.Kill
  2. import akka.routing.Broadcast
  3. router ! Broadcast(Kill)
Managagement消息
  • 发送akka.routing.GetRoutees到一个路由actor,使其回送一个包含当前使用routee的akka.routing.Routees消息。
  • 发送akka.routing.AddRoutee到一个路由actor会将那个routee添加到其routee集合中。
  • 发送akka.routing.RemoveRoutee到一个路由actor将从其routee集合删除该routee。
  • 发送akka.routing.AdjustPoolSize到一个池路由actor将从其routee集合中添加或删除该数目的routee。

这些管理消息可能晚于其他消息处理,所以如果你发送AddRoutee后立即发送普通消息,并不能保证当普通消息被路由时,routee已被更改。如果你需要知道更改何时生效,你可以发送AddRoutee紧跟着GetRoutees,当你收到Routees答复,你就知道前面的变化已被应用。

动态改变大小的池

大多数池可以使用固定数量的routee或有一个调整策略来动态调整routee数。

在配置中定义的包含resizer的池:

  1. akka.actor.deployment {
  2. /parent/router29 {
  3. router = round-robin-pool
  4. resizer {
  5. lower-bound = 2
  6. upper-bound = 15
  7. messages-per-resize = 100
  8. }
  9. }
  10. }
  1. val router29: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router29")

更多选项在配置akka.actor.deployment.default.resizer部分中有描述。

在代码中定义的包含resizer的池:

  1. val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
  2. val router30: ActorRef =
  3. context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]),
  4. "router30")

需要指出如果你在配置文件中定义了router,那么这个值将比在代码中传入的参数有更高的优先级。

注意

改变大小的行为是通过向actor池发送消息来触发的,但它不是完全同步的;而是向RouterActor的“head”发送消息来执行修改。所以在别的actor忙碌时,你不能假设改变大小的操作会立即创建新的工作actor,因为消息会被发到忙碌actor的邮箱中排队。要解决这个问题,配置actor池使用一个平衡的派发器,更多信息见Configuring Dispatchers

Akka中的路由是如何设计的

从表面看,路由器就像普通的actor,但是它们实际实现是不同的。路由器在收消息和快速发消息给routee被设计的极度优化。

一个普通的actor可以用来路由消息,但是actor的单线程处理会成为一个瓶颈。路由器可以通过优化原有消息处理pipeline来支持多线程,从而达到更高的吞吐量。这里是通过直接嵌入路由逻辑到其ActorRef,而不是在路由actor本身。发送到路由器ActorRef的消息可以直接被路由到routee,从而完全跳过单线程的路由actor。

当然,这个改进的成本是路由代码的内部构造相比于使用普通actor构造来说复杂许多。幸运的是所有这种复杂性对于路由API消费者来说是不可见的。然而,这却是你在实现自己的路由器时需要意识到的。

自定义路由actor

如果觉得Akka自带的路由actor都不合用,你也可以创建自己的路由actor。要创建自己的路由,你需要满足本节中所列出的条件。

在创建你自己的路由器之前,你应该考虑一个拥有类似路由器行为的普通actor是否能完成一个成熟路由器的功能。正如上文解释,路由器相比于普通actor主要好处是他们拥有更高的性能。但相比普通actor他们的代码也更为复杂。因此,如果在你的应用程序中较低的最大吞吐量是可以接受的,则不妨继续使用传统的actor。不过这一节假定你想要获得最大性能,并因而演示如何创建你自己的路由器。

在此示例中创建的路由器将把每个消息复制到几个目的地。

首先从路由逻辑开始:

  1. import scala.collection.immutable
  2. import scala.concurrent.forkjoin.ThreadLocalRandom
  3. import akka.routing.RoundRobinRoutingLogic
  4. import akka.routing.RoutingLogic
  5. import akka.routing.Routee
  6. import akka.routing.SeveralRoutees
  7. class RedundancyRoutingLogic(nbrCopies: Int) extends RoutingLogic {
  8. val roundRobin = RoundRobinRoutingLogic()
  9. def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = {
  10. val targets = (1 to nbrCopies).map(_ => roundRobin.select(message, routees))
  11. SeveralRoutees(targets)
  12. }
  13. }

在这个例子中select将被每个消息调用来使用轮询来挑选几个目的地,通过重用现有的RoundRobinRoutingLogic并将结果包装在一个SeveralRoutees实例中。SeveralRoutees将会把消息发送给所有提供的routee。

路由逻辑的实现必须是线程安全的,因为它可能在actor外被使用。

路由逻辑的一个单元测试:

  1. case class TestRoutee(n: Int) extends Routee {
  2. override def send(message: Any, sender: ActorRef): Unit = ()
  3. }
  4. val logic = new RedundancyRoutingLogic(nbrCopies = 3)
  5. val routees = for (n <- 1 to 7) yield TestRoutee(n)
  6. val r1 = logic.select("msg", routees)
  7. r1.asInstanceOf[SeveralRoutees].routees should be(
  8. Vector(TestRoutee(1), TestRoutee(2), TestRoutee(3)))
  9. val r2 = logic.select("msg", routees)
  10. r2.asInstanceOf[SeveralRoutees].routees should be(
  11. Vector(TestRoutee(4), TestRoutee(5), TestRoutee(6)))
  12. val r3 = logic.select("msg", routees)
  13. r3.asInstanceOf[SeveralRoutees].routees should be(
  14. Vector(TestRoutee(7), TestRoutee(1), TestRoutee(2)))

你可以停在这儿,通过akka.routing.Router使用 RedundancyRoutingLogic,如一个简单路由器中所述。

让我们继续,并使之成为一个自包含的、可配置的路由器actor。

创建一个类来扩展PoolGroupCustomRouterConfig。该类是一个路由逻辑的工厂,并持有路由器的配置。在这里,我们把它变成一个Group

  1. import akka.dispatch.Dispatchers
  2. import akka.routing.Group
  3. import akka.routing.Router
  4. import akka.japi.Util.immutableSeq
  5. import com.typesafe.config.Config
  6. case class RedundancyGroup(override val paths: immutable.Iterable[String], nbrCopies: Int) extends Group {
  7. def this(config: Config) = this(
  8. paths = immutableSeq(config.getStringList("routees.paths")),
  9. nbrCopies = config.getInt("nbr-copies"))
  10. override def createRouter(system: ActorSystem): Router =
  11. new Router(new RedundancyRoutingLogic(nbrCopies))
  12. override val routerDispatcher: String = Dispatchers.DefaultDispatcherId
  13. }

这样就可以像Akka提供的路由actor完全一样使用。

  1. for (n <- 1 to 10) system.actorOf(Props[Storage], "s" + n)
  2. val paths = for (n <- 1 to 10) yield ("/user/s" + n)
  3. val redundancy1: ActorRef =
  4. system.actorOf(RedundancyGroup(paths, nbrCopies = 3).props(),
  5. name = "redundancy1")
  6. redundancy1 ! "important"

请注意我们在RedundancyGroup添加一个以Config为参数的构造函数。这样一来,我们就可能在配置中定义。

  1. akka.actor.deployment {
  2. /redundancy2 {
  3. router = "docs.routing.RedundancyGroup"
  4. routees.paths = ["/user/s1", "/user/s2", "/user/s3"]
  5. nbr-copies = 5
  6. }
  7. }

请注意router属性中的类的全名。路由器类必须继承akka.routing.RouterConfigPoolGroupCustomRouterConfig),并且有一个以com.typesafe.config.Config为参数的构造函数。配置的部署部分将被传递给构造函数。

  1. val redundancy2: ActorRef = system.actorOf(FromConfig.props(),
  2. name = "redundancy2")
  3. redundancy2 ! "very important"

配置调度器" class="reference-link">配置调度器

创建子actor池的调度器将取自Props,如调度器中所述。

为了可以很容易地定义池routee的调度器,你可以在配置的部署一节中定义内联调度器。

  1. akka.actor.deployment {
  2. /poolWithDispatcher {
  3. router = random-pool
  4. nr-of-instances = 5
  5. pool-dispatcher {
  6. fork-join-executor.parallelism-min = 5
  7. fork-join-executor.parallelism-max = 5
  8. }
  9. }
  10. }

这是启用一个池专用调度器,你唯一需要做的。

注意

如果你使用actor群,并路由到它们的路径,然后他们将仍然使用配置在其Props的相同的调度器,不可能在actor创建后改变actor的调度器。

“头”路由器不能总是在相同的调度器上运行,因为它不处理同一类型的消息,因此这个特殊的actor没有使用配置的调度器,但相反,使用RouterConfigrouterDispatcher,它是actor系统默认调度器默认的。所有标准路由器允许它们在构造函数或工厂方法中设置此属性,自定义路由器必须以适当的方式实现该方法。

  1. val router: ActorRef = system.actorOf(
  2. // “head” router actor will run on "router-dispatcher" dispatcher
  3. // Worker routees will run on "pool-dispatcher" dispatcher
  4. RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]),
  5. name = "poolWithDispatcher")

注意

不允许配置routerDispatcherakka.dispatch.BalancingDispatcherConfigurator,因为用于特殊路由actor的消息不能被任意其他actor处理。