Actor 生命周期

依赖

为了使用 Akka Actor 类型,你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-actor-typed_2.12</artifactId>
  5. <version>2.5.23</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.12', version: '2.5.23'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.5.23"

创建 Actors

一个 Actor 可以创建或生成任意数量的子 Actor,而子 Actor 又可以生成自己的子 Actor,从而形成一个 Actor 层次。「ActorSystem」承载层次结构,并且在ActorSystem层次结构的顶部只能有一个根 Actor。一个子 Actor 的生命周期是与其父 Actor 联系在一起的,一个子 Actor 可以在任何时候停止自己或被停止,但永远不能比父 Actor 活得更久。

守护者 Actor

根 Actor,也称为守护者 Actor,与ActorSystem一起创建。发送到 Actor 系统的消息被定向到根 Actor。根 Actor 是由用于创建ActorSystem的行为定义的,在下面的示例中名为HelloWorldMain.main

  1. final ActorSystem<HelloWorldMain.Start> system =
  2. ActorSystem.create(HelloWorldMain.main, "hello");
  3. system.tell(new HelloWorldMain.Start("World"));
  4. system.tell(new HelloWorldMain.Start("Akka"));

繁衍子级

子 Actor 是由「ActorContext」的“繁殖”产生的。在下面的示例中,当根 Actor 启动时,它生成一个由行为HelloWorld.greeter描述的子 Actor。此外,当根 Actor 收到Start消息时,它将创建由行为HelloWorldBot.bot定义的子 Actor:

  1. public abstract static class HelloWorldMain {
  2. private HelloWorldMain() {}
  3. public static class Start {
  4. public final String name;
  5. public Start(String name) {
  6. this.name = name;
  7. }
  8. }
  9. public static final Behavior<Start> main =
  10. Behaviors.setup(
  11. context -> {
  12. final ActorRef<HelloWorld.Greet> greeter =
  13. context.spawn(HelloWorld.greeter, "greeter");
  14. return Behaviors.receiveMessage(
  15. message -> {
  16. ActorRef<HelloWorld.Greeted> replyTo =
  17. context.spawn(HelloWorldBot.bot(0, 3), message.name);
  18. greeter.tell(new HelloWorld.Greet(message.name, replyTo));
  19. return Behaviors.same();
  20. });
  21. });
  22. }

要在生成 Actor 时指定调度器,请使用「DispatcherSelector」。如果未指定,则 Actor 将使用默认调度器,有关详细信息,请参阅「默认调度器」。

  1. public static final Behavior<Start> main =
  2. Behaviors.setup(
  3. context -> {
  4. final String dispatcherPath = "akka.actor.default-blocking-io-dispatcher";
  5. Props props = DispatcherSelector.fromConfig(dispatcherPath);
  6. final ActorRef<HelloWorld.Greet> greeter =
  7. context.spawn(HelloWorld.greeter, "greeter", props);
  8. return Behaviors.receiveMessage(
  9. message -> {
  10. ActorRef<HelloWorld.Greeted> replyTo =
  11. context.spawn(HelloWorldBot.bot(0, 3), message.name);
  12. greeter.tell(new HelloWorld.Greet(message.name, replyTo));
  13. return Behaviors.same();
  14. });
  15. });

通过参阅「Actors」,以了解上述示例。

SpawnProtocol

守护者 Actor 应该负责初始化任务并创建应用程序的初始 Actor,但有时你可能希望从守护者 Actor 的外部生成新的 Actor。例如,每个 HTTP 请求创建一个 Actor。

这并不难你在行为中的实现,但是由于这是一个常见的模式,因此有一个预定义的消息协议和行为的实现。它可以用作ActorSystem的守护者 Actor,可能与Behaviors.setup结合使用以启动一些初始任务或 Actor。然后,可以通过调用SpawnProtocol.Spawn从外部启动子 Actor,生成到系统的 Actor 引用。使用ask时,这类似于ActorSystem.actorOf如何在非类型化 Actor 中使用,不同之处在于ActorRefCompletionStage返回。

守护者行为可定义为:

  1. import akka.actor.typed.Behavior;
  2. import akka.actor.typed.SpawnProtocol;
  3. import akka.actor.typed.javadsl.Behaviors;
  4. public abstract static class HelloWorldMain {
  5. private HelloWorldMain() {}
  6. public static final Behavior<SpawnProtocol> main =
  7. Behaviors.setup(
  8. context -> {
  9. // Start initial tasks
  10. // context.spawn(...)
  11. return SpawnProtocol.behavior();
  12. });
  13. }

