集群感知路由器

所有「routers」都可以知道集群中的成员节点,即部署新的路由(routees)或在集群中的节点上查找路由。当一个节点无法访问或离开集群时,该节点的路由将自动从路由器中注销。当新节点加入集群时,会根据配置向路由器添加额外的路由。当一个节点在不可访问之后再次可访问时,也会添加路由。

群集感知路由(Cluster aware routers)可以使用WeaklyUp状态的成员(如果启用该功能)。

有两种不同类型的路由器。

  • Group,使用 Actor selection将消息发送到指定路径的路由器:路由可以在群集中不同节点上运行的路由器之间共享。这种类型路由器的一个用例示例是运行在集群中某些后端节点上的服务,可由运行在集群中前端节点上的路由器使用。
  • Pool,将路由创建为子 Actor ,并将它们部署到远程节点上:每个路由器都有自己的路由实例。例如,如果在 10 节点群集中的 3 个节点上启动路由器,那么如果将路由器配置为每个节点使用一个实例,则总共有 30 个路由。不同路由器创建的路由不会在路由器之间共享。这种类型路由器的一个用例示例是一个单独的master,它协调作业并将实际工作委托给集群中其他节点上运行的路由。

依赖

为了使用集群感知路由器(Cluster Aware Routers),你必须在项目中添加如下依赖:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-cluster_2.12</artifactId>
  5. <version>2.5.21</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-cluster_2.12', version: '2.5.21'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.5.21"

组路由器

使用Group时,必须在集群成员节点上启动路由 Actor。这不是由路由器完成的。组的配置如下所示:

  1. akka.actor.deployment {
  2. /statsService/workerRouter {
  3. router = consistent-hashing-group
  4. routees.paths = ["/user/statsWorker"]
  5. cluster {
  6. enabled = on
  7. allow-local-routees = on
  8. use-roles = ["compute"]
  9. }
  10. }
  11. }
  • 注意:当启动 Actor 系统时,路由 Actor 应该尽早启动,因为一旦成员状态更改为Up,路由器就会尝试使用它们。

routees.paths中定义的 Actor 路径用于选择由路由器将消息转发到的 Actor。路径不应包含协议和地址信息,因为它们是从集群成员(membership)动态检索的。消息将使用「ActorSelection」转发到路由,因此应该使用相同的传递语义。通过指定use-roles,可以将对路由的查找限制到标记了特定角色集的成员节点。

max-total-nr-of-instances定义群集中的路由总数。默认情况下,max-total-nr-of-instances设置为高值(10000),当节点加入集群时,将导致新的路由添加到路由器。如果要限制路由总数,请将其设置为较低的值。

同样类型的路由器也可以在代码中定义:

  1. int totalInstances = 100;
  2. Iterable<String> routeesPaths = Collections.singletonList("/user/statsWorker");
  3. boolean allowLocalRoutees = true;
  4. Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
  5. ActorRef workerRouter =
  6. getContext()
  7. .actorOf(
  8. new ClusterRouterGroup(
  9. new ConsistentHashingGroup(routeesPaths),
  10. new ClusterRouterGroupSettings(
  11. totalInstances, routeesPaths, allowLocalRoutees, useRoles))
  12. .props(),
  13. "workerRouter2");

有关设置的详细说明,请参阅「参考配置」。

带路由组的路由器示例

让我们来看看如何将集群感知路由器与一组路由(即发送到路由器路径的路由)一起使用。

示例应用程序提供了一个计算文本统计信息的服务。当一些文本被发送到服务时,它将其拆分为单词,并将任务分配给一个单独的工作进程(路由器的一个路由),以计算每个单词中的字符数。每个字的字符数被发送回一个聚合器(aggregator),该聚合器在收集所有结果时计算每个字的平均字符数。

消息:

  1. public interface StatsMessages {
  2. public static class StatsJob implements Serializable {
  3. private final String text;
  4. public StatsJob(String text) {
  5. this.text = text;
  6. }
  7. public String getText() {
  8. return text;
  9. }
  10. }
  11. public static class StatsResult implements Serializable {
  12. private final double meanWordLength;
  13. public StatsResult(double meanWordLength) {
  14. this.meanWordLength = meanWordLength;
  15. }
  16. public double getMeanWordLength() {
  17. return meanWordLength;
  18. }
  19. @Override
  20. public String toString() {
  21. return "meanWordLength: " + meanWordLength;
  22. }
  23. }
  24. public static class JobFailed implements Serializable {
  25. private final String reason;
  26. public JobFailed(String reason) {
  27. this.reason = reason;
  28. }
  29. public String getReason() {
  30. return reason;
  31. }
  32. @Override
  33. public String toString() {
  34. return "JobFailed(" + reason + ")";
  35. }
  36. }
  37. }

计算每个字中字符数的工作者(worker):

  1. public class StatsWorker extends AbstractActor {
  2. Map<String, Integer> cache = new HashMap<String, Integer>();
  3. @Override
  4. public Receive createReceive() {
  5. return receiveBuilder()
  6. .match(
  7. String.class,
  8. word -> {
  9. Integer length = cache.get(word);
  10. if (length == null) {
  11. length = word.length();
  12. cache.put(word, length);
  13. }
  14. getSender().tell(length, getSelf());
  15. })
  16. .build();
  17. }
  18. }

从用户接收文本并将其拆分为单词、委派给workers和聚合(aggregates)的服务:

  1. public class StatsService extends AbstractActor {
  2. // This router is used both with lookup and deploy of routees. If you
  3. // have a router with only lookup of routees you can use Props.empty()
  4. // instead of Props.create(StatsWorker.class).
  5. ActorRef workerRouter =
  6. getContext()
  7. .actorOf(FromConfig.getInstance().props(Props.create(StatsWorker.class)), "workerRouter");
  8. @Override
  9. public Receive createReceive() {
  10. return receiveBuilder()
  11. .match(
  12. StatsJob.class,
  13. job -> !job.getText().isEmpty(),
  14. job -> {
  15. String[] words = job.getText().split(" ");
  16. ActorRef replyTo = getSender();
  17. // create actor that collects replies from workers
  18. ActorRef aggregator =
  19. getContext().actorOf(Props.create(StatsAggregator.class, words.length, replyTo));
  20. // send each word to a worker
  21. for (String word : words) {
  22. workerRouter.tell(new ConsistentHashableEnvelope(word, word), aggregator);
  23. }
  24. })
  25. .build();
  26. }
  27. }
  1. public class StatsAggregator extends AbstractActor {
  2. final int expectedResults;
  3. final ActorRef replyTo;
  4. final List<Integer> results = new ArrayList<Integer>();
  5. public StatsAggregator(int expectedResults, ActorRef replyTo) {
  6. this.expectedResults = expectedResults;
  7. this.replyTo = replyTo;
  8. }
  9. @Override
  10. public void preStart() {
  11. getContext().setReceiveTimeout(Duration.ofSeconds(3));
  12. }
  13. @Override
  14. public Receive createReceive() {
  15. return receiveBuilder()
  16. .match(
  17. Integer.class,
  18. wordCount -> {
  19. results.add(wordCount);
  20. if (results.size() == expectedResults) {
  21. int sum = 0;
  22. for (int c : results) {
  23. sum += c;
  24. }
  25. double meanWordLength = ((double) sum) / results.size();
  26. replyTo.tell(new StatsResult(meanWordLength), getSelf());
  27. getContext().stop(getSelf());
  28. }
  29. })
  30. .match(
  31. ReceiveTimeout.class,
  32. x -> {
  33. replyTo.tell(new JobFailed("Service unavailable, try again later"), getSelf());
  34. getContext().stop(getSelf());
  35. })
  36. .build();
  37. }
  38. }

注意,到目前为止还没有特定的集群,只是普通的 Actor。

所有节点都启动StatsServiceStatsWorker Actor。记住,在这种情况下,路由是worker。路由器配置了routees.paths

  1. akka.actor.deployment {
  2. /statsService/workerRouter {
  3. router = consistent-hashing-group
  4. routees.paths = ["/user/statsWorker"]
  5. cluster {
  6. enabled = on
  7. allow-local-routees = on
  8. use-roles = ["compute"]
  9. }
  10. }
  11. }

这意味着用户请求可以发送到任何节点上的StatsService,并且它将在所有节点上使用StatsWorker

最简单的运行路由器示例的方法是下载「Akka Cluster Sample with Java」,它包含有关如何使用路由组运行路由器示例的说明。此示例的源代码也可以在「Akka Samples Repository」中找到。

带有远程部署路由池的路由器

Pool与在群集成员节点上创建和部署的路由一起使用时,路由器的配置如下所示:

  1. akka.actor.deployment {
  2. /statsService/singleton/workerRouter {
  3. router = consistent-hashing-pool
  4. cluster {
  5. enabled = on
  6. max-nr-of-instances-per-node = 3
  7. allow-local-routees = on
  8. use-roles = ["compute"]
  9. }
  10. }
  11. }

可以通过指定use-roles将路由(routees)的部署限制到标记了特定角色集的成员节点。

max-total-nr-of-instances定义群集中的路由总数,但不会超过每个节点的路由数,max-nr-of-instances-per-node。默认情况下,max-total-nr-of-instances设置为高值(10000),当节点加入集群时,将导致新的路由添加到路由器。如果要限制路由总数,请将其设置为较低的值。

同样类型的路由器也可以在代码中定义:

  1. int totalInstances = 100;
  2. int maxInstancesPerNode = 3;
  3. boolean allowLocalRoutees = false;
  4. Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
  5. ActorRef workerRouter =
  6. getContext()
  7. .actorOf(
  8. new ClusterRouterPool(
  9. new ConsistentHashingPool(0),
  10. new ClusterRouterPoolSettings(
  11. totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles))
  12. .props(Props.create(StatsWorker.class)),
  13. "workerRouter3");

有关设置的详细说明,请参阅「参考配置」。

带有远程部署路由池的路由器示例

让我们看看如何在创建和部署workers的单个主节点(master node)上使用集群感知路由器。为了跟踪单个主节点,我们使用集群工具模块中的集群单例。ClusterSingletonManager在每个节点上启动:

  1. ClusterSingletonManagerSettings settings =
  2. ClusterSingletonManagerSettings.create(system).withRole("compute");
  3. system.actorOf(
  4. ClusterSingletonManager.props(
  5. Props.create(StatsService.class), PoisonPill.getInstance(), settings),
  6. "statsService");

我们还需要在每个节点上有一个 Actor,跟踪当前单个主节点的位置,并将作业委托给StatsService。由ClusterSingletonProxy提供:

  1. ClusterSingletonProxySettings proxySettings =
  2. ClusterSingletonProxySettings.create(system).withRole("compute");
  3. system.actorOf(
  4. ClusterSingletonProxy.props("/user/statsService", proxySettings), "statsServiceProxy");

ClusterSingletonProxy接收来自用户的文本,并将其委托给当前的StatsService(单主)。它监听集群事件以查找最老节点上的StatsService

所有节点都启动ClusterSingletonProxyClusterSingletonManager。路由器现在配置如下:

  1. akka.actor.deployment {
  2. /statsService/singleton/workerRouter {
  3. router = consistent-hashing-pool
  4. cluster {
  5. enabled = on
  6. max-nr-of-instances-per-node = 3
  7. allow-local-routees = on
  8. use-roles = ["compute"]
  9. }
  10. }
  11. }

最简单的运行带有远程部署路由池的路由器示例的方法是下载「Akka Cluster Sample with Java」,它包含有关如何使用远程部署路由池运行路由器示例的说明。此示例的源代码也可以在「Akka Samples Repository」中找到。


英文原文链接Cluster Aware Routers.