

为了使用分布式发布订阅(Distributed Publish Subscribe),你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-cluster-tools_2.12</artifactId>
  5. <version>2.5.22</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-cluster-tools_2.12', version: '2.5.22'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.22"


在不知道 Actor 正在哪个节点运行的情况下,如何向其发送消息?

如何将消息发送给集群中对命名主题感兴趣的所有 Actor?

此模式提供了一个中介 Actor akka.cluster.pubsub.DistributedPubSubMediator,它管理 Actor 引用的注册表,并将条目复制到所有集群节点或标记有特定角色的一组节点中的同级 Actor。

DistributedPubSubMediator Actor 支持在集群中的所有节点或具有指定角色的所有节点上启动。中介程序可以以DistributedPubSub扩展启动,也可以作为普通的 Actor 启动。



你可以通过任何节点上的中介(mediator)向任何其他节点上注册的 Actor 发送消息。




Actor 注册到命名主题。这将在每个节点上启用许多订阅服务器。消息将传递给主题的所有订户。


你可以使用DistributedPubSubMediator.Subscribe将 Actor 注册到本地中介。成功的SubscribeUnsubscribe通过DistributedPubSubMediator.SubscribeAckDistributedPubSubMediator.UnsubscribeAck确认。确认意味着订阅已注册,但在复制到其他节点之前,它仍然需要一些时间。


当中介 Actor 停止时,Actor 将自动从注册表中删除,或者你也可以使用DistributedPubSubMediator.Unsubscribe显式删除条目。

订阅者 Actor 的示例:

  1. static class Subscriber extends AbstractActor {
  2. LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  3. public Subscriber() {
  4. ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
  5. // subscribe to the topic named "content"
  6. mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf());
  7. }
  8. @Override
  9. public Receive createReceive() {
  10. return receiveBuilder()
  11. .match(String.class, msg -> log.info("Got: {}", msg))
  12. .match(DistributedPubSubMediator.SubscribeAck.class, msg -> log.info("subscribed"))
  13. .build();
  14. }
  15. }

订阅者 Actor 可以在集群中的多个节点上启动,所有节点都将接收发布到content主题的消息。

  1. system.actorOf(Props.create(Subscriber.class), "subscriber1");
  2. // another node
  3. system.actorOf(Props.create(Subscriber.class), "subscriber2");
  4. system.actorOf(Props.create(Subscriber.class), "subscriber3");

发布到此content主题的简单 Actor:

  1. static class Publisher extends AbstractActor {
  2. // activate the extension
  3. ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
  4. @Override
  5. public Receive createReceive() {
  6. return receiveBuilder()
  7. .match(
  8. String.class,
  9. in -> {
  10. String out = in.toUpperCase();
  11. mediator.tell(new DistributedPubSubMediator.Publish("content", out), getSelf());
  12. })
  13. .build();
  14. }
  15. }


  1. // somewhere else
  2. ActorRef publisher = system.actorOf(Props.create(Publisher.class), "publisher");
  3. // after a while the subscriptions are replicated
  4. publisher.tell("hello", null);


Actor 还可以以group ID 订阅命名主题。如果订阅group ID,则通过提供的RoutingLogic(默认随机)将发布到主题的每条消息(sendOneMessageToEachGroup标志设置为true)传递给每个订阅组中的一个 Actor。

如果所有订阅的 Actor 都具有相同的组 ID,那么这就像Send一样工作,并且每个消息只传递到一个订阅者。

如果所有订阅的 Actor 都有不同的组名,那么这就像正常Publish一样工作,并且每个消息都广播给所有订阅者。

  • 注释:如果使用组 ID,它将是主题标识符的一部分。使用sendOneMessageToEachGroup=false发布的消息将不会传递给使用组 ID 订阅的订阅者。使用sendOneMessageToEachGroup=true发布的消息将不会传递给没有使用组 ID 订阅的订阅者。



如果注册表中存在匹配路径,则消息将传递给一个收件人。如果多个条目与路径匹配,因为它已在多个节点上注册,则消息将通过提供的路由逻辑(默认随机)发送到一个目标。消息的发送者可以指定首选本地路径,即消息被发送到与所使用的中介 Actor 相同的本地 Actor 系统中的 Actor(如果存在),否则路由到任何其他匹配条目。

你可以使用DistributedPubSubMediator.Put将 Actor 注册到本地中介(mediator)。Put中的ActorRef必须与中介属于同一个本地 Actor 系统。没有地址信息的路径是发送消息的关键(key)。在每个节点上,给定路径只能有一个 Actor,因为该路径在一个本地 Actor 系统中是唯一的。

