

为了使用集群单例(Cluster Singleton),你必须在项目中添加如下依赖:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-cluster-tools_2.12</artifactId>
  5. <version>2.5.21</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-cluster-tools_2.12', version: '2.5.21'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.21"


对于某些用例,确保集群中某个类型的某个 Actor 恰好运行在某个位置是方便的,有时也是强制的。


  • 对特定的集群范围一致性决策或跨集群系统协调行动的单一责任点
  • 外部系统的单一入口点
  • 单主多工
  • 集中命名服务或路由逻辑


集群单例模式由akka.cluster.singleton.ClusterSingletonManager实现。它在所有集群节点或标记有特定角色的一组节点中管理一个单实例 Actor 实例。ClusterSingletonManager是一个 Actor,它应该在集群中的所有节点或具有指定角色的所有节点上尽早启动。实际的单例 Actor 是由最老节点上的ClusterSingletonManager通过从提供的Props创建子 Actor 来启动的。ClusterSingletonManager确保在任何时间点最多运行一个单实例。

单例 Actor 总是在具有指定角色的最老成员上运行。最老的成员由akka.cluster.Member#isOlderThan确定。从群集中删除该成员时,这可能会发生变化。请注意,在移交(hand-over)过程中,如果没有活动的单例,则将是一个很短的时间段。

当最老的节点由于诸如 JVM 崩溃、硬关闭或网络故障而无法访问时,集群故障检测器会注意到。然后将接管一个新的最老节点,并创建一个新的单例 Actor。对于这些故障场景,将不会有一个优雅的移交,但通过所有合理的方法阻止了多个活动的单例。对于其他情况,最终可以通过配置超时来解决。

你可以使用提供的akka.cluster.singleton.ClusterSingletonProxy访问单例 Actor,该代理将所有消息路由到单例的当前实例。代理将跟踪集群中最老的节点,并通过显式发送单例的actorSelectionakka.actor.Identify消息并等待其回复来解析单例的ActorRef。如果单例(singleton)在特定(可配置)时间内没有回复,则会定期执行此操作。考虑到实现,可能会有一段时间内ActorRef不可用,例如,当节点离开集群时。在这些情况下,代理将缓冲发送到单例的消息,然后在单例最终可用时传递它们。如果缓冲区已满,则当通过代理发送新消息时,ClusterSingletonProxy将删除旧消息。缓冲区的大小是可配置的,可以通过使用0的缓冲区大小来禁用它。

值得注意的是,由于这些 Actor 的分布式特性,消息总是会丢失。一如既往,额外的逻辑应该在单例(确认)和客户机(重试)Actor 中实现,以确保至少一次消息传递。




  • 集群单例可能很快成为性能瓶颈,
  • 你不能依赖集群单例不间断地可用,例如,当运行单例的节点死亡时,需要几秒钟的时间才能注意到这一点,并将单例迁移到另一个节点,
  • 在使用自动关闭(Automatic Downing)的集群中出现网络分裂的情况下(参见文档中的自「Auto Downing」),可能会发生孤立的集群并各自决定成为它们自己的单例,这意味着系统中可能有多个单例运行,但是这些集群无法发现它们(因为网络分裂)


  • 警告:不要将集群单例与自动关闭一起使用,因为它允许集群分裂为两个单独的集群,从而导致启动多个单例,每个单独的集群中都有一个单例!


假设我们需要一个到外部系统的单一入口点。从 JMS 队列接收消息的 Actor,严格要求只有一个 JMS 消费者才能确保消息按顺序处理。这也许不是人们想要如何设计事物,而是与外部系统集成时典型的现实场景。

在解释如何创建集群单例 Actor 之前,我们先定义将由单例使用的消息类。

  1. public class TestSingletonMessages {
  2. public static class UnregistrationOk {}
  3. public static class End {}
  4. public static class Ping {}
  5. public static class Pong {}
  6. public static class GetCurrent {}
  7. public static UnregistrationOk unregistrationOk() {
  8. return new UnregistrationOk();
  9. }
  10. public static End end() {
  11. return new End();
  12. }
  13. public static Ping ping() {
  14. return new Ping();
  15. }
  16. public static Pong pong() {
  17. return new Pong();
  18. }
  19. public static GetCurrent getCurrent() {
  20. return new GetCurrent();
  21. }
  22. }

在集群中的每个节点上,你需要启动ClusterSingletonManager并提供单例 Actor 的Props,在本例中是 JMS 队列消费者。

  1. final ClusterSingletonManagerSettings settings =
  2. ClusterSingletonManagerSettings.create(system).withRole("worker");
  3. system.actorOf(
  4. ClusterSingletonManager.props(
  5. Props.create(Consumer.class, () -> new Consumer(queue, testActor)),
  6. TestSingletonMessages.end(),
  7. settings),
  8. "consumer");


我们使用一个特定于应用程序的terminationMessage(即TestSingletonMessages.end()消息)来在实际停止单例 Actor 之前关闭资源。请注意,如果你只需要停止 Actor,PoisonPill是一个完美的terminationMessage

下面是这个示例中,单例 Actor 如何处理terminationMessage

  1. .match(End.class, message -> queue.tell(UnregisterConsumer.class, getSelf()))
  2. .match(
  3. UnregistrationOk.class,
  4. message -> {
  5. stoppedBeforeUnregistration = false;
  6. getContext().stop(getSelf());
  7. })
  8. .match(Ping.class, message -> getSender().tell(TestSingletonMessages.pong(), getSelf()))


  1. ClusterSingletonProxySettings proxySettings =
  2. ClusterSingletonProxySettings.create(system).withRole("worker");
  3. ActorRef proxy =
  4. system.actorOf(
  5. ClusterSingletonProxy.props("/user/consumer", proxySettings), "consumerProxy");

在「Distributed workers with Akka and Java」中,有一个更全面的示例!



  1. akka.cluster.singleton {
  2. # The actor name of the child singleton actor.
  3. singleton-name = "singleton"
  4. # Singleton among the nodes tagged with specified role.
  5. # If the role is not specified it's a singleton among all nodes in the cluster.
  6. role = ""
  7. # When a node is becoming oldest it sends hand-over request to previous oldest,
  8. # that might be leaving the cluster. This is retried with this interval until
  9. # the previous oldest confirms that the hand over has started or the previous
  10. # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
  11. hand-over-retry-interval = 1s
  12. # The number of retries are derived from hand-over-retry-interval and
  13. # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
  14. # but it will never be less than this property.
  15. # After the hand over retries and it's still not able to exchange the hand over messages
  16. # with the previous oldest it will restart itself by throwing ClusterSingletonManagerIsStuck,
  17. # to start from a clean state. After that it will still not start the singleton instance
  18. # until the previous oldest node has been removed from the cluster.
  19. # On the other side, on the previous oldest node, the same number of retries - 3 are used
  20. # and after that the singleton instance is stopped.
  21. # For large clusters it might be necessary to increase this to avoid too early timeouts while
  22. # gossip dissemination of the Leaving to Exiting phase occurs. For normal leaving scenarios
  23. # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios
  24. # the recovery might be faster.
  25. min-number-of-hand-over-retries = 15
  26. }


  1. akka.cluster.singleton-proxy {
  2. # The actor name of the singleton actor that is started by the ClusterSingletonManager
  3. singleton-name = ${akka.cluster.singleton.singleton-name}
  4. # The role of the cluster nodes where the singleton can be deployed.
  5. # If the role is not specified then any node will do.
  6. role = ""
  7. # Interval at which the proxy will try to resolve the singleton instance.
  8. singleton-identification-interval = 1s
  9. # If the location of the singleton is unknown the proxy will buffer this
  10. # number of messages and deliver them when the singleton is identified.
  11. # When the buffer is full old messages will be dropped when new messages are
  12. # sent via the proxy.
  13. # Use 0 to disable buffering, i.e. messages will be dropped immediately if
  14. # the location of the singleton is unknown.
  15. # Maximum allowed buffer size is 10000.
  16. buffer-size = 1000
  17. }


有两个 Actor 可能会受到监督。对于上面创建的消费者单例,这些将是:

  • 集群单例管理器,例如运行在集群中每个节点上的/user/consumer
  • 用户 Actor,例如/user/consumer/singleton,管理器从最老的节点开始。

集群单例管理器 Actor 不应该改变其监视策略,因为它应该一直在运行。但是,有时添加对用户 Actor 的监督是有用的。要完成此操作,请添加一个父监督者 Actor,该 Actor 将用于创建“真正”的单例实例。下面是一个示例实现(归功于这个「StackOverflow」答案)

  1. import akka.actor.AbstractActor;
  2. import akka.actor.AbstractActor.Receive;
  3. import akka.actor.ActorRef;
  4. import akka.actor.Props;
  5. import akka.actor.SupervisorStrategy;
  6. public class SupervisorActor extends AbstractActor {
  7. final Props childProps;
  8. final SupervisorStrategy supervisorStrategy;
  9. final ActorRef child;
  10. SupervisorActor(Props childProps, SupervisorStrategy supervisorStrategy) {
  11. this.childProps = childProps;
  12. this.supervisorStrategy = supervisorStrategy;
  13. this.child = getContext().actorOf(childProps, "supervised-child");
  14. }
  15. @Override
  16. public SupervisorStrategy supervisorStrategy() {
  17. return supervisorStrategy;
  18. }
  19. @Override
  20. public Receive createReceive() {
  21. return receiveBuilder().matchAny(msg -> child.forward(msg, getContext())).build();
  22. }
  23. }


  1. import akka.actor.PoisonPill;
  2. import akka.actor.Props;
  3. import akka.cluster.singleton.ClusterSingletonManager;
  4. import akka.cluster.singleton.ClusterSingletonManagerSettings;
  1. return getContext()
  2. .system()
  3. .actorOf(
  4. ClusterSingletonManager.props(
  5. Props.create(
  6. SupervisorActor.class, () -> new SupervisorActor(props, supervisorStrategy)),
  7. PoisonPill.getInstance(),
  8. ClusterSingletonManagerSettings.create(getContext().system())),
  9. name = name);