ActorSystem可以用这种main行为来创建,并生成其他 Actor:

  1. import akka.actor.typed.ActorRef;
  2. import akka.actor.typed.ActorSystem;
  3. import akka.actor.typed.Props;
  4. import akka.actor.typed.javadsl.AskPattern;
  5. import akka.util.Timeout;
  6. final ActorSystem<SpawnProtocol> system = ActorSystem.create(HelloWorldMain.main, "hello");
  7. final Duration timeout = Duration.ofSeconds(3);
  8. CompletionStage<ActorRef<HelloWorld.Greet>> greeter =
  9. AskPattern.ask(
  10. system,
  11. replyTo ->
  12. new SpawnProtocol.Spawn<>(HelloWorld.greeter, "greeter", Props.empty(), replyTo),
  13. timeout,
  14. system.scheduler());
  15. Behavior<HelloWorld.Greeted> greetedBehavior =
  16. Behaviors.receive(
  17. (context, message) -> {
  18. context.getLog().info("Greeting for {} from {}", message.whom, message.from);
  19. return Behaviors.stopped();
  20. });
  21. CompletionStage<ActorRef<HelloWorld.Greeted>> greetedReplyTo =
  22. AskPattern.ask(
  23. system,
  24. replyTo -> new SpawnProtocol.Spawn<>(greetedBehavior, "", Props.empty(), replyTo),
  25. timeout,
  26. system.scheduler());
  27. greeter.whenComplete(
  28. (greeterRef, exc) -> {
  29. if (exc == null) {
  30. greetedReplyTo.whenComplete(
  31. (greetedReplyToRef, exc2) -> {
  32. if (exc2 == null) {
  33. greeterRef.tell(new HelloWorld.Greet("Akka", greetedReplyToRef));
  34. }
  35. });
  36. }
  37. });

还可以在 Actor 层次结构的其他位置使用SpawnProtocol。它不必是根守护者 Actor。

停止 Actors

Actor 可以通过返回Behaviors.stopped作为下一个行为来停止自己。

通过使用父 Actor 的ActorContextstop方法,在子 Actor 完成当前消息的处理后,可以强制停止子 Actor。只有子 Actor 才能这样被阻止。

子 Actor 将作为父级关闭过程的一部分被停止。

停止 Actor 产生的PostStop信号可用于清除资源。请注意,可以选择将处理此类PostStop信号的行为定义为Behaviors.stopped的参数。如果 Actor 在突然停止时能优雅地停止自己,则需要不同的操作。

下面是一个示例:

  1. import java.util.concurrent.TimeUnit;
  2. import akka.actor.typed.ActorSystem;
  3. import akka.actor.typed.Behavior;
  4. import akka.actor.typed.PostStop;
  5. import akka.actor.typed.javadsl.Behaviors;
  6. public abstract static class JobControl {
  7. // no instances of this class, it's only a name space for messages
  8. // and static methods
  9. private JobControl() {}
  10. static interface JobControlLanguage {}
  11. public static final class SpawnJob implements JobControlLanguage {
  12. public final String name;
  13. public SpawnJob(String name) {
  14. this.name = name;
  15. }
  16. }
  17. public static final class GracefulShutdown implements JobControlLanguage {
  18. public GracefulShutdown() {}
  19. }
  20. public static final Behavior<JobControlLanguage> mcpa =
  21. Behaviors.receive(JobControlLanguage.class)
  22. .onMessage(
  23. SpawnJob.class,
  24. (context, message) -> {
  25. context.getSystem().log().info("Spawning job {}!", message.name);
  26. context.spawn(Job.job(message.name), message.name);
  27. return Behaviors.same();
  28. })
  29. .onSignal(
  30. PostStop.class,
  31. (context, signal) -> {
  32. context.getSystem().log().info("Master Control Programme stopped");
  33. return Behaviors.same();
  34. })
  35. .onMessage(
  36. GracefulShutdown.class,
  37. (context, message) -> {
  38. context.getSystem().log().info("Initiating graceful shutdown...");
  39. // perform graceful stop, executing cleanup before final system termination
  40. // behavior executing cleanup is passed as a parameter to Actor.stopped
  41. return Behaviors.stopped(
  42. () -> {
  43. context.getSystem().log().info("Cleanup!");
  44. });
  45. })
  46. .onSignal(
  47. PostStop.class,
  48. (context, signal) -> {
  49. context.getSystem().log().info("Master Control Programme stopped");
  50. return Behaviors.same();
  51. })
  52. .build();
  53. }
  54. public static class Job {
  55. public static Behavior<JobControl.JobControlLanguage> job(String name) {
  56. return Behaviors.receiveSignal(
  57. (context, PostStop) -> {
  58. context.getSystem().log().info("Worker {} stopped", name);
  59. return Behaviors.same();
  60. });
  61. }
  62. }
  63. final ActorSystem<JobControl.JobControlLanguage> system =
  64. ActorSystem.create(JobControl.mcpa, "B6700");
  65. system.tell(new JobControl.SpawnJob("a"));
  66. system.tell(new JobControl.SpawnJob("b"));
  67. // sleep here to allow time for the new actors to be started
  68. Thread.sleep(100);
  69. system.tell(new JobControl.GracefulShutdown());
  70. system.getWhenTerminated().toCompletableFuture().get(3, TimeUnit.SECONDS);

英文原文链接Actor lifecycle.