使用目标 Actor 的路径(不含地址信息),你可以通过DistributedPubSubMediator.Send将消息发送到本地中介。

Actor 在终止时会自动从注册表中删除,或者你也可以使用DistributedPubSubMediator.Remove显式删除条目。

目标 Actor 的示例:

  1. static class Destination extends AbstractActor {
  2. LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  3. public Destination() {
  4. ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
  5. // register to the path
  6. mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
  7. }
  8. @Override
  9. public Receive createReceive() {
  10. return receiveBuilder().match(String.class, msg -> log.info("Got: {}", msg)).build();
  11. }
  12. }

目标 Actor 可以在集群中的多个节点上启动,并且所有节点都将接收发送到路径的消息(没有地址信息)。

  1. system.actorOf(Props.create(Destination.class), "destination");
  2. // another node
  3. system.actorOf(Props.create(Destination.class), "destination");

发送到路径的简单 Actor:

  1. static class Sender extends AbstractActor {
  2. // activate the extension
  3. ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
  4. @Override
  5. public Receive createReceive() {
  6. return receiveBuilder()
  7. .match(
  8. String.class,
  9. in -> {
  10. String out = in.toUpperCase();
  11. boolean localAffinity = true;
  12. mediator.tell(
  13. new DistributedPubSubMediator.Send("/user/destination", out, localAffinity),
  14. getSelf());
  15. })
  16. .build();
  17. }
  18. }


  1. // somewhere else
  2. ActorRef sender = system.actorOf(Props.create(Publisher.class), "sender");
  3. // after a while the destinations are replicated
  4. sender.tell("hello", null);

也可以将消息广播给已向Put注册的 Actor。将DistributedPubSubMediator.SendToAlll消息发送到本地中介,然后将包装好的消息传递到具有匹配路径的所有收件人。具有相同路径且没有地址信息的 Actor 可以在不同的节点上注册。在每个节点上只能有一个这样的 Actor,因为路径在一个本地 Actor 系统中是唯一的。

此模式的典型用法是将消息广播到具有相同路径的所有副本,例如,在所有执行相同操作的不同节点上的 3 个 Actor,以实现冗余。你还可以选择指定一个属性(allButSelf),决定是否应将消息发送到自节点上的匹配路径。

DistributedPubSub 扩展

在上面的示例中,使用akka.cluster.pubsub.DistributedPubSub扩展启动和访问中介。这在大多数情况下都是很方便和完美的,但是也可以将中间 Actor 作为普通的 Actor 来启动,并且你可以同时拥有几个不同的中介,以便能够将大量的actors/topics分配给不同的中介。例如,你可能希望对不同的中介使用不同的集群角色。


  1. # Settings for the DistributedPubSub extension
  2. akka.cluster.pub-sub {
  3. # Actor name of the mediator actor, /system/distributedPubSubMediator
  4. name = distributedPubSubMediator
  5. # Start the mediator on members tagged with this role.
  6. # All members are used if undefined or empty.
  7. role = ""
  8. # The routing logic to use for 'Send'
  9. # Possible values: random, round-robin, broadcast
  10. routing-logic = random
  11. # How often the DistributedPubSubMediator should send out gossip information
  12. gossip-interval = 1s
  13. # Removed entries are pruned after this duration
  14. removed-time-to-live = 120s
  15. # Maximum number of elements to transfer in one message when synchronizing the registries.
  16. # Next chunk will be transferred in next round of gossip.
  17. max-delta-elements = 3000
  18. # When a message is published to a topic with no subscribers send it to the dead letters.
  19. send-to-dead-letters-when-no-subscribers = on
  20. # The id of the dispatcher to use for DistributedPubSubMediator actors.
  21. # If not specified default dispatcher is used.
  22. # If specified you need to define the settings of the actual dispatcher.
  23. use-dispatcher = ""
  24. }

建议在 Actor 系统启动时通过在akka.extensions配置属性中定义它来加载扩展。否则,它将在第一次使用时激活,然后需要一段时间才能就位(populated.)。

  1. akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"]


与 Akka 中的「 Message Delivery Reliability」一样,该模式下的消息传递保证为至多一次传递(at-most-once delivery)。换言之,信息可能会丢失。

如果你需要至少一次的传递保证,我们建议与「Kafka Akka Streams」集成。

英文原文链接Distributed Publish Subscribe in Cluster.