持久化

依赖

为了使用 Akka 持久化(Persistence)功能,你必须在项目中添加如下依赖:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-persistence_2.12</artifactId>
  5. <version>2.5.20</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-persistence_2.12', version: '2.5.20'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.5.20"

Akka 持久性扩展附带了一些内置持久性插件,包括基于内存堆的日志、基于本地文件系统的快照存储和基于 LevelDB 的日志。

基于 LevelDB 的插件需要以下附加依赖:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>org.fusesource.leveldbjni</groupId>
  4. <artifactId>leveldbjni-all</artifactId>
  5. <version>1.8</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'org.fusesource.leveldbjni', name: 'leveldbjni-all', version: '1.8'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"

示例项目

你可以查看「持久化示例」项目,以了解 Akka 持久化的实际使用情况。

简介

Akka 持久性使有状态的 Actor 能够持久化其状态,以便在 Actor 重新启动(例如,在 JVM 崩溃之后)、由监督者或手动停止启动或迁移到集群中时可以恢复状态。Akka 持久性背后的关键概念是,只有 Actor 接收到的事件才被持久化,而不是 Actor 的实际状态(尽管也提供了 Actor 状态快照支持)。事件通过附加到存储(没有任何变化)来持久化,这允许非常高的事务速率和高效的复制。有状态的 Actor 通过将存储的事件重放给 Actor 来恢复,从而允许它重建其状态。这可以是更改的完整历史记录,也可以从快照中的检查点开始,这样可以显著缩短恢复时间。Akka 持久化(persistence)还提供具有至少一次消息传递(at-least-once message delivery )语义的点对点(point-to-point)通信。

  • 注释:《通用数据保护条例》(GDPR)要求必须根据用户的请求删除个人信息。删除或修改携带个人信息的事件是困难的。数据分解可以用来忘记信息,而不是删除或修改信息。这是通过使用给定数据主体 ID(person)的密钥加密数据,并在忘记该数据主体时删除密钥来实现的。Lightbend 的「GDPR for Akka Persistence」提供了一些工具来帮助构建支持 GDPR 的系统。

Akka 持久化的灵感来自于「eventsourced」库的正式替换。它遵循与eventsourced相同的概念和体系结构,但在 API 和实现级别上存在显著差异。另请参见「migration-eventsourced-2.3」。

体系结构

  • AbstractPersistentActor:是一个持久的、有状态的 Actor。它能够将事件持久化到日志中,并能够以线程安全的方式对它们作出响应。它可以用于实现命令和事件源 Actor。当一个持久性 Actor 启动或重新启动时,日志消息将重播给该 Actor,以便它可以从这些消息中恢复其状态。
  • AbstractPersistentActorAtLeastOnceDelivery:将具有至少一次传递语义的消息发送到目的地,也可以在发送方和接收方 JVM 崩溃的情况下发送。
  • AsyncWriteJournal:日志存储发送给持久性 Actor 的消息序列。应用程序可以控制哪些消息是日志记录的,哪些消息是由持久性 Actor 接收的,而不进行日志记录。日志维护每一条消息上增加的highestSequenceNr。日志的存储后端是可插入的。持久性扩展附带了一个leveldb日志插件,它将写入本地文件系统。
  • 快照存储区(Snapshot store):快照存储区保存持久性 Actor 状态的快照。快照用于优化恢复时间。快照存储的存储后端是可插入的。持久性扩展附带了一个“本地”快照存储插件,该插件将写入本地文件系统。
  • 事件源(Event sourcing):基于上面描述的构建块,Akka 持久化为事件源应用程序的开发提供了抽象(详见「事件源」部分)。

事件源

请参阅「EventSourcing)」的介绍,下面是 Akka 通过持久性 Actor 实现的。

持久性 Actor 接收(非持久性)命令,如果该命令可以应用于当前状态,则首先对其进行验证。在这里,验证可以意味着任何事情,从简单检查命令消息的字段到与几个外部服务的对话。如果验证成功,则从命令生成事件,表示命令的效果。这些事件随后被持久化,并且在成功持久化之后,用于更改 Actor 的状态。当需要恢复持久性 Actor 时,只重播持久性事件,我们知道这些事件可以成功应用。换句话说,与命令相反,事件在被重播到持久性 Actor 时不会失败。事件源 Actor 还可以处理不更改应用程序状态的命令,例如查询命令。

关于“事件思考”的另一篇优秀文章是 Randy Shoup 的「Events As First-Class Citizens」。如果你开始开发基于事件的应用程序,这是一个简短的推荐阅读。

Akka 持久化使用AbstractPersistentActor抽象类支持事件源。扩展此类的 Actor 使用persist方法来持久化和处理事件。AbstractPersistentActor的行为是通过实现createReceiveRecovercreateReceive来定义的。这在下面的示例中进行了演示。

  1. import akka.actor.ActorRef;
  2. import akka.actor.ActorSystem;
  3. import akka.actor.Props;
  4. import akka.persistence.AbstractPersistentActor;
  5. import akka.persistence.SnapshotOffer;
  6. import java.io.Serializable;
  7. import java.util.ArrayList;
  8. class Cmd implements Serializable {
  9. private static final long serialVersionUID = 1L;
  10. private final String data;
  11. public Cmd(String data) {
  12. this.data = data;
  13. }
  14. public String getData() {
  15. return data;
  16. }
  17. }
  18. class Evt implements Serializable {
  19. private static final long serialVersionUID = 1L;
  20. private final String data;
  21. public Evt(String data) {
  22. this.data = data;
  23. }
  24. public String getData() {
  25. return data;
  26. }
  27. }
  28. class ExampleState implements Serializable {
  29. private static final long serialVersionUID = 1L;
  30. private final ArrayList<String> events;
  31. public ExampleState() {
  32. this(new ArrayList<>());
  33. }
  34. public ExampleState(ArrayList<String> events) {
  35. this.events = events;
  36. }
  37. public ExampleState copy() {
  38. return new ExampleState(new ArrayList<>(events));
  39. }
  40. public void update(Evt evt) {
  41. events.add(evt.getData());
  42. }
  43. public int size() {
  44. return events.size();
  45. }
  46. @Override
  47. public String toString() {
  48. return events.toString();
  49. }
  50. }
  51. class ExamplePersistentActor extends AbstractPersistentActor {
  52. private ExampleState state = new ExampleState();
  53. private int snapShotInterval = 1000;
  54. public int getNumEvents() {
  55. return state.size();
  56. }
  57. @Override
  58. public String persistenceId() {
  59. return "sample-id-1";
  60. }
  61. @Override
  62. public Receive createReceiveRecover() {
  63. return receiveBuilder()
  64. .match(Evt.class, state::update)
  65. .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
  66. .build();
  67. }
  68. @Override
  69. public Receive createReceive() {
  70. return receiveBuilder()
  71. .match(
  72. Cmd.class,
  73. c -> {
  74. final String data = c.getData();
  75. final Evt evt = new Evt(data + "-" + getNumEvents());
  76. persist(
  77. evt,
  78. (Evt e) -> {
  79. state.update(e);
  80. getContext().getSystem().getEventStream().publish(e);
  81. if (lastSequenceNr() % snapShotInterval == 0 && lastSequenceNr() != 0)
  82. // IMPORTANT: create a copy of snapshot because ExampleState is mutable
  83. saveSnapshot(state.copy());
  84. });
  85. })
  86. .matchEquals("print", s -> System.out.println(state))
  87. .build();
  88. }
  89. }

该示例定义了两种数据类型,即CmdEvt,分别表示命令和事件。ExamplePersistentActor的状态是包含在ExampleState中的持久化事件数据的列表。

持久化 Actor 的createReceiveRecover方法通过处理EvtSnapshotOffer消息来定义在恢复过程中如何更新状态。持久化 Actor 的createReceive方法是命令处理程序。在本例中,通过生成一个事件来处理命令,该事件随后被持久化和处理。通过使用事件(或事件序列)作为第一个参数和事件处理程序作为第二个参数调用persist来持久化事件。

persist方法异步地持久化事件,并为成功持久化的事件执行事件处理程序。成功的持久化事件在内部作为触发事件处理程序执行的单个消息发送回持久化 Actor。事件处理程序可能会关闭持久性 Actor 状态并对其进行改变。持久化事件的发送者是相应命令的发送者。这允许事件处理程序回复命令的发送者(未显示)。

事件处理程序的主要职责是使用事件数据更改持久性 Actor 状态,并通过发布事件通知其他人成功的状态更改。

当使用persist持久化事件时,可以确保持久化 Actor 不会在persist调用和关联事件处理程序的执行之间接收进一步的命令。这也适用于单个命令上下文中的多个persist调用。传入的消息将被存储,直到持久化完成。

如果事件的持久性失败,将调用onPersistFailure(默认情况下记录错误),并且 Actor 将无条件停止。如果在存储事件之前拒绝了该事件的持久性,例如,由于序列化错误,将调用onPersistRejected(默认情况下记录警告),并且 Actor 将继续执行下一条消息。

运行这个例子最简单的方法是自己下载准备好的「 Akka 持久性示例」和教程。它包含有关如何运行PersistentActorExample的说明。此示例的源代码也可以在「Akka 示例仓库」中找到。

  • 注释:在使用getContext().become()getContext().unbecome()进行正常处理和恢复期间,还可以在不同的命令处理程序之间切换。要使 Actor 在恢复后进入相同的状态,你需要特别注意在createReceiveRecover方法中使用becomeunbecome执行相同的状态转换,就像在命令处理程序中那样。请注意,当使用来自createReceiveRecoverbecome时,在重播事件时,它仍然只使用createReceiveRecover行为。重播完成后,将使用新行为。

标识符

持久性 Actor 必须有一个标识符(identifier),该标识符在不同的 Actor 化身之间不会发生变化。必须使用persistenceId方法定义标识符。

  1. @Override
  2. public String persistenceId() {
  3. return "my-stable-persistence-id";
  4. }
  • 注释persistenceId对于日志中的给定实体(数据库表/键空间)必须是唯一的。当重播持久化到日志的消息时,你将查询具有persistenceId的消息。因此,如果两个不同的实体共享相同的persistenceId,则消息重播行为已损坏。

恢复

默认情况下,通过重放日志消息,在启动和重新启动时自动恢复持久性 Actor。在恢复期间发送给持久性 Actor 的新消息不会干扰重播的消息。在恢复阶段完成后,它们被一个持久性 Actor 存放和接收。

可以同时进行的并发恢复的数量限制为不使系统和后端数据存储过载。当超过限制时,Actor 将等待其他恢复完成。配置方式为:

  1. akka.persistence.max-concurrent-recoveries = 50
  • 注释:假设原始发件人已经很长时间不在,那么使用getSender()访问已重播消息的发件人将始终导致deadLetters引用。如果在将来的恢复过程中确实需要通知某个 Actor,请将其ActorPath显式存储在持久化事件中。

恢复自定义

应用程序还可以通过在AbstractPersistentActorrecovery方法中返回自定义的Recovery对象来定制恢复的执行方式,要跳过加载快照和重播所有事件,可以使用SnapshotSelectionCriteria.none()。如果快照序列化格式以不兼容的方式更改,则此选项非常有用。它通常不应该在事件被删除时使用。

  1. @Override
  2. public Recovery recovery() {
  3. return Recovery.create(SnapshotSelectionCriteria.none());
  4. }

另一种可能的恢复自定义(对调试有用)是在重播上设置上限,使 Actor 仅在“过去”的某个点上重播(而不是重播到其最新状态)。请注意,在这之后,保留新事件是一个坏主意,因为以后的恢复可能会被以前跳过的事件后面的新事件混淆。

  1. @Override
  2. public Recovery recovery() {
  3. return Recovery.create(457L);
  4. }

通过在PersistentActorrecovery方法中返回Recovery.none()可以关闭恢复:

  1. @Override
  2. public Recovery recovery() {
  3. return Recovery.none();
  4. }

恢复状态

通过以下方法,持久性 Actor 可以查询其自己的恢复状态:

  1. public boolean recoveryRunning();
  2. public boolean recoveryFinished();

有时候,在处理发送给持久性 Actor 的任何其他消息之前,当恢复完成时,需要执行额外的初始化。持久性 Actor 将在恢复之后和任何其他收到的消息之前收到一条特殊的RecoveryCompleted消息。

  1. class MyPersistentActor5 extends AbstractPersistentActor {
  2. @Override
  3. public String persistenceId() {
  4. return "my-stable-persistence-id";
  5. }
  6. @Override
  7. public Receive createReceiveRecover() {
  8. return receiveBuilder()
  9. .match(
  10. RecoveryCompleted.class,
  11. r -> {
  12. // perform init after recovery, before any other messages
  13. // ...
  14. })
  15. .match(String.class, this::handleEvent)
  16. .build();
  17. }
  18. @Override
  19. public Receive createReceive() {
  20. return receiveBuilder()
  21. .match(String.class, s -> s.equals("cmd"), s -> persist("evt", this::handleEvent))
  22. .build();
  23. }
  24. private void handleEvent(String event) {
  25. // update state
  26. // ...
  27. }
  28. }

即使日志中没有事件且快照存储为空,或者是具有以前未使用的persistenceId的新持久性 Actor,Actor 也将始终收到RecoveryCompleted消息。

如果从日志中恢复 Actor 的状态时出现问题,则调用onRecoveryFailure(默认情况下记录错误),Actor 将停止。

内部存储

持久性 Actor 有一个私有存储区,用于在恢复期间对传入消息进行内部缓存,或者通过persist\persistAll方法持久化事件。你仍然可以从Stash接口use/inherit。内部存储(internal stash)与正常存储进行合作,通过unstashAll方法并确保消息正确地unstashed到内部存储以维持顺序保证。

你应该小心,不要向持久性 Actor 发送超过它所能跟上的消息,否则隐藏的消息的数量将无限增长。通过在邮箱配置中定义最大存储容量来防止OutOfMemoryError是明智的:

  1. akka.actor.default-mailbox.stash-capacity=10000

注意,其是每个 Actor 的藏匿存储容量(stash capacity)。如果你有许多持久性 Actor,例如在使用集群分片(cluster sharding)时,你可能需要定义一个小的存储容量,以确保系统中存储的消息总数不会消耗太多的内存。此外,持久性 Actor 定义了三种策略来处理超过内部存储容量时的故障。默认的溢出策略是ThrowOverflowExceptionStrategy,它丢弃当前接收到的消息并抛出StashOverflowException,如果使用默认的监视策略,则会导致 Actor 重新启动。你可以重写internalStashOverflowStrategy方法,为任何“单个(individual)”持久性 Actor 返回DiscardToDeadLetterStrategyReplyToStrategy,或者通过提供 FQCN 为所有持久性 Actor 定义“默认值”,FQCN 必须是持久配置中StashOverflowStrategyConfigurator的子类:

  1. akka.persistence.internal-stash-overflow-strategy=
  2. "akka.persistence.ThrowExceptionConfigurator"

DiscardToDeadLetterStrategy策略还具有预打包(pre-packaged)的伴生配置程序akka.persistence.DiscardConfigurator

你还可以通过 Akka 的持久性扩展查询默认策略:

  1. Persistence.get(getContext().getSystem()).defaultInternalStashOverflowStrategy();
  • 注释:在持久性 Actor 中,应避免使用有界的邮箱(bounded mailbox),否则来自存储后端的消息可能会被丢弃。你可以用有界的存储(bounded stash)来代替它。

Relaxed 本地一致性需求和高吞吐量用例

如果面对relaxed本地一致性和高吞吐量要求,有时PersistentActor及其persist在高速使用传入命令方面可能不够,因为它必须等到与给定命令相关的所有事件都被处理后才能开始处理下一个命令。虽然这种抽象在大多数情况下都非常有用,但有时你可能会面临关于一致性的relaxed要求——例如,你可能希望尽可能快地处理命令,假设事件最终将在后台被持久化并正确处理,如果需要,可以对持久性失败进行逆向反应(retroactively reacting)。

persistAsync方法提供了一个工具来实现高吞吐量的持久性 Actor。当日志仍在处理持久化and/or用户代码正在执行事件回调时,它不会存储传入的命令。

在下面的示例中,即使在处理下一个命令之后,事件回调也可以“任何时候”调用。事件之间的顺序仍然是有保证的(evt-b-1将在evt-a-2之后发送,也将在evt-a-1之后发送)。

  1. class MyPersistentActor extends AbstractPersistentActor {
  2. @Override
  3. public String persistenceId() {
  4. return "my-stable-persistence-id";
  5. }
  6. private void handleCommand(String c) {
  7. getSender().tell(c, getSelf());
  8. persistAsync(
  9. String.format("evt-%s-1", c),
  10. e -> {
  11. getSender().tell(e, getSelf());
  12. });
  13. persistAsync(
  14. String.format("evt-%s-2", c),
  15. e -> {
  16. getSender().tell(e, getSelf());
  17. });
  18. }
  19. @Override
  20. public Receive createReceiveRecover() {
  21. return receiveBuilder().match(String.class, this::handleCommand).build();
  22. }
  23. @Override
  24. public Receive createReceive() {
  25. return receiveBuilder().match(String.class, this::handleCommand).build();
  26. }
  27. }
  • 注释:为了实现名为“命令源(command sourcing)”的模式,请立即对所有传入消息调用persistAsync,并在回调中处理它们。
  • 警告:如果在对persistAsync的调用和日志确认写入之间重新启动或停止 Actor,则不会调用回调。

推迟操作,直到执行了前面的持久处理程序

有时候,在处理persistAsyncpersist时,你可能会发现,最好定义一些“在调用以前的persistAsync/persist处理程序之后发生”的操作。PersistentActor提供了名为deferdeferAsync的实用方法,它们分别与persistpersistAsync工作类似,但不会持久化传入事件。建议将它们用于读取操作,在域模型中没有相应事件的操作。

使用这些方法与持久化方法非常相似,但它们不会持久化传入事件。它将保存在内存中,并在调用处理程序时使用。

  1. class MyPersistentActor extends AbstractPersistentActor {
  2. @Override
  3. public String persistenceId() {
  4. return "my-stable-persistence-id";
  5. }
  6. private void handleCommand(String c) {
  7. persistAsync(
  8. String.format("evt-%s-1", c),
  9. e -> {
  10. getSender().tell(e, getSelf());
  11. });
  12. persistAsync(
  13. String.format("evt-%s-2", c),
  14. e -> {
  15. getSender().tell(e, getSelf());
  16. });
  17. deferAsync(
  18. String.format("evt-%s-3", c),
  19. e -> {
  20. getSender().tell(e, getSelf());
  21. });
  22. }
  23. @Override
  24. public Receive createReceiveRecover() {
  25. return receiveBuilder().match(String.class, this::handleCommand).build();
  26. }
  27. @Override
  28. public Receive createReceive() {
  29. return receiveBuilder().match(String.class, this::handleCommand).build();
  30. }
  31. }

请注意,sender()在处理程序回调中是安全的,它将指向调用此deferdeferAsync处理程序的命令的原始发送者。

调用方将按此(保证)顺序得到响应:

  1. final ActorRef persistentActor = system.actorOf(Props.create(MyPersistentActor.class));
  2. persistentActor.tell("a", sender);
  3. persistentActor.tell("b", sender);
  4. // order of received messages:
  5. // a
  6. // b
  7. // evt-a-1
  8. // evt-a-2
  9. // evt-a-3
  10. // evt-b-1
  11. // evt-b-2
  12. // evt-b-3

你也可以在调用persist时,调用defer或者deferAsync

  1. class MyPersistentActor extends AbstractPersistentActor {
  2. @Override
  3. public String persistenceId() {
  4. return "my-stable-persistence-id";
  5. }
  6. private void handleCommand(String c) {
  7. persist(
  8. String.format("evt-%s-1", c),
  9. e -> {
  10. sender().tell(e, self());
  11. });
  12. persist(
  13. String.format("evt-%s-2", c),
  14. e -> {
  15. sender().tell(e, self());
  16. });
  17. defer(
  18. String.format("evt-%s-3", c),
  19. e -> {
  20. sender().tell(e, self());
  21. });
  22. }
  23. @Override
  24. public Receive createReceiveRecover() {
  25. return receiveBuilder().match(String.class, this::handleCommand).build();
  26. }
  27. @Override
  28. public Receive createReceive() {
  29. return receiveBuilder().match(String.class, this::handleCommand).build();
  30. }
  31. }
  • 警告:如果在对deferdeferAsync的调用之间重新启动或停止 Actor,并且日志已经处理并确认了前面的所有写入操作,则不会调用回调。

嵌套的持久调用

可以在各自的回调块中调用persistpersistAsync,它们将正确地保留线程安全性(包括getSender()的正确值)和存储保证。

一般来说,鼓励创建不需要使用嵌套事件持久化的命令处理程序,但是在某些情况下,它可能会有用。了解这些情况下回调执行的顺序以及它们对隐藏行为(persist()强制执行)的影响是很重要的。在下面的示例中,发出了两个持久调用,每个持久调用在其回调中发出另一个持久调用:

  1. @Override
  2. public Receive createReceiveRecover() {
  3. final Procedure<String> replyToSender = event -> getSender().tell(event, getSelf());
  4. return receiveBuilder()
  5. .match(
  6. String.class,
  7. msg -> {
  8. persist(
  9. String.format("%s-outer-1", msg),
  10. event -> {
  11. getSender().tell(event, getSelf());
  12. persist(String.format("%s-inner-1", event), replyToSender);
  13. });
  14. persist(
  15. String.format("%s-outer-2", msg),
  16. event -> {
  17. getSender().tell(event, getSelf());
  18. persist(String.format("%s-inner-2", event), replyToSender);
  19. });
  20. })
  21. .build();
  22. }

向此PersistentActor发送两个命令时,将按以下顺序执行持久化处理程序:

  1. persistentActor.tell("a", ActorRef.noSender());
  2. persistentActor.tell("b", ActorRef.noSender());
  3. // order of received messages:
  4. // a
  5. // a-outer-1
  6. // a-outer-2
  7. // a-inner-1
  8. // a-inner-2
  9. // and only then process "b"
  10. // b
  11. // b-outer-1
  12. // b-outer-2
  13. // b-inner-1
  14. // b-inner-2

首先,发出持久调用的“外层”,并应用它们的回调。成功完成这些操作后,将调用内部回调(一旦日志确认了它们所持续的事件是持久的)。只有在成功地调用了所有这些处理程序之后,才能将下一个命令传递给持久性 Actor。换句话说,通过最初在外层上调用persist()来保证输入命令的存储被扩展,直到所有嵌套的persist回调都被处理完毕。

也可以使用相同的模式嵌套persistAsync调用:

  1. @Override
  2. public Receive createReceive() {
  3. final Procedure<String> replyToSender = event -> getSender().tell(event, getSelf());
  4. return receiveBuilder()
  5. .match(
  6. String.class,
  7. msg -> {
  8. persistAsync(
  9. String.format("%s-outer-1", msg),
  10. event -> {
  11. getSender().tell(event, getSelf());
  12. persistAsync(String.format("%s-inner-1", event), replyToSender);
  13. });
  14. persistAsync(
  15. String.format("%s-outer-2", msg),
  16. event -> {
  17. getSender().tell(event, getSelf());
  18. persistAsync(String.format("%s-inner-1", event), replyToSender);
  19. });
  20. })
  21. .build();
  22. }

在这种情况下,不会发生存储,但事件仍将持续,并且按预期顺序执行回调:

  1. persistentActor.tell("a", getSelf());
  2. persistentActor.tell("b", getSelf());
  3. // order of received messages:
  4. // a
  5. // b
  6. // a-outer-1
  7. // a-outer-2
  8. // b-outer-1
  9. // b-outer-2
  10. // a-inner-1
  11. // a-inner-2
  12. // b-inner-1
  13. // b-inner-2
  14. // which can be seen as the following causal relationship:
  15. // a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
  16. // b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2

尽管可以通过保持各自的语义来嵌套混合persistpersistAsync,但这不是推荐的做法,因为这可能会导致嵌套过于复杂。

  • 警告:虽然可以在彼此内部嵌套persist调用,但从 Actor 消息处理线程以外的任何其他线程调用persist都是非法的。例如,通过Futures调用persist就是非法的!这样做将打破persist方法旨在提供的保证,应该始终从 Actor 的接收块(receive block)中调用persistpersistAsync

失败

如果事件的持久性失败,将调用onPersistFailure(默认情况下记录错误),并且 Actor 将无条件停止。

persist失败时,它无法恢复的原因是不知道事件是否实际持续,因此处于不一致状态。由于日志可能不可用,在持续失败时重新启动很可能会失败。最好是停止 Actor,然后在退后超时后重新启动。提供akka.pattern.BackoffSupervisor Actor 以支持此类重新启动。

  1. @Override
  2. public void preStart() throws Exception {
  3. final Props childProps = Props.create(MyPersistentActor1.class);
  4. final Props props =
  5. BackoffSupervisor.props(
  6. childProps, "myActor", Duration.ofSeconds(3), Duration.ofSeconds(30), 0.2);
  7. getContext().actorOf(props, "mySupervisor");
  8. super.preStart();
  9. }

如果在存储事件之前拒绝了该事件的持久性,例如,由于序列化错误,将调用onPersistRejected(默认情况下记录警告),并且 Actor 将继续执行下一条消息。

如果在启动 Actor 时无法从日志中恢复 Actor 的状态,将调用onRecoveryFailure(默认情况下记录错误),并且 Actor 将被停止。请注意,加载快照失败也会像这样处理,但如果你知道序列化格式已以不兼容的方式更改,则可以禁用快照加载,请参阅「恢复自定义」。

原子写入

每个事件都是原子存储的(stored atomically),但也可以使用persistAllpersistAllAsync方法原子存储多个事件。这意味着传递给该方法的所有事件都将被存储,或者在出现错误时不存储任何事件。

因此,持久性 Actor 的恢复永远不会只在persistAll持久化事件的一个子集的情况下部分完成。

有些日志可能不支持几个事件的原子写入(atomic writes),它们将拒绝persistAll命令,例如调用OnPersistRejected时出现异常(通常是UnsupportedOperationException)。

批量写入

为了在使用persistAsync时优化吞吐量,持久性 Actor 在将事件写入日志(作为单个批处理)之前在内部批处理要在高负载下存储的事件。批(batch)的大小由日志往返期间发出的事件数动态确定:向日志发送批之后,在收到上一批已写入的确认信息之前,不能再发送其他批。批写入从不基于计时器,它将延迟保持在最小值。

消息删除

可以在指定的序列号之前删除所有消息(由单个持久性 Actor 记录);持久性 Actor 可以为此端调用deleteMessages方法。

在基于事件源的应用程序中删除消息通常要么根本不使用,要么与快照一起使用,即在成功存储快照之后,可以发出一条deleteMessages(toSequenceNr)消息。

  • 警告:如果你使用「持久性查询」,查询结果可能会丢失日志中已删除的消息,这取决于日志插件中如何实现删除。除非你使用的插件在持久性查询结果中仍然显示已删除的消息,否则你必须设计应用程序,使其不受丢失消息的影响。

在持久性 Actor 发出deleteMessages消息之后,如果删除成功,则向持久性 Actor 发送DeleteMessagesSuccess消息,如果删除失败,则向持久性 Actor 发送DeleteMessagesFailure消息。

消息删除不会影响日志的最高序列号,即使在调用deleteMessages之后从日志中删除了所有消息。

持久化状态处理

持久化、删除和重放消息可以成功,也可以失败。

MethodSuccess
persist / persistAsync调用持久化处理器
onPersistRejected无自动行为
recoveryRecoveryCompleted
deleteMessagesDeleteMessagesSuccess

最重要的操作(persistrecovery)将故障处理程序建模为显式回调,用户可以在PersistentActor中重写该回调。这些处理程序的默认实现会发出一条日志消息(persistrecovery失败的error),记录失败原因和有关导致失败的消息的信息。

对于严重的故障(如恢复或持久化事件失败),在调用故障处理程序后将停止持久性 Actor。这是因为,如果底层日志实现发出持久性失败的信号,那么它很可能要么完全失败,要么过载并立即重新启动,然后再次尝试持久性事件,这很可能不会帮助日志恢复,因为它可能会导致一个「Thundering herd」问题,因为许多持久性 Actor 会重新启动并尝试继续他们的活动。相反,使用BackoffSupervisor,它实现了一个指数级的退避(exponential-backoff)策略,允许持久性 Actor 在重新启动之间有更多的喘息空间。

  • 注释:日志实现可以选择实现重试机制,例如,只有在写入失败N次之后,才会向用户发出持久化失败的信号。换句话说,一旦一个日志返回一个失败,它就被 Akka 持久化认为是致命的,导致失败的持久行 Actor 将被停止。检查你正在使用的日志实现文档,了解它是否或如何使用此技术。

安全地关闭持久性 Actor

当从外部关闭持久性 Actor 时,应该特别小心。对于正常的 Actor,通常可以接受使用特殊的PoisonPill消息来向 Actor 发出信号,一旦收到此信息,它就应该停止自己。事实上,此消息是由 Akka 自动处理的。

当与PersistentActor一起使用时,这可能很危险。由于传入的命令将从 Actor 的邮箱中排出,并在等待确认时放入其内部存储(在调用持久处理程序之前),因此 Actor 可以在处理已放入其存储的其他消息之前接收和(自动)处理PoisonPill,从而导致 Actor 的提前(pre-mature)停止。

  • 警告:当与持久性 Actor 一起工作时,考虑使用明确的关闭消息而不是使用PoisonPill

下面的示例强调了消息如何到达 Actor 的邮箱,以及在使用persist()时它们如何与其内部存储机制交互。注意,使用PoisonPill时可能发生的早期停止行为:

  1. final class Shutdown {}
  2. class MyPersistentActor extends AbstractPersistentActor {
  3. @Override
  4. public String persistenceId() {
  5. return "some-persistence-id";
  6. }
  7. @Override
  8. public Receive createReceive() {
  9. return receiveBuilder()
  10. .match(
  11. Shutdown.class,
  12. shutdown -> {
  13. getContext().stop(getSelf());
  14. })
  15. .match(
  16. String.class,
  17. msg -> {
  18. System.out.println(msg);
  19. persist("handle-" + msg, e -> System.out.println(e));
  20. })
  21. .build();
  22. }
  23. @Override
  24. public Receive createReceiveRecover() {
  25. return receiveBuilder().matchAny(any -> {}).build();
  26. }
  27. }
  1. // UN-SAFE, due to PersistentActor's command stashing:
  2. persistentActor.tell("a", ActorRef.noSender());
  3. persistentActor.tell("b", ActorRef.noSender());
  4. persistentActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
  5. // order of received messages:
  6. // a
  7. // # b arrives at mailbox, stashing; internal-stash = [b]
  8. // # PoisonPill arrives at mailbox, stashing; internal-stash = [b, Shutdown]
  9. // PoisonPill is an AutoReceivedMessage, is handled automatically
  10. // !! stop !!
  11. // Actor is stopped without handling `b` nor the `a` handler!
  1. // SAFE:
  2. persistentActor.tell("a", ActorRef.noSender());
  3. persistentActor.tell("b", ActorRef.noSender());
  4. persistentActor.tell(new Shutdown(), ActorRef.noSender());
  5. // order of received messages:
  6. // a
  7. // # b arrives at mailbox, stashing; internal-stash = [b]
  8. // # Shutdown arrives at mailbox, stashing; internal-stash = [b, Shutdown]
  9. // handle-a
  10. // # unstashing; internal-stash = [Shutdown]
  11. // b
  12. // handle-b
  13. // # unstashing; internal-stash = []
  14. // Shutdown
  15. // -- stop --

重播滤波器

在某些情况下,事件流可能已损坏,并且多个写入程序(即多个持久性 Actor 实例)使用相同的序列号记录不同的消息。在这种情况下,你可以配置如何在恢复时过滤来自多个编写器(writers)的重播(replayed)消息。

在你的配置中,在akka.persistence.journal.xxx.replay-filter部分(其中xxx是日志插件id)下,你可以从以下值中选择重播过滤器(replay filter)的模式:

  • repair-by-discard-old
  • fail
  • warn
  • off

例如,如果为 LevelDB 插件配置重播过滤器,则如下所示:

  1. # The replay filter can detect a corrupt event stream by inspecting
  2. # sequence numbers and writerUuid when replaying events.
  3. akka.persistence.journal.leveldb.replay-filter {
  4. # What the filter should do when detecting invalid events.
  5. # Supported values:
  6. # `repair-by-discard-old` : discard events from old writers,
  7. # warning is logged
  8. # `fail` : fail the replay, error is logged
  9. # `warn` : log warning but emit events untouched
  10. # `off` : disable this feature completely
  11. mode = repair-by-discard-old
  12. }

快照

当你使用 Actor 建模你的域时,你可能会注意到一些 Actor 可能会积累非常长的事件日志并经历很长的恢复时间。有时,正确的方法可能是分成一组生命周期较短的 Actor。但是,如果这不是一个选项,你可以使用快照(snapshots)来大幅缩短恢复时间。

持久性 Actor 可以通过调用saveSnapshot方法来保存内部状态的快照。如果快照保存成功,持久性 Actor 将收到SaveSnapshotSuccess消息,否则将收到SaveSnapshotFailure消息。

  1. private Object state;
  2. private int snapShotInterval = 1000;
  3. @Override
  4. public Receive createReceive() {
  5. return receiveBuilder()
  6. .match(
  7. SaveSnapshotSuccess.class,
  8. ss -> {
  9. SnapshotMetadata metadata = ss.metadata();
  10. // ...
  11. })
  12. .match(
  13. SaveSnapshotFailure.class,
  14. sf -> {
  15. SnapshotMetadata metadata = sf.metadata();
  16. // ...
  17. })
  18. .match(
  19. String.class,
  20. cmd -> {
  21. persist(
  22. "evt-" + cmd,
  23. e -> {
  24. updateState(e);
  25. if (lastSequenceNr() % snapShotInterval == 0 && lastSequenceNr() != 0)
  26. saveSnapshot(state);
  27. });
  28. })
  29. .build();
  30. }

其中,metadata的类型为SnapshotMetadata

  1. final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)

在恢复过程中,通过SnapshotOffer消息向持久性 Actor 提供以前保存的快照,从中可以初始化内部状态。

  1. private Object state;
  2. @Override
  3. public Receive createReceiveRecover() {
  4. return receiveBuilder()
  5. .match(
  6. SnapshotOffer.class,
  7. s -> {
  8. state = s.snapshot();
  9. // ...
  10. })
  11. .match(
  12. String.class,
  13. s -> {
  14. /* ...*/
  15. })
  16. .build();
  17. }

SnapshotOffer消息之后重播的消息(如果有)比提供的快照状态年轻(younger)。他们最终将持久性 Actor 恢复到当前(即最新)状态。

通常,只有在持久性 Actor 以前保存过一个或多个快照,并且其中至少一个快照与可以指定用于恢复的SnapshotSelectionCriteria匹配时,才会提供持久性 Actor 快照。

  1. @Override
  2. public Recovery recovery() {
  3. return Recovery.create(
  4. SnapshotSelectionCriteria.create(457L, System.currentTimeMillis()));
  5. }

如果未指定,则默认为SnapshotSelectionCriteria.latest(),后者选择最新的(最年轻的)快照。要禁用基于快照的恢复,应用程序应使用SnapshotSelectionCriteria.none()。如果没有保存的快照与指定的SnapshotSelectionCriteria匹配,则恢复将重播所有日志消息。

  • 注释:为了使用快照,必须配置默认的快照存储(akka.persistence.snapshot-store.plugin),或者持久性 Actor 可以通过重写String snapshotPluginId()显式地选择快照存储。由于某些应用程序可以不使用任何快照,因此不配置快照存储是合法的。但是,当检测到这种情况时,Akka 会记录一条警告消息,然后继续操作,直到 Actor 尝试存储快照,此时操作将失败(例如,通过使用SaveSnapshotFailure进行响应)。注意集群分片(Cluster Sharding)的“持久性模式”使用快照。如果使用该模式,则需要定义快照存储插件。

快照删除

持久性 Actor 可以通过使用快照拍摄时间的序列号调用deleteSnapshot方法来删除单个快照。

要批量删除与SnapshotSelectionCriteria匹配的一系列快照,持久性 Actor 应使用deleteSnapshots方法。根据所用的日志,这可能是低效的。最佳做法是使用deleteSnapshot执行特定的删除,或者为SnapshotSelectionCriteria包含minSequenceNrmaxSequenceNr

快照状态处理

保存或删除快照既可以成功,也可以失败,此信息通过状态消息报告给持久性 Actor,如下表所示:

MethodSuccessFailure message
saveSnapshot(Any)SaveSnapshotSuccessSaveSnapshotFailure
deleteSnapshot(Long)DeleteSnapshotSuccessDeleteSnapshotFailure
deleteSnapshots(SnapshotSelectionCriteria)DeleteSnapshotsSuccessDeleteSnapshotsFailure

如果 Actor 未处理故障消息,则将为每个传入的故障消息记录默认的警告日志消息。不会对成功消息执行默认操作,但是你可以自由地处理它们,例如,为了删除快照的内存中表示形式,或者在尝试再次保存快照失败的情况下。

扩容

在一个用例中,如果需要的持久性 Actor 的数量高于一个节点的内存中所能容纳的数量,或者弹性很重要,因此如果一个节点崩溃,那么持久性 Actor 很快就会在一个新节点上启动,并且可以恢复操作,那么「集群分片」非常适合将持久性 Actor 通过他们的id分散到集群和地址上。

Akka 持久化(persistence)是基于单写入(single-writer)原则的。对于特定的persistenceId,一次只能激活一个PersistentActor实例。如果多个实例同时持久化事件,那么这些事件将被交错,并且在重播时可能无法正确解释。集群分片确保数据中心内每个id只有一个活动实体(PersistentActor)。LightBend 的「Multi-DC Persistence」支持跨数据中心的双活(active-active)持久性实体。

在 Akka 之上构建的「Lagom」框架编码了许多与此相关的最佳实践。有关更多详细信息,请参阅 Lagom 文档中的「Managing Data Persistence」和「Persistent Entity」。

至少一次传递

要将具有至少一次传递(at-least-once delivery)语义的消息发送到目标,可以使用AbstractPersistentActorWithAtLeastOnceDelivery,而不是在发送端扩展AbstractPersistentActor。当消息在可配置的超时时间内未被确认时,它负责重新发送消息。

发送 Actor 的状态,包括那些已发送但未被接收者确认的消息,必须是持久的,这样它才能在发送 Actor 或 JVM 崩溃后存活下来。AbstractPersistentActorWithAtLeastOnceDelivery类本身不持久任何内容。

  • 注释:至少有一次传递意味着原始消息发送顺序并不总是保持不变,并且目标可能接收到重复的消息。该语义与普通ActorRef发送操作的语义不匹配:
    • 不是至多一次传递
    • 同一“发送方和接收者”对的消息顺序由于可能的重发而不被保留
    • 在崩溃和目标 Actor 的重新启动之后,消息仍然被传递给新的 Actor 化身。

这些语义类似于ActorPath所表示的含义,因此在传递消息时需要提供路径而不是引用。消息将与 Actor 选择(selection)一起发送到路径。

使用deliver方法将消息发送到目标。当目标已用确认消息答复时,调用confirmDelivery方法。

deliver 与 confirmDelivery 的关系

若要将消息发送到目标路径,请在持久化发送消息的意图之后使用deliver方法。

目标 Actor 必须返回确认消息。当发送 Actor 收到此确认消息时,你应该持久化消息已成功传递的事实,然后调用confirmDelivery方法。

如果持久性 Actor 当前未恢复,则deliver方法将消息发送到目标 Actor。恢复时,将缓冲消息,直到使用confirmDelivery确认消息。一旦恢复完成,如果有未确认的未完成消息(在消息重播期间),持久性 Actor 将在发送任何其他消息之前重新发送这些消息。

传递需要deliveryIdToMessage函数将提供的deliveryId传递到消息中,以便deliverconfirmDelivery之间的关联成为可能。deliveryId必须在传递之间往返。在收到消息后,目标 Actor 会将包装在确认消息中的相同deliveryId发送回发送者。然后,发送方将使用它调用confirmDelivery方法来完成传递过程。

  1. class Msg implements Serializable {
  2. private static final long serialVersionUID = 1L;
  3. public final long deliveryId;
  4. public final String s;
  5. public Msg(long deliveryId, String s) {
  6. this.deliveryId = deliveryId;
  7. this.s = s;
  8. }
  9. }
  10. class Confirm implements Serializable {
  11. private static final long serialVersionUID = 1L;
  12. public final long deliveryId;
  13. public Confirm(long deliveryId) {
  14. this.deliveryId = deliveryId;
  15. }
  16. }
  17. class MsgSent implements Serializable {
  18. private static final long serialVersionUID = 1L;
  19. public final String s;
  20. public MsgSent(String s) {
  21. this.s = s;
  22. }
  23. }
  24. class MsgConfirmed implements Serializable {
  25. private static final long serialVersionUID = 1L;
  26. public final long deliveryId;
  27. public MsgConfirmed(long deliveryId) {
  28. this.deliveryId = deliveryId;
  29. }
  30. }
  31. class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery {
  32. private final ActorSelection destination;
  33. public MyPersistentActor(ActorSelection destination) {
  34. this.destination = destination;
  35. }
  36. @Override
  37. public String persistenceId() {
  38. return "persistence-id";
  39. }
  40. @Override
  41. public Receive createReceive() {
  42. return receiveBuilder()
  43. .match(
  44. String.class,
  45. s -> {
  46. persist(new MsgSent(s), evt -> updateState(evt));
  47. })
  48. .match(
  49. Confirm.class,
  50. confirm -> {
  51. persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt));
  52. })
  53. .build();
  54. }
  55. @Override
  56. public Receive createReceiveRecover() {
  57. return receiveBuilder().match(Object.class, evt -> updateState(evt)).build();
  58. }
  59. void updateState(Object event) {
  60. if (event instanceof MsgSent) {
  61. final MsgSent evt = (MsgSent) event;
  62. deliver(destination, deliveryId -> new Msg(deliveryId, evt.s));
  63. } else if (event instanceof MsgConfirmed) {
  64. final MsgConfirmed evt = (MsgConfirmed) event;
  65. confirmDelivery(evt.deliveryId);
  66. }
  67. }
  68. }
  69. class MyDestination extends AbstractActor {
  70. @Override
  71. public Receive createReceive() {
  72. return receiveBuilder()
  73. .match(
  74. Msg.class,
  75. msg -> {
  76. // ...
  77. getSender().tell(new Confirm(msg.deliveryId), getSelf());
  78. })
  79. .build();
  80. }
  81. }

持久化模块生成的deliveryId是严格单调递增的序列号,没有间隙。相同的序列用于 Actor 的所有目的地,即当发送到多个目的地时,目的地将看到序列中的间隙。无法使用自定义deliveryId。但是,你可以将消息中的自定义关联标识符发送到目标。然后必须在内部deliveryId(传递到deliveryIdToMessage函数)和自定义关联id(传递到消息)之间保留映射。你可以通过将此类映射存储在一个Map(correlationId -> deliveryId)中来实现这一点,从该映射中,你可以在消息的接收者用你的自定义关联id答复之后,检索要传递到confirmDelivery方法的deliveryId

AbstractPersistentActorWithAtLeastOnceDelivery类的状态由未确认的消息和序列号组成。它不存储此状态本身。你必须持久化与PersistentActordeliverconfirmDelivery调用相对应的事件,以便在PersistentActor的恢复阶段通过调用相同的方法恢复状态。有时,这些事件可以从其他业务级事件派生,有时必须创建单独的事件。在恢复过程中,deliver调用不会发送消息,如果未执行匹配的confirmDelivery,则稍后将发送这些消息。

对快照的支持由getDeliverySnapshotsetDeliverySnapshot提供。AtLeastOnceDeliverySnapshot包含完整的传递状态,也包括未确认的消息。如你需要 Actor 状态的其他部分的自定义快照,则还必须包括AtLeastOnceDeliverySnapshot。它使用protobuf和普通的 Akka 序列化机制进行序列化。最简单的方法是将AtLeastOnceDeliverySnapshot的字节作为blob包含在自定义快照中。

重新传递尝试之间的间隔由redeliverInterval方法定义。可以使用akka.persistence.at-least-once-delivery.redeliver-interval配置键配置默认值。方法可以被实现类重写以返回非默认值。

在每次重新传递突发时将发送的最大消息数由redeliveryBurstLimit方法定义(突发频率是重新传递间隔的一半)。如果有很多未确认的消息(例如,如果目标 Actor 长时间不可用),这有助于防止同时发送大量的消息。默认值可以使用akka.persistence.at-least-once-delivery.redelivery-burst-limit配置键进行配置。方法可以被实现类重写以返回非默认值。

在多次尝试传递之后,至少会向self发送一条AtLeastOnceDelivery.UnconfirmedWarning消息。重新发送仍将继续,但你可以选择调用confirmDelivery以取消重新发送。发出警告前的传送尝试次数由warnAfterNumberOfUnconfirmedAttempts方法定义。可以使用akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts配置键配置默认值。方法可以被实现类重写以返回非默认值。

AbstractPersistentActorWithAtLeastOnceDelivery类将消息保存在内存中,直到确认它们的成功传递为止。允许 Actor 在内存中保留的未确认消息的最大数目由maxUnconfirmedMessages方法定义。如果超过此限制,则传递方法将不接受更多的消息,并将引发AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException。可以使用akka.persistence.at-least-once-delivery.max-unconfirmed-messages配置键配置默认值。方法可以被实现类重写以返回非默认值。

事件适配器

在使用事件源(event sourcing)的长时间运行的项目中,有时需要将数据模型与域模型完全分离。

事件适配器(Event Adapters)在以下情况中提供帮助:

  • 版本迁移Version Migrations),存储在版本 1 中的现有事件应“向上转换”为新的版本 2 表示,这样做的过程涉及实际代码,而不仅仅是序列化层的更改。对于这些场景,toJournal函数通常是一个标识函数,但是fromJournal实现为v1.Event=>v2.Event,在fromJournal方法中执行必要的映射。这种技术有时在其他 CQRS 库中被称为upcasting
  • 分离域和数据模型Separating Domain and Data models),由于EventAdapters,可以完全分离域模型和用于在日志中持久化数据的模型。例如,你可能希望在域模型中使用case类,但是将它们的协议缓冲区(或任何其他二进制序列化格式)计数器部分保留到日志中。可以使用简单的toJournal:MyModel=>MyDataModelfromJournal:MyDataModel=>MyModel适配器来实现此功能。
  • 日志专用数据类型Journal Specialized Data Types),暴露基础日志所理解的数据类型,例如,对于理解 JSON 的数据存储,可以写一个EventAdaptertoJournal:Any=>JSON,这样日志就可以直接存储 JSON,而不是将对象序列化为其二进制表示。

实现一个EventAdapter非常重要:

  1. class MyEventAdapter implements EventAdapter {
  2. @Override
  3. public String manifest(Object event) {
  4. return ""; // if no manifest needed, return ""
  5. }
  6. @Override
  7. public Object toJournal(Object event) {
  8. return event; // identity
  9. }
  10. @Override
  11. public EventSeq fromJournal(Object event, String manifest) {
  12. return EventSeq.single(event); // identity
  13. }
  14. }

然后,为了在日志中的事件上使用它,必须使用以下配置语法绑定它:

  1. akka.persistence.journal {
  2. inmem {
  3. event-adapters {
  4. tagging = "docs.persistence.MyTaggingEventAdapter"
  5. user-upcasting = "docs.persistence.UserUpcastingEventAdapter"
  6. item-upcasting = "docs.persistence.ItemUpcastingEventAdapter"
  7. }
  8. event-adapter-bindings {
  9. "docs.persistence.Item" = tagging
  10. "docs.persistence.TaggedEvent" = tagging
  11. "docs.persistence.v1.Event" = [user-upcasting, item-upcasting]
  12. }
  13. }
  14. }

可以将多个适配器(adapter)绑定到一个类以进行恢复,在这种情况下,所有绑定适配器的fromJournal方法将应用于给定的匹配事件(按照配置中的定义顺序)。由于每个适配器可以返回从0n个适配事件(称为EventSeq),因此每个适配器都可以调查事件,如果确实需要对其进行适配,则返回相应的事件。在这个过程中没有任何贡献的其他适配器只返回EventSeq.empty。然后,在重放过程中,将调整后的事件传递给PersistentActor

存储插件

日志和快照存储的存储后端可以插入到 Akka 持久性扩展中。

Akka 社区项目页面提供了持久性日志和快照存储插件的目录,请参阅「社区插件」。

插件可以通过“默认”为所有持久性 Actor 的选择,也可以在持久性 Actor 定义自己的插件集时“单独”选择。

当持久性 Actor 不重写journalPluginIdsnapshotPluginId方法时,持久性扩展将使用reference.conf中配置的“默认”日志和快照存储插件:

  1. akka.persistence.journal.plugin = ""
  2. akka.persistence.snapshot-store.plugin = ""

但是,这些条目作为空的""提供,需要通过在用户的application.conf中的覆盖进行显式的用户配置。有关将消息写入 LevelDB 的日志插件的示例,请参阅「Local LevelDB」。有关将快照作为单个文件写入本地文件系统的快照存储插件的示例,请参阅「Local snapshot」。

应用程序可以通过实现插件 API 并通过配置激活插件来提供自己的插件。插件开发需要以下导入:

  1. import akka.dispatch.Futures;
  2. import akka.persistence.*;
  3. import akka.persistence.journal.japi.*;
  4. import akka.persistence.snapshot.japi.*;

持久性插件的预先初始化

默认情况下,持久性插件在使用时按需启动。然而,在某些情况下,预先启动某个插件可能会很有好处。为了做到这一点,你应该首先在akka.extensions键下添加akka.persistence.Persistence。然后,在akka.persistence.journal.auto-start-journalsakka.persistence.snapshot-store.auto-start-snapshot-stores下指定希望自动启动的插件的ID

例如,如果你希望对 LevelDB 日志插件和本地快照存储插件进行预先初始化,那么你的配置应该如下所示:

  1. akka {
  2. extensions = [akka.persistence.Persistence]
  3. persistence {
  4. journal {
  5. plugin = "akka.persistence.journal.leveldb"
  6. auto-start-journals = ["akka.persistence.journal.leveldb"]
  7. }
  8. snapshot-store {
  9. plugin = "akka.persistence.snapshot-store.local"
  10. auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"]
  11. }
  12. }
  13. }

预打包插件

本地 LevelDB 日志

LevelDB 日志插件配置条目是akka.persistence.journal.leveldb。它将消息写入本地 LevelDB 实例。通过定义配置属性启用此插件:

  1. # Path to the journal plugin to be used
  2. akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"

基于 LevelDB 的插件还需要以下附加依赖声明:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>org.fusesource.leveldbjni</groupId>
  4. <artifactId>leveldbjni-all</artifactId>
  5. <version>1.8</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'org.fusesource.leveldbjni', name: 'leveldbjni-all', version: '1.8'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"

LevelDB 文件的默认位置是当前工作目录中名为journal的目录。可以通过配置更改此位置,其中指定的路径可以是相对路径或绝对路径:

  1. akka.persistence.journal.leveldb.dir = "target/journal"

使用这个插件,每个 Actor 系统运行自己的私有 LevelDB 实例。

LevelDB 的一个特点是,删除操作不会从日志中删除消息,而是为每个已删除的消息添加一个“逻辑删除”。在大量使用日志的情况下,尤其是包括频繁删除的情况下,这可能是一个问题,因为用户可能会发现自己正在处理不断增加的日志大小。为此,LevelDB 提供了一个特殊的功能,通过以下配置开启:

  1. # Number of deleted messages per persistence id that will trigger journal compaction
  2. akka.persistence.journal.leveldb.compaction-intervals {
  3. persistence-id-1 = 100
  4. persistence-id-2 = 200
  5. # ...
  6. persistence-id-N = 1000
  7. # use wildcards to match unspecified persistence ids, if any
  8. "*" = 250
  9. }

共享 LevelDB 日记

一个 LevelDB 实例也可以由多个 Actor 系统(在同一个或不同的节点上)共享。例如,这允许持久性 Actor 故障转移到备份节点,并继续从备份节点使用共享日志实例。

  • 警告:共享的 LevelDB 实例是一个单一的故障点,因此只能用于测试目的。
  • 注释:此插件已被「Persistence Plugin Proxy」取代。

通过实例化SharedLeveldbStore Actor 可以启动共享 LevelDB 实例。

  1. final ActorRef store = system.actorOf(Props.create(SharedLeveldbStore.class), "store");

默认情况下,共享实例将日志消息写入当前工作目录中名为journal的本地目录。存储位置可以通过配置进行更改:

  1. akka.persistence.journal.leveldb-shared.store.dir = "target/shared"

使用共享 LevelDB 存储的 Actor 系统必须激活akka.persistence.journal.leveldb-shared插件。

  1. akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"

必须通过插入(远程)SharedLeveldbStore Actor 引用来初始化此插件。注入是通过使用 Actor 引用作为参数调用SharedLeveldbJournal.setStore方法完成的。

  1. class SharedStorageUsage extends AbstractActor {
  2. @Override
  3. public void preStart() throws Exception {
  4. String path = "akka.tcp://example@127.0.0.1:2552/user/store";
  5. ActorSelection selection = getContext().actorSelection(path);
  6. selection.tell(new Identify(1), getSelf());
  7. }
  8. @Override
  9. public Receive createReceive() {
  10. return receiveBuilder()
  11. .match(
  12. ActorIdentity.class,
  13. ai -> {
  14. if (ai.correlationId().equals(1)) {
  15. Optional<ActorRef> store = ai.getActorRef();
  16. if (store.isPresent()) {
  17. SharedLeveldbJournal.setStore(store.get(), getContext().getSystem());
  18. } else {
  19. throw new RuntimeException("Couldn't identify store");
  20. }
  21. }
  22. })
  23. .build();
  24. }
  25. }

内部日志命令(由持久性 Actor 发送)被缓冲,直到注入完成。注入是幂等的,即只使用第一次注入。

本地快照存储

本地快照存储(local snapshot store)插件配置条目为akka.persistence.snapshot-store.local。它将快照文件写入本地文件系统。通过定义配置属性启用此插件:

  1. # Path to the snapshot store plugin to be used
  2. akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

默认存储位置是当前工作目录中名为snapshots的目录。这可以通过配置进行更改,其中指定的路径可以是相对路径或绝对路径:

  1. akka.persistence.snapshot-store.local.dir = "target/snapshots"

请注意,不必指定快照存储插件。如果不使用快照,则无需对其进行配置。

持久化插件代理

持久化插件代理(persistence plugin proxy)允许跨多个 Actor 系统(在相同或不同节点上)共享日志和快照存储。例如,这允许持久性 Actor 故障转移到备份节点,并继续从备份节点使用共享日志实例。代理的工作方式是将所有日志/快照存储消息转发到一个共享的持久性插件实例,因此支持代理插件支持的任何用例。

  • 警告:共享日志/快照存储是单一故障点,因此应仅用于测试目的。

日志和快照存储代理分别通过akka.persistence.journal.proxyakka.persistence.snapshot-store.proxy配置条目进行控制。将target-journal-plugintarget-snapshot-store-plugin键设置为要使用的基础插件(例如:akka.persistence.journal.leveldb)。在一个 Actor 系统中,start-target-journalstart-target-snapshot-store键应设置为on,这是将实例化共享持久性插件的系统。接下来,需要告诉代理如何找到共享插件。这可以通过设置target-journal-addresstarget-snapshot-store-address配置键来实现,也可以通过编程方式调用PersistencePluginProxy.setTargetLocation方法来实现。

  • 注释:当需要扩展时,Akka 会延迟地启动扩展,这包括代理。这意味着为了让代理正常工作,必须实例化目标节点上的持久性插件。这可以通过实例化PersistencePluginProxyExtension扩展或调用PersistencePluginProxy.start方法来完成。此外,代理持久性插件可以(也应该)使用其原始配置键进行配置。

自定义序列化

快照的序列化和Persistent消息的有效负载可以通过 Akka 的序列化基础设施进行配置。例如,如果应用程序想要序列化

  • payloads of type MyPayload with a custom MyPayloadSerializer and
  • snapshots of type MySnapshot with a custom MySnapshotSerializer

它必须增加:

  1. akka.actor {
  2. serializers {
  3. my-payload = "docs.persistence.MyPayloadSerializer"
  4. my-snapshot = "docs.persistence.MySnapshotSerializer"
  5. }
  6. serialization-bindings {
  7. "docs.persistence.MyPayload" = my-payload
  8. "docs.persistence.MySnapshot" = my-snapshot
  9. }
  10. }

到应用程序配置。如果未指定,则使用默认序列化程序。

有关更高级的模式演化技术,请参阅「Persistence - Schema Evolution」文档。

测试

在 sbt 中使用 LevelDB 默认设置运行测试时,请确保在 sbt 项目中设置fork := true。否则,你将看到一个UnsatisfiedLinkError。或者,你可以通过设置切换到 LevelDB Java 端口。

  1. akka.persistence.journal.leveldb.native = off

  1. akka.persistence.journal.leveldb-shared.store.native = off

在你的 Akka 配置中,LevelDB Java 端口仅用于测试目的。

还要注意的是,对于 LevelDB Java 端口,你将需要以下依赖项:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>org.iq80.leveldb</groupId>
  4. <artifactId>leveldb</artifactId>
  5. <version>0.9</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'org.iq80.leveldb', name: 'leveldb', version: '0.9'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "org.iq80.leveldb" % "leveldb" % "0.9"
  • 警告:由于TestActorRef具有同步性,因此无法使用它来测试持久性提供的类(即PersistentActorAtLeastOnceDelivery)。这些特性需要能够在后台执行异步任务,以便处理与持久性相关的内部事件。当「测试基于持久性的项目」时,总是依赖于使用TestKit的异步消息传递。

配置

持久性模块有几个配置属性,请参阅参考「配置」。

多持久性插件配置

默认情况下,持久性 Actor 将使用在reference.conf配置资源的以下部分中配置的“默认”日志和快照存储插件:

  1. # Absolute path to the default journal plugin configuration entry.
  2. akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
  3. # Absolute path to the default snapshot store plugin configuration entry.
  4. akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

注意,在这种情况下,Actor 只重写persistenceId方法:

  1. abstract class AbstractPersistentActorWithDefaultPlugins extends AbstractPersistentActor {
  2. @Override
  3. public String persistenceId() {
  4. return "123";
  5. }
  6. }

当持久性 Actor 重写journalPluginIdsnapshotPluginId方法时,Actor 将由这些特定的持久性插件而不是默认值提供服务:

  1. abstract class AbstractPersistentActorWithOverridePlugins extends AbstractPersistentActor {
  2. @Override
  3. public String persistenceId() {
  4. return "123";
  5. }
  6. // Absolute path to the journal plugin configuration entry in the `reference.conf`
  7. @Override
  8. public String journalPluginId() {
  9. return "akka.persistence.chronicle.journal";
  10. }
  11. // Absolute path to the snapshot store plugin configuration entry in the `reference.conf`
  12. @Override
  13. public String snapshotPluginId() {
  14. return "akka.persistence.chronicle.snapshot-store";
  15. }
  16. }

请注意,journalPluginIdsnapshotPluginId必须引用正确配置的reference.conf插件条目,这些插件具有标准类属性以及特定于这些插件的设置,即:

  1. # Configuration entry for the custom journal plugin, see `journalPluginId`.
  2. akka.persistence.chronicle.journal {
  3. # Standard persistence extension property: provider FQCN.
  4. class = "akka.persistence.chronicle.ChronicleSyncJournal"
  5. # Custom setting specific for the journal `ChronicleSyncJournal`.
  6. folder = $${user.dir}/store/journal
  7. }
  8. # Configuration entry for the custom snapshot store plugin, see `snapshotPluginId`.
  9. akka.persistence.chronicle.snapshot-store {
  10. # Standard persistence extension property: provider FQCN.
  11. class = "akka.persistence.chronicle.ChronicleSnapshotStore"
  12. # Custom setting specific for the snapshot store `ChronicleSnapshotStore`.
  13. folder = $${user.dir}/store/snapshot
  14. }

在运行时提供持久性插件配置

默认情况下,持久性 Actor 将使用在ActorSystem创建时加载的配置来创建日志和快照存储插件。

当持久性 Actor 重写journalPluginConfigsnapshotPluginConfig方法时,Actor 将使用声明的Config对象,并对默认配置进行回退(fallback)。它允许在运行时动态配置日志和快照存储:

  1. abstract class AbstractPersistentActorWithRuntimePluginConfig extends AbstractPersistentActor
  2. implements RuntimePluginConfig {
  3. // Variable that is retrieved at runtime, from an external service for instance.
  4. String runtimeDistinction = "foo";
  5. @Override
  6. public String persistenceId() {
  7. return "123";
  8. }
  9. // Absolute path to the journal plugin configuration entry in the `reference.conf`
  10. @Override
  11. public String journalPluginId() {
  12. return "journal-plugin-" + runtimeDistinction;
  13. }
  14. // Absolute path to the snapshot store plugin configuration entry in the `reference.conf`
  15. @Override
  16. public String snapshotPluginId() {
  17. return "snapshot-store-plugin-" + runtimeDistinction;
  18. }
  19. // Configuration which contains the journal plugin id defined above
  20. @Override
  21. public Config journalPluginConfig() {
  22. return ConfigFactory.empty()
  23. .withValue(
  24. "journal-plugin-" + runtimeDistinction,
  25. getContext()
  26. .getSystem()
  27. .settings()
  28. .config()
  29. .getValue(
  30. "journal-plugin") // or a very different configuration coming from an external
  31. // service.
  32. );
  33. }
  34. // Configuration which contains the snapshot store plugin id defined above
  35. @Override
  36. public Config snapshotPluginConfig() {
  37. return ConfigFactory.empty()
  38. .withValue(
  39. "snapshot-plugin-" + runtimeDistinction,
  40. getContext()
  41. .getSystem()
  42. .settings()
  43. .config()
  44. .getValue(
  45. "snapshot-store-plugin") // or a very different configuration coming from an
  46. // external service.
  47. );
  48. }
  49. }

更多可见


英文原文链接Persistence.