如何:常见模式

本节列出了常用的 Actor 模式,这些模式被认为是有用的、优雅的或有指导意义的。任何内容都是受欢迎的,例如消息路由策略、监督模式、重启处理等。作为一个特殊的奖励,添加到本节的内容都标记了贡献者的名称,如果在他或她的代码中发现循环模式的每个 Akka 用户都可以为了所有人的利益分享它。在适用的情况下,添加到akka.pattern包中以创建「类似于 OTP 库」也是有意义的。

调度周期消息

详见「Actor Timers」。

具有高级错误报告的一次性 Actor 树

从 Java 进入 Actor 世界的一个好方法是使用Patterns.ask()。这个方法启动一个临时 Actor 来转发消息,并从 Actor 那里收集要“询问”的结果。如果所请求的 Actor 中出现错误,则将接管默认的监督处理。Patterns.ask()的调用方将不会收到通知。

如果调用者对这种异常感兴趣,他们必须确保被询问的 Actor 以Status.Failure(Throwable)进行答复。在被询问的 Actor 之后,可能会生成一个复杂的 Actor 层次结构来完成异步工作。然后监督是控制错误处理的既定方法。

不幸的是,被问到的 Actor 必须了解监督,并且必须捕获异常。这样的 Actor 不太可能在不同的 Actor 层次结构中重用,并且包含残缺的try/catch块。

此模式提供了一种将监督和错误传播封装到临时 Actor 的方法。最后,由Patterns.ask()返回的承诺作为一个失败来实现,包括异常,详见「 Java 8 兼容性」。

让我们看一下示例代码:

  1. /*
  2. * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
  3. */
  4. package jdocs.pattern;
  5. import java.util.concurrent.CompletionStage;
  6. import java.util.concurrent.TimeoutException;
  7. import java.time.Duration;
  8. import akka.actor.ActorKilledException;
  9. import akka.actor.ActorRef;
  10. import akka.actor.ActorRefFactory;
  11. import akka.actor.Cancellable;
  12. import akka.actor.OneForOneStrategy;
  13. import akka.actor.Props;
  14. import akka.actor.Scheduler;
  15. import akka.actor.Status;
  16. import akka.actor.SupervisorStrategy;
  17. import akka.actor.Terminated;
  18. import akka.actor.AbstractActor;
  19. import akka.pattern.Patterns;
  20. public class SupervisedAsk {
  21. private static class AskParam {
  22. Props props;
  23. Object message;
  24. Duration timeout;
  25. AskParam(Props props, Object message, Duration timeout) {
  26. this.props = props;
  27. this.message = message;
  28. this.timeout = timeout;
  29. }
  30. }
  31. private static class AskTimeout {}
  32. public static class AskSupervisorCreator extends AbstractActor {
  33. @Override
  34. public Receive createReceive() {
  35. return receiveBuilder()
  36. .match(
  37. AskParam.class,
  38. message -> {
  39. ActorRef supervisor = getContext().actorOf(Props.create(AskSupervisor.class));
  40. supervisor.forward(message, getContext());
  41. })
  42. .build();
  43. }
  44. }
  45. public static class AskSupervisor extends AbstractActor {
  46. private ActorRef targetActor;
  47. private ActorRef caller;
  48. private AskParam askParam;
  49. private Cancellable timeoutMessage;
  50. @Override
  51. public SupervisorStrategy supervisorStrategy() {
  52. return new OneForOneStrategy(
  53. 0,
  54. Duration.ZERO,
  55. cause -> {
  56. caller.tell(new Status.Failure(cause), getSelf());
  57. return SupervisorStrategy.stop();
  58. });
  59. }
  60. @Override
  61. public Receive createReceive() {
  62. return receiveBuilder()
  63. .match(
  64. AskParam.class,
  65. message -> {
  66. askParam = message;
  67. caller = getSender();
  68. targetActor = getContext().actorOf(askParam.props);
  69. getContext().watch(targetActor);
  70. targetActor.forward(askParam.message, getContext());
  71. Scheduler scheduler = getContext().getSystem().scheduler();
  72. timeoutMessage =
  73. scheduler.scheduleOnce(
  74. askParam.timeout,
  75. getSelf(),
  76. new AskTimeout(),
  77. getContext().getDispatcher(),
  78. null);
  79. })
  80. .match(
  81. Terminated.class,
  82. message -> {
  83. Throwable ex = new ActorKilledException("Target actor terminated.");
  84. caller.tell(new Status.Failure(ex), getSelf());
  85. timeoutMessage.cancel();
  86. getContext().stop(getSelf());
  87. })
  88. .match(
  89. AskTimeout.class,
  90. message -> {
  91. Throwable ex =
  92. new TimeoutException(
  93. "Target actor timed out after " + askParam.timeout.toString());
  94. caller.tell(new Status.Failure(ex), getSelf());
  95. getContext().stop(getSelf());
  96. })
  97. .build();
  98. }
  99. }
  100. public static CompletionStage<Object> askOf(
  101. ActorRef supervisorCreator, Props props, Object message, Duration timeout) {
  102. AskParam param = new AskParam(props, message, timeout);
  103. return Patterns.ask(supervisorCreator, param, timeout);
  104. }
  105. public static synchronized ActorRef createSupervisorCreator(ActorRefFactory factory) {
  106. return factory.actorOf(Props.create(AskSupervisorCreator.class));
  107. }
  108. }

askOf方法中,会向SupervisorCreator发送用户消息。SupervisorCreator创建一个SupervisorActor并转发消息。这可以防止由于 Actor 创建而导致 Actor 系统过载。监督者负责创建用户 Actor、转发消息、处理 Actor 终止和监督。此外,如果执行时间过期,则监管者将停止用户 Actor。

如果发生异常,监督者会通知临时 Actor 引发了哪个异常。之后,Actor 层次结构停止。

最后,我们能够执行一个 Actor 并接收结果或异常。

  1. /*
  2. * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
  3. */
  4. package jdocs.pattern;
  5. import akka.actor.ActorRef;
  6. import akka.actor.ActorRefFactory;
  7. import akka.actor.Props;
  8. import akka.actor.AbstractActor;
  9. import akka.util.Timeout;
  10. import scala.concurrent.duration.FiniteDuration;
  11. import java.time.Duration;
  12. import java.util.concurrent.CompletionStage;
  13. import java.util.concurrent.TimeUnit;
  14. public class SupervisedAskSpec {
  15. public Object execute(
  16. Class<? extends AbstractActor> someActor,
  17. Object message,
  18. Duration timeout,
  19. ActorRefFactory actorSystem)
  20. throws Exception {
  21. // example usage
  22. try {
  23. ActorRef supervisorCreator = SupervisedAsk.createSupervisorCreator(actorSystem);
  24. CompletionStage<Object> finished =
  25. SupervisedAsk.askOf(supervisorCreator, Props.create(someActor), message, timeout);
  26. return finished.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
  27. } catch (Exception e) {
  28. // exception propagated by supervision
  29. throw e;
  30. }
  31. }
  32. }

可扩展的分布式事件源和 CQRS

Lagom 框架」编码了将 Akka 持久性和 Akka 持久性查询与集群分片相结合的许多最佳实践,以构建具有事件源和 CQRS 的可扩展和弹性系统。

请参见 Lagom 文档中的「管理数据持久性」和「持久性实体」。


英文原文链接HowTo: Common Patterns.