

为了使用分布式数据(Distributed Data),你需要将以下依赖添加到你的项目中:

  1. <!-- Maven -->
  2. <dependency>
  3. <groupId>com.typesafe.akka</groupId>
  4. <artifactId>akka-distributed-data_2.12</artifactId>
  5. <version>2.5.22</version>
  6. </dependency>
  7. <!-- Gradle -->
  8. dependencies {
  9. compile group: 'com.typesafe.akka', name: 'akka-distributed-data_2.12', version: '2.5.22'
  10. }
  11. <!-- sbt -->
  12. libraryDependencies += "com.typesafe.akka" %% "akka-distributed-data" % "2.5.22"


你可以下载「Distributed Data」示例项目来看看分布式数据是如何在实践中应用的。


当需要在 Akka 集群中的节点之间共享数据时,Akka 分布式数据非常有用。数据通过一个 Actor 来访问,该 Actor 提供类似键值存储的 API。键是带有数据值类型信息的唯一标识符。这些值是无冲突的复制数据类型(Conflict Free Replicated Data Types,CRDTs)。


CRDTs 的特性使得可以在不协调的情况下从任何节点执行更新。来自不同节点的并发更新将由单调合并函数(monotonic merge function)自动解决,所有数据类型都必须提供该函数。状态变化总是收敛的。Akka 为计数器、集合、映射和寄存器提供了几种有用的数据类型,你还可以实现自己的自定义数据类型。


使用 Replicator

akka.cluster.ddata.Replicator Actor 提供了与数据交互的 API。Replicator Actor 必须在集群中的每个节点上启动,或者在标记有特定角色的节点组上启动。它与运行在其他节点上的具有相同路径(而不是地址)的其他Replicator实例通信。为了方便起见,它可以与akka.cluster.ddata.DistributedData扩展一起使用,但也可以使用Replicator.props作为普通 Actor 启动。如果它是作为一个普通的 Actor 启动的,那么它必须在所有节点上以相同的名称、相同的路径启动。

状态为「WeaklyUp」的集群成员将参与分布式数据。这意味着数据将通过后台gossip协议复制到WeaklyUp节点。请注意,如果一致性模式是从所有节点或大多数节点读/写,则WeaklyUp节点不会参与这些操作,因为WeaklyUp节点不算作集群的一部分。因此,就一致性操作而言,3 个节点 + 5 个WeaklyUp的节点本质上是 3 个节点。

下面是一个 Actor 的示例,它将tick消息调度到自己,并为每个tick添加或删除ORSet(observed-remove set)中的元素。它还订阅了这一变化。

  1. import java.time.Duration;
  2. import java.util.concurrent.ThreadLocalRandom;
  3. import akka.actor.AbstractActor;
  4. import akka.actor.ActorRef;
  5. import akka.actor.Cancellable;
  6. import akka.cluster.Cluster;
  7. import akka.cluster.ddata.DistributedData;
  8. import akka.cluster.ddata.Key;
  9. import akka.cluster.ddata.ORSet;
  10. import akka.cluster.ddata.ORSetKey;
  11. import akka.cluster.ddata.Replicator;
  12. import akka.cluster.ddata.Replicator.Changed;
  13. import akka.cluster.ddata.Replicator.Subscribe;
  14. import akka.cluster.ddata.Replicator.Update;
  15. import akka.cluster.ddata.Replicator.UpdateResponse;
  16. import akka.event.Logging;
  17. import akka.event.LoggingAdapter;
  18. public class DataBot extends AbstractActor { // On every tick, randomly adds or removes a letter in a replicated ORSet and logs the set whenever it changes
  19. private static final String TICK = "tick"; // message this actor schedules to itself
  20. private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  21. private final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator(); // Replicator obtained through the DistributedData extension
  22. private final Cluster node = Cluster.get(getContext().getSystem()); // passed to ORSet add/remove in receiveTick
  23. private final Cancellable tickTask = // handle kept so the schedule can be cancelled in postStop
  24. getContext()
  25. .getSystem()
  26. .scheduler()
  27. .schedule(
  28. Duration.ofSeconds(5), // first of the two 5-second durations handed to schedule(...)
  29. Duration.ofSeconds(5), // second 5-second duration
  30. getSelf(), // the TICK is delivered to this actor
  31. TICK,
  32. getContext().getDispatcher(),
  33. getSelf());
  34. private final Key<ORSet<String>> dataKey = ORSetKey.create("key"); // key of the single replicated set used by this actor
  35. @SuppressWarnings("unchecked") // needed for the cast of the raw Changed message below
  36. @Override
  37. public Receive createReceive() { // routes TICK, Changed for dataKey, and UpdateResponse to their handlers
  38. return receiveBuilder()
  39. .match(String.class, a -> a.equals(TICK), a -> receiveTick())
  40. .match(
  41. Changed.class,
  42. c -> c.key().equals(dataKey), // only react to changes of our own key
  43. c -> receiveChanged((Changed<ORSet<String>>) c))
  44. .match(UpdateResponse.class, r -> receiveUpdateResponse())
  45. .build();
  46. }
  47. private void receiveTick() { // picks a random letter and, with equal probability, adds it to or removes it from the set
  48. String s = String.valueOf((char) ThreadLocalRandom.current().nextInt(97, 123)); // code points 97..122, i.e. 'a'..'z'
  49. if (ThreadLocalRandom.current().nextBoolean()) {
  50. // add the letter; ORSet.create() is the initial value handed to the Update
  51. log.info("Adding: {}", s);
  52. Update<ORSet<String>> update =
  53. new Update<>(dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.add(node, s));
  54. replicator.tell(update, getSelf());
  55. } else {
  56. // remove the letter, using the same key, initial value and writeLocal consistency as the add branch
  57. log.info("Removing: {}", s);
  58. Update<ORSet<String>> update =
  59. new Update<>(
  60. dataKey, ORSet.create(), Replicator.writeLocal(), curr -> curr.remove(node, s));
  61. replicator.tell(update, getSelf());
  62. }
  63. }
  64. private void receiveChanged(Changed<ORSet<String>> c) { // logs the current elements carried by a change notification
  65. ORSet<String> data = c.dataValue();
  66. log.info("Current elements: {}", data.getElements());
  67. }
  68. private void receiveUpdateResponse() {
  69. // replies to the Update messages are deliberately ignored
  70. }
  71. @Override
  72. public void preStart() { // subscribes this actor to changes of dataKey
  73. Subscribe<ORSet<String>> subscribe = new Subscribe<>(dataKey, getSelf());
  74. replicator.tell(subscribe, ActorRef.noSender());
  75. }
  76. @Override
  77. public void postStop() { // stops the periodic TICK once the actor has stopped
  78. tickTask.cancel();
  79. }
  80. }




modify函数由Replicator Actor 调用,因此必须是一个纯函数,只使用封闭范围中的数据参数和稳定字段。例如,它必须不访问封闭 Actor 的发送方(getSender())引用。

由于modify函数通常不可序列化,因此只能从与Replicator运行在同一本地ActorSystem中的 Actor 发送Update消息。


  • writeLocal,该值将立即只被写入本地副本,然后通过gossip进行传播。
  • WriteTo(n),该值将立即写入至少n个副本,包括本地副本
  • WriteMajority,该值将立即写入大多数副本,即至少N/2 + 1个副本,其中N是群集(或群集角色组)中的节点数
  • WriteAll,该值将立即写入群集中的所有节点(或群集中角色组中的所有节点)。



  1. class DemonstrateUpdate extends AbstractActor { // On "demonstrate update", sends four Updates, one per write consistency (writeLocal, WriteTo, WriteMajority, WriteAll)
  2. final SelfUniqueAddress node = // passed to the increment/add calls in the modify functions below
  3. DistributedData.get(getContext().getSystem()).selfUniqueAddress();
  4. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  5. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  6. final Key<GSet<String>> set1Key = GSetKey.create("set1");
  7. final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
  8. final Key<Flag> activeFlagKey = FlagKey.create("active");
  9. @Override
  10. public Receive createReceive() { // single handler for the "demonstrate update" message
  11. ReceiveBuilder b = receiveBuilder();
  12. b.matchEquals(
  13. "demonstrate update",
  14. msg -> {
  15. replicator.tell( // 1) increment counter1 by 1 with writeLocal
  16. new Replicator.Update<PNCounter>(
  17. counter1Key,
  18. PNCounter.create(),
  19. Replicator.writeLocal(),
  20. curr -> curr.increment(node, 1)),
  21. getSelf());
  22. final WriteConsistency writeTo3 = new WriteTo(3, Duration.ofSeconds(1)); // 3 replicas, 1 second timeout
  23. replicator.tell( // 2) add "hello" to set1 with WriteTo(3)
  24. new Replicator.Update<GSet<String>>(
  25. set1Key, GSet.create(), writeTo3, curr -> curr.add("hello")),
  26. getSelf());
  27. final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(5)); // 5 second timeout
  28. replicator.tell( // 3) add "hello" to set2 with WriteMajority
  29. new Replicator.Update<ORSet<String>>(
  30. set2Key, ORSet.create(), writeMajority, curr -> curr.add(node, "hello")),
  31. getSelf());
  32. final WriteConsistency writeAll = new WriteAll(Duration.ofSeconds(5)); // 5 second timeout
  33. replicator.tell( // 4) switch the "active" flag on with WriteAll
  34. new Replicator.Update<Flag>(
  35. activeFlagKey, Flag.create(), writeAll, curr -> curr.switchOn()),
  36. getSelf());
  37. });
  38. return b.build();
  39. }
  40. }


  1. b.match( // handler for a successful update of counter1Key
  2. UpdateSuccess.class,
  3. a -> a.key().equals(counter1Key),
  4. a -> {
  5. // ok
  6. });
  1. b.match( // handlers for the replies to the set1Key update: success, then timeout
  2. UpdateSuccess.class,
  3. a -> a.key().equals(set1Key),
  4. a -> {
  5. // ok
  6. })
  7. .match(
  8. UpdateTimeout.class,
  9. a -> a.key().equals(set1Key),
  10. a -> {
  11. // write to 3 nodes failed within 1.second
  12. });



  1. class DemonstrateUpdateWithRequestContext extends AbstractActor { // Increments counter1 on "increment" and answers the original requester with "ack" or "nack"
  2. final Cluster node = Cluster.get(getContext().getSystem());
  3. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  4. final WriteConsistency writeTwo = new WriteTo(2, Duration.ofSeconds(3)); // 2 replicas, 3 second timeout
  5. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  6. @Override
  7. public Receive createReceive() {
  8. return receiveBuilder()
  9. .match(
  10. String.class,
  11. a -> a.equals("increment"),
  12. a -> {
  13. // incoming command to increase the counter
  14. Optional<Object> reqContext = Optional.of(getSender()); // carry the requester in the Update so the reply handlers can find it
  15. Replicator.Update<PNCounter> upd =
  16. new Replicator.Update<PNCounter>(
  17. counter1Key,
  18. PNCounter.create(),
  19. writeTwo,
  20. reqContext,
  21. curr -> curr.increment(node, 1));
  22. replicator.tell(upd, getSelf());
  23. })
  24. .match(
  25. UpdateSuccess.class,
  26. a -> a.key().equals(counter1Key),
  27. a -> {
  28. ActorRef replyTo = (ActorRef) a.getRequest().get(); // the requester stored as request context above
  29. replyTo.tell("ack", getSelf());
  30. })
  31. .match(
  32. UpdateTimeout.class,
  33. a -> a.key().equals(counter1Key),
  34. a -> {
  35. ActorRef replyTo = (ActorRef) a.getRequest().get(); // the requester stored as request context above
  36. replyTo.tell("nack", getSelf());
  37. })
  38. .build();
  39. }
  40. }



  • readLocal,该值将只从本地副本中读取
  • ReadFrom(n),该值将从n个副本(包括本地副本)中读取和合并
  • ReadMajority,该值将从大多数副本(即至少N/2 + 1个副本)中读取和合并,其中N是集群(或集群角色组)中的节点数
  • ReadAll,该值将从群集中的所有节点(或群集角色组中的所有节点)中读取和合并。


  1. class DemonstrateGet extends AbstractActor { // On "demonstrate get", sends four Gets, one per read consistency (readLocal, ReadFrom, ReadMajority, ReadAll)
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  4. final Key<GSet<String>> set1Key = GSetKey.create("set1");
  5. final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
  6. final Key<Flag> activeFlagKey = FlagKey.create("active");
  7. @Override
  8. public Receive createReceive() { // single handler for the "demonstrate get" message
  9. ReceiveBuilder b = receiveBuilder();
  10. b.matchEquals(
  11. "demonstrate get",
  12. msg -> {
  13. replicator.tell( // 1) counter1 with readLocal
  14. new Replicator.Get<PNCounter>(counter1Key, Replicator.readLocal()), getSelf());
  15. final ReadConsistency readFrom3 = new ReadFrom(3, Duration.ofSeconds(1)); // 3 replicas, 1 second timeout
  16. replicator.tell(new Replicator.Get<GSet<String>>(set1Key, readFrom3), getSelf()); // 2) set1 with ReadFrom(3)
  17. final ReadConsistency readMajority = new ReadMajority(Duration.ofSeconds(5)); // 5 second timeout
  18. replicator.tell(new Replicator.Get<ORSet<String>>(set2Key, readMajority), getSelf()); // 3) set2 with ReadMajority
  19. final ReadConsistency readAll = new ReadAll(Duration.ofSeconds(5)); // 5 second timeout
  20. replicator.tell(new Replicator.Get<Flag>(activeFlagKey, readAll), getSelf()); // 4) "active" flag with ReadAll
  21. });
  22. return b.build();
  23. }
  24. }


  1. b.match( // handlers for the replies to the counter1Key Get: success, then not-found
  2. GetSuccess.class,
  3. a -> a.key().equals(counter1Key),
  4. a -> {
  5. GetSuccess<PNCounter> g = a;
  6. BigInteger value = g.dataValue().getValue(); // current counter value
  7. })
  8. .match(
  9. NotFound.class,
  10. a -> a.key().equals(counter1Key),
  11. a -> {
  12. // key counter1 does not exist
  13. });
  1. b.match( // handlers for the replies to the set1Key Get: success, failure, not-found
  2. GetSuccess.class,
  3. a -> a.key().equals(set1Key),
  4. a -> {
  5. GetSuccess<GSet<String>> g = a;
  6. Set<String> value = g.dataValue().getElements(); // current elements of the set
  7. })
  8. .match(
  9. GetFailure.class,
  10. a -> a.key().equals(set1Key),
  11. a -> {
  12. // read from 3 nodes failed within 1.second
  13. })
  14. .match(
  15. NotFound.class,
  16. a -> a.key().equals(set1Key),
  17. a -> {
  18. // key set1 does not exist
  19. });



  1. class DemonstrateGetWithRequestContext extends AbstractActor { // Answers "get-count" with the value of counter1, replying to the original requester from the Get reply handlers
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final ReadConsistency readTwo = new ReadFrom(2, Duration.ofSeconds(3)); // 2 replicas, 3 second timeout
  4. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  5. @Override
  6. public Receive createReceive() {
  7. return receiveBuilder()
  8. .match(
  9. String.class,
  10. a -> a.equals("get-count"),
  11. a -> {
  12. // incoming request to retrieve current value of the counter
  13. Optional<Object> reqContext = Optional.of(getSender()); // requester, carried through the Get as request context
  14. replicator.tell(new Replicator.Get<PNCounter>(counter1Key, readTwo, reqContext), getSelf()); // the context must be passed here, otherwise getRequest() is empty in the handlers below
  15. })
  16. .match(
  17. GetSuccess.class,
  18. a -> a.key().equals(counter1Key),
  19. a -> {
  20. ActorRef replyTo = (ActorRef) a.getRequest().get(); // the requester stored as request context
  21. GetSuccess<PNCounter> g = a;
  22. long value = g.dataValue().getValue().longValue();
  23. replyTo.tell(value, getSelf());
  24. })
  25. .match(
  26. GetFailure.class,
  27. a -> a.key().equals(counter1Key),
  28. a -> {
  29. ActorRef replyTo = (ActorRef) a.getRequest().get();
  30. replyTo.tell(-1L, getSelf()); // -1 signals that the read failed
  31. })
  32. .match(
  33. NotFound.class,
  34. a -> a.key().equals(counter1Key),
  35. a -> {
  36. ActorRef replyTo = (ActorRef) a.getRequest().get();
  37. replyTo.tell(0L, getSelf()); // a counter that does not exist yet is reported as 0
  38. })
  39. .build();
  40. }
  41. }








  1. (nodes_written + nodes_read) > N


例如,在 7 节点集群中,这些一致性属性是通过写入 4 个节点和读取 4 个节点,或写入 5 个节点和读取 3 个节点来实现的。

通过将WriteMajority和ReadMajority级别结合起来,读始终反映最新的写入。Replicator对大多数副本进行写入和读取,即N / 2 + 1。例如,在 5 节点集群中,它写入 3 个节点并读取 3 个节点。在 6 节点集群中,它写入 4 个节点并读取 4 个节点。

你可以为WriteMajority和ReadMajority定义最小节点数,这将最小化读取过时数据的风险。这个最小上限(minimum cap)由WriteMajority和ReadMajority的minCap属性提供,并定义所需的多数。如果minCap高于N / 2 + 1,将使用minCap。

例如,如果minCap为 5,3 个节点的集群的WriteMajority和ReadMajority将为 3,6 个节点的集群将为 5,12 个节点的集群将为 7(N / 2 + 1)。



  1. private final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(3));
  2. private static final ReadConsistency readMajority = new ReadMajority(Duration.ofSeconds(3));
  1. private Receive matchGetCart() { // Receive for the get-cart flow: the GET_CART command plus the three possible replies to its Get
  2. return receiveBuilder()
  3. .matchEquals(GET_CART, s -> receiveGetCart())
  4. .match(
  5. GetSuccess.class,
  6. this::isResponseToGetCart, // only replies whose request context is an ActorRef
  7. g -> receiveGetSuccess((GetSuccess<LWWMap<String, LineItem>>) g))
  8. .match(
  9. NotFound.class,
  10. this::isResponseToGetCart,
  11. n -> receiveNotFound((NotFound<LWWMap<String, LineItem>>) n))
  12. .match(
  13. GetFailure.class,
  14. this::isResponseToGetCart,
  15. f -> receiveGetFailure((GetFailure<LWWMap<String, LineItem>>) f))
  16. .build();
  17. }
  18. private void receiveGetCart() { // asks the replicator for the cart with readMajority, carrying the requester as request context
  19. Optional<Object> ctx = Optional.of(getSender());
  20. replicator.tell(
  21. new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx), getSelf());
  22. }
  23. private boolean isResponseToGetCart(GetResponse<?> response) { // true when the reply is for dataKey and its request context is an ActorRef
  24. return response.key().equals(dataKey)
  25. && (response.getRequest().orElse(null) instanceof ActorRef); // an absent context yields null, which is never an ActorRef
  26. }
  27. private void receiveGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) { // replies to the requester with a Cart built from the map's values
  28. Set<LineItem> items = new HashSet<>(g.dataValue().getEntries().values());
  29. ActorRef replyTo = (ActorRef) g.getRequest().get(); // requester stored as request context
  30. replyTo.tell(new Cart(items), getSelf());
  31. }
  32. private void receiveNotFound(NotFound<LWWMap<String, LineItem>> n) { // no data for the key yet: reply with an empty Cart
  33. ActorRef replyTo = (ActorRef) n.getRequest().get(); // requester stored as request context
  34. replyTo.tell(new Cart(new HashSet<>()), getSelf());
  35. }
  36. private void receiveGetFailure(GetFailure<LWWMap<String, LineItem>> f) {
  37. // ReadMajority failure, try again with local read.
  38. // Reuse the request context carried by the failed Get: it holds the original requester,
  39. // whereas getSender() at this point is the sender of the GetFailure reply.
  40. Optional<Object> ctx = f.getRequest();
  41. replicator.tell(
  42. new Replicator.Get<LWWMap<String, LineItem>>(dataKey, Replicator.readLocal(), ctx),
  43. getSelf());
  44. }
  1. private Receive matchAddItem() { // Receive that routes AddItem commands to receiveAddItem
  2. return receiveBuilder().match(AddItem.class, this::receiveAddItem).build();
  3. }
  4. private void receiveAddItem(AddItem add) { // updates the cart with writeMajority, applying updateCart to the current map and the added item
  5. Update<LWWMap<String, LineItem>> update =
  6. new Update<>(dataKey, LWWMap.create(), writeMajority, cart -> updateCart(cart, add.item));
  7. replicator.tell(update, getSelf());
  8. }



  1. private void receiveRemoveItem(RemoveItem rm) { // starts a removal by first reading the cart with readMajority
  2. // Try to fetch latest from a majority of nodes first, since ORMap
  3. // remove must have seen the item to be able to remove it.
  4. Optional<Object> ctx = Optional.of(rm); // the command itself is the request context, so the reply handlers know what to remove
  5. replicator.tell(
  6. new Replicator.Get<LWWMap<String, LineItem>>(dataKey, readMajority, ctx), getSelf());
  7. }
  8. private void receiveRemoveItemGetSuccess(GetSuccess<LWWMap<String, LineItem>> g) { // the majority read succeeded: perform the removal
  9. RemoveItem rm = (RemoveItem) g.getRequest().get(); // command stored as request context in receiveRemoveItem
  10. removeItem(rm.productId);
  11. }
  12. private void receiveRemoveItemGetFailure(GetFailure<LWWMap<String, LineItem>> f) { // the majority read failed: remove anyway
  13. // ReadMajority failed, fall back to best effort local value
  14. RemoveItem rm = (RemoveItem) f.getRequest().get(); // command stored as request context in receiveRemoveItem
  15. removeItem(rm.productId);
  16. }
  17. private void removeItem(String productId) { // sends an Update with writeMajority that removes productId from the cart map
  18. Update<LWWMap<String, LineItem>> update =
  19. new Update<>(dataKey, LWWMap.create(), writeMajority, cart -> cart.remove(node, productId));
  20. replicator.tell(update, getSelf());
  21. }
  22. private boolean isResponseToRemoveItem(GetResponse<?> response) { // true when the reply is for dataKey and its request context is a RemoveItem
  23. return response.key().equals(dataKey)
  24. && (response.getRequest().orElse(null) instanceof RemoveItem); // an absent context yields null, which is never a RemoveItem
  25. }
  • 警告:即使你使用了WriteMajority和ReadMajority,但是如果集群成员在Update和Get之间发生了更改,则也有读取过时数据的小风险。例如,在 5 个节点的集群中,当你Update并将更改写入 3 个节点时:n1、n2、n3。然后再添加 2 个节点,此时一个Get请求从 4 个节点读取,而这 4 个节点正好是n4、n5、n6、n7,也就是说,在Get请求的响应中看不到n1、n2、n3上的值。




  1. class DemonstrateSubscribe extends AbstractActor { // Keeps a local copy of counter1 up to date through Changed notifications and serves it on "get-count"
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  4. BigInteger currentValue = BigInteger.valueOf(0); // last value seen in a Changed notification; 0 until the first one arrives
  5. @Override
  6. public Receive createReceive() {
  7. return receiveBuilder()
  8. .match(
  9. Changed.class,
  10. a -> a.key().equals(counter1Key),
  11. a -> {
  12. Changed<PNCounter> g = a;
  13. currentValue = g.dataValue().getValue(); // remember the latest counter value
  14. })
  15. .match(
  16. String.class,
  17. a -> a.equals("get-count"),
  18. a -> {
  19. // incoming request to retrieve current value of the counter
  20. getSender().tell(currentValue, getSelf()); // reply with this actor as sender (was getSender(), which named the requester as sender of its own reply)
  21. })
  22. .build();
  23. }
  24. @Override
  25. public void preStart() {
  26. // subscribe to changes of the Counter1Key value
  27. replicator.tell(new Subscribe<PNCounter>(counter1Key, getSelf()), ActorRef.noSender());
  28. }
  29. }





  1. class DemonstrateDelete extends AbstractActor { // On "demonstrate delete", sends Delete messages with writeLocal and with WriteMajority
  2. final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
  3. final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
  4. final Key<ORSet<String>> set2Key = ORSetKey.create("set2"); // NOTE(review): declared but never used in this class
  5. @Override
  6. public Receive createReceive() {
  7. return receiveBuilder()
  8. .matchEquals(
  9. "demonstrate delete",
  10. msg -> {
  11. replicator.tell( // delete counter1 with writeLocal
  12. new Delete<PNCounter>(counter1Key, Replicator.writeLocal()), getSelf());
  13. final WriteConsistency writeMajority = new WriteMajority(Duration.ofSeconds(5)); // 5 second timeout
  14. replicator.tell(new Delete<PNCounter>(counter1Key, writeMajority), getSelf()); // NOTE(review): deletes counter1 a second time; possibly meant to target set2Key - confirm
  15. })
  16. .build();
  17. }
  18. }
  • 警告:由于删除的键继续包含在每个节点的存储数据以及gossip消息中,一系列顶级实体的连续更新和删除将导致内存使用量增加,直到ActorSystem耗尽内存。要在需要频繁添加和删除的地方使用 Akka 分布式数据,你应该使用固定数量的支持更新和删除的顶级数据类型,例如ORMap或ORSet。


支持「Delta State Replicated Data Types」。delta-CRDT是一种减少发送更新的完整状态需求的方法。例如,将元素'c'和'd'添加到集合{'a', 'b'}将导致发送{'c', 'd'},并将其与接收端的状态合并,从而得到集合{'a', 'b', 'c', 'd'}。

如果数据类型标记为RequiresCausalDeliveryOfDeltas,则复制deltas的协议支持因果一致性(causal consistency)。否则,它只是最终一致的(eventually consistent)。如果不存在因果一致性,则意味着如果在两个单独的Update操作中添加元素'c'和'd',这些增量偶尔可能以不同于更新因果顺序的顺序传播到节点。在这个例子中,这可能导致在元素'c'出现之前就可以看到集合{'a', 'b', 'd'}。最终结果将是{'a', 'b', 'c', 'd'}。



  1. akka.cluster.distributed-data.delta-crdt.enabled=off




  • Counters:GCounter、PNCounter
  • Sets:GSet、ORSet
  • Maps:ORMap、ORMultiMap、LWWMap、PNCounterMap
  • Registers:LWWRegister、Flag






  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress(); // passed to every increment/decrement below
  2. final PNCounter c0 = PNCounter.create(); // each operation returns a new PNCounter, hence the c0..c3 chain
  3. final PNCounter c1 = c0.increment(node, 1);
  4. final PNCounter c2 = c1.increment(node, 7);
  5. final PNCounter c3 = c2.decrement(node, 2);
  6. System.out.println(c3.value()); // 6



  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress(); // passed to every increment/decrement below
  2. final PNCounterMap<String> m0 = PNCounterMap.create(); // each operation returns a new map, hence the m0..m3 chain
  3. final PNCounterMap<String> m1 = m0.increment(node, "a", 7);
  4. final PNCounterMap<String> m2 = m1.decrement(node, "a", 2);
  5. final PNCounterMap<String> m3 = m2.increment(node, "b", 1);
  6. System.out.println(m3.get("a")); // 5
  7. System.out.println(m3.getEntries()); // prints all entries of the map ("a" and "b")



  1. final GSet<String> s0 = GSet.create(); // add() returns a new GSet, hence the s0..s2 chain
  2. final GSet<String> s1 = s0.add("a");
  3. final GSet<String> s2 = s1.add("b").add("c");
  4. if (s2.contains("a")) System.out.println(s2.getElements()); // a, b, c


如果需要添加和删除操作,应使用ORSet(observed-remove set)。元素可以添加和删除任意次数。如果一个元素同时添加和删除,则添加将成功。不能删除未看到的元素。


  1. final Cluster node = Cluster.get(system); // passed to every add/remove below
  2. final ORSet<String> s0 = ORSet.create(); // add/remove return a new ORSet, hence the s0..s3 chain
  3. final ORSet<String> s1 = s0.add(node, "a");
  4. final ORSet<String> s2 = s1.add(node, "b");
  5. final ORSet<String> s3 = s2.remove(node, "a");
  6. System.out.println(s3.getElements()); // b



ORMap(observed-remove map)是一个具有Any类型的键的映射,值本身就是复制的数据类型。它支持为一个映射条目添加、更新和删除任意次数。




  • ORMultiMap(observed-remove multi-map)是一个多映射实现,它用一个ORSet来包装一个ORMap以获得该映射的值。
  • PNCounterMap(positive negative counter map)是命名计数器的映射(其中名称可以是任何类型)。它是具有PNCounter值的特殊ORMap。
  • LWWMap(last writer wins map)是一个具有LWWRegister(last writer wins register)值的特殊ORMap。


  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress(); // passed to every operation below
  2. final ORMultiMap<String, Integer> m0 = ORMultiMap.create(); // each operation returns a new map, hence the m0..m4 chain
  3. final ORMultiMap<String, Integer> m1 = m0.put(node, "a", new HashSet<>(Arrays.asList(1, 2, 3))); // put takes a whole set of values for key "a"
  4. final ORMultiMap<String, Integer> m2 = m1.addBinding(node, "a", 4); // add a single value under "a"
  5. final ORMultiMap<String, Integer> m3 = m2.removeBinding(node, "a", 2); // remove a single value under "a"
  6. final ORMultiMap<String, Integer> m4 = m3.addBinding(node, "b", 1); // add a value under a second key "b"
  7. System.out.println(m4.getEntries());

更改数据项时,该项的完整状态将复制到其他节点,即更新映射时,将复制整个映射。因此,不使用一个包含 1000 个元素的ORMap,而是更有效地将其分解为 10 个顶级ORMap条目,每个条目包含 100 个元素。顶级条目是单独复制的,这就需要权衡不同条目可能不会同时复制,并且你可能会看到相关条目之间的不一致。单独的顶级条目不能原子地一起更新。

有一个特殊版本的ORMultiMap,是使用单独的构造函数ORMultiMap.emptyWithValueDeltas[A, B]创建的,它还将更新作为delta传播到它的值(ORSet类型)。这意味着用ORMultiMap.emptyWithValueDeltas启动的ORMultiMap将其更新作为成对传播,包含键的delta和值的delta。它在网络带宽消耗方面效率更高。

但是,此行为尚未成为ORMultiMap的默认行为,如果希望在代码中使用它,则需要将ORMultiMap.empty[A, B](或者ORMultiMap())的调用替换为ORMultiMap.emptyWithValueDeltas[A, B],其中A和B分别是映射中的键和值的类型。



Flags 和 Registers


  1. final Flag f0 = Flag.create();
  2. final Flag f1 = f0.switchOn(); // switchOn() returns a new Flag
  3. System.out.println(f1.enabled()); // prints the state of the switched-on flag

LWWRegister(last writer wins register)可以保存任何(可序列化)值。



  1. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
  2. final LWWRegister<String> r1 = LWWRegister.create(node, "Hello");
  3. final LWWRegister<String> r2 = r1.withValue(node, "Hi"); // withValue returns a new register; r2 is not used by the print below
  4. System.out.println(r1.value() + " by " + r1.updatedBy() + " at " + r1.timestamp()); // prints r1's value, updater and timestamp


  1. class Record { // Immutable value with an explicit version number, used as the LWWRegister value in this example
  2. public final int version; // returned as the timestamp by the custom clock defined after this class
  3. public final String name;
  4. public final String address;
  5. public Record(int version, String name, String address) { // stores the three fields as given
  6. this.version = version;
  7. this.name = name;
  8. this.address = address;
  9. }
  10. }
  11. final SelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();
  12. final LWWRegister.Clock<Record> recordClock = // custom clock: the record's own version number is used as the register timestamp
  13. new LWWRegister.Clock<Record>() {
  14. @Override
  15. public long apply(long currentTimestamp, Record value) {
  16. return value.version; // currentTimestamp is ignored on purpose; ordering is decided by version only
  17. }
  18. };
  19. final Record record1 = new Record(1, "Alice", "Union Square");
  20. final LWWRegister<Record> r1 = LWWRegister.create(node, record1, recordClock); // pass the custom clock, otherwise the default clock is used and recordClock has no effect
  21. final Record record2 = new Record(2, "Alice", "Madison Square");
  22. final LWWRegister<Record> r2 = LWWRegister.create(node, record2, recordClock);
  23. final LWWRegister<Record> r3 = r1.merge(r2); // with recordClock, record2 (version 2) has the higher timestamp
  24. System.out.println(r3.value());


defaultClock使用System.currentTimeMillis()和currentTimestamp + 1中的最大值。这意味着时间戳对于在相同毫秒内发生的同一节点上的更改会增加。它还意味着,当只有一个活动的writer(如集群单例)时,可以安全地使用不带同步时钟的LWWRegister。这样的单个writer应该首先使用ReadMajority(或更多)读取当前值,然后再使用WriteMajority(或更多)更改和写入值。





  1. public class TwoPhaseSet extends AbstractReplicatedData<TwoPhaseSet> { // Custom replicated set built from two GSets: one for added and one for removed elements
  2. public final GSet<String> adds; // every element that was ever added
  3. public final GSet<String> removals; // every element that was ever removed
  4. public TwoPhaseSet(GSet<String> adds, GSet<String> removals) {
  5. this.adds = adds;
  6. this.removals = removals;
  7. }
  8. public static TwoPhaseSet create() { // empty set: both GSets are empty
  9. return new TwoPhaseSet(GSet.create(), GSet.create());
  10. }
  11. public TwoPhaseSet add(String element) { // returns a new set with element recorded in adds
  12. return new TwoPhaseSet(adds.add(element), removals);
  13. }
  14. public TwoPhaseSet remove(String element) { // returns a new set with element recorded in removals
  15. return new TwoPhaseSet(adds, removals.add(element));
  16. }
  17. public Set<String> getElements() { // visible elements: adds minus removals
  18. Set<String> result = new HashSet<>(adds.getElements()); // copy, so removeAll below works on a fresh HashSet
  19. result.removeAll(removals.getElements());
  20. return result;
  21. }
  22. @Override
  23. public TwoPhaseSet mergeData(TwoPhaseSet that) { // merge = pairwise merge of the two underlying GSets
  24. return new TwoPhaseSet(this.adds.merge(that.adds), this.removals.merge(that.removals));
  25. }
  26. }




数据类型必须可以使用「Akka Serializer」进行序列化。强烈建议对自定义数据类型使用Protobuf或类似工具实现有效的序列化。内置数据类型用ReplicatedDataSerialization标记,并用akka.cluster.ddata.protobuf.ReplicatedDataSerializer序列化。



  1. option java_package = "docs.ddata.protobuf.msg";
  2. option optimize_for = SPEED;
  3. message TwoPhaseSet {
  4. repeated string adds = 1;
  5. repeated string removals = 2;
  6. }


  1. import jdocs.ddata.TwoPhaseSet;
  2. import docs.ddata.protobuf.msg.TwoPhaseSetMessages;
  3. import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.Builder;
  4. import java.util.ArrayList;
  5. import java.util.Collections;
  6. import akka.actor.ExtendedActorSystem;
  7. import akka.cluster.ddata.GSet;
  8. import akka.cluster.ddata.protobuf.AbstractSerializationSupport;
  9. public class TwoPhaseSetSerializer extends AbstractSerializationSupport { // Serializer that converts TwoPhaseSet to and from the TwoPhaseSet protobuf message
  10. private final ExtendedActorSystem system;
  11. public TwoPhaseSetSerializer(ExtendedActorSystem system) {
  12. this.system = system;
  13. }
  14. @Override
  15. public ExtendedActorSystem system() { // exposes the actor system given to the constructor
  16. return this.system;
  17. }
  18. @Override
  19. public boolean includeManifest() { // no manifest; fromBinaryJava below ignores its manifest argument
  20. return false;
  21. }
  22. @Override
  23. public int identifier() { // serializer id returned to the serialization framework
  24. return 99998;
  25. }
  26. @Override
  27. public byte[] toBinary(Object obj) { // serializes a TwoPhaseSet; any other type is rejected with IllegalArgumentException
  28. if (obj instanceof TwoPhaseSet) {
  29. return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray();
  30. } else {
  31. throw new IllegalArgumentException("Can't serialize object of type " + obj.getClass());
  32. }
  33. }
  34. @Override
  35. public Object fromBinaryJava(byte[] bytes, Class<?> manifest) { // always decodes the bytes as a TwoPhaseSet
  36. return twoPhaseSetFromBinary(bytes);
  37. }
  38. protected TwoPhaseSetMessages.TwoPhaseSet twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) { // builds the protobuf message from the two element sets
  39. Builder b = TwoPhaseSetMessages.TwoPhaseSet.newBuilder();
  40. ArrayList<String> adds = new ArrayList<>(twoPhaseSet.adds.getElements());
  41. if (!adds.isEmpty()) {
  42. Collections.sort(adds); // elements are written in sorted order
  43. b.addAllAdds(adds);
  44. }
  45. ArrayList<String> removals = new ArrayList<>(twoPhaseSet.removals.getElements());
  46. if (!removals.isEmpty()) {
  47. Collections.sort(removals); // elements are written in sorted order
  48. b.addAllRemovals(removals);
  49. }
  50. return b.build();
  51. }
  52. protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) { // parses the protobuf message and rebuilds both GSets element by element
  53. try {
  54. TwoPhaseSetMessages.TwoPhaseSet msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes);
  55. GSet<String> adds = GSet.create();
  56. for (String elem : msg.getAddsList()) {
  57. adds = adds.add(elem); // add() returns a new GSet, so reassign
  58. }
  59. GSet<String> removals = GSet.create();
  60. for (String elem : msg.getRemovalsList()) {
  61. removals = removals.add(elem); // add() returns a new GSet, so reassign
  62. }
  63. // GSet will accumulate deltas when adding elements,
  64. // but those are not of interest in the result of the deserialization
  65. return new TwoPhaseSet(adds.resetDelta(), removals.resetDelta());
  66. } catch (Exception e) {
  67. throw new RuntimeException(e.getMessage(), e); // any failure is rethrown unchecked with the original exception as cause
  68. }
  69. }
  70. }



  1. akka.actor {
  2. serializers {
  3. twophaseset = "jdocs.ddata.protobuf.TwoPhaseSetSerializer"
  4. }
  5. serialization-bindings {
  6. "jdocs.ddata.TwoPhaseSet" = twophaseset
  7. }
  8. }


  1. @Override
  2. public byte[] toBinary(Object obj) { // variant of toBinary that passes the protobuf message through compress(...)
  3. if (obj instanceof TwoPhaseSet) {
  4. return compress(twoPhaseSetToProto((TwoPhaseSet) obj));
  5. } else {
  6. throw new IllegalArgumentException("Can't serialize object of type " + obj.getClass());
  7. }
  8. }
  9. @Override
  10. public Object fromBinaryJava(byte[] bytes, Class<?> manifest) { // counterpart: decompress(...) the bytes before parsing
  11. return twoPhaseSetFromBinary(decompress(bytes));
  12. }


  1. message TwoPhaseSet2 {
  2. optional bytes adds = 1;
  3. optional bytes removals = 2;
  4. }

并使用序列化支持特性提供的方法otherMessageToProto和otherMessageFromBinary来序列化和反序列化GSet实例。这适用于任何具有已注册的 Akka 序列化程序的类型。下面就是TwoPhaseSet这样的序列化程序:

  1. import jdocs.ddata.TwoPhaseSet;
  2. import docs.ddata.protobuf.msg.TwoPhaseSetMessages;
  3. import docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.Builder;
  4. import akka.actor.ExtendedActorSystem;
  5. import akka.cluster.ddata.GSet;
  6. import akka.cluster.ddata.protobuf.AbstractSerializationSupport;
  7. import akka.cluster.ddata.protobuf.ReplicatedDataSerializer;
  8. public class TwoPhaseSetSerializer2 extends AbstractSerializationSupport { // Serializer for TwoPhaseSet that stores each nested GSet as bytes via otherMessageToProto / otherMessageFromBinary
  9. private final ExtendedActorSystem system;
  10. private final ReplicatedDataSerializer replicatedDataSerializer;
  11. public TwoPhaseSetSerializer2(ExtendedActorSystem system) {
  12. this.system = system;
  13. this.replicatedDataSerializer = new ReplicatedDataSerializer(system);
  14. }
  15. @Override
  16. public ExtendedActorSystem system() { // exposes the actor system given to the constructor
  17. return this.system;
  18. }
  19. @Override
  20. public boolean includeManifest() { // no manifest; fromBinaryJava below ignores its manifest argument
  21. return false;
  22. }
  23. @Override
  24. public int identifier() { // serializer id returned to the serialization framework
  25. return 99998;
  26. }
  27. @Override
  28. public byte[] toBinary(Object obj) { // serializes a TwoPhaseSet; any other type is rejected with IllegalArgumentException
  29. if (obj instanceof TwoPhaseSet) {
  30. return twoPhaseSetToProto((TwoPhaseSet) obj).toByteArray();
  31. } else {
  32. throw new IllegalArgumentException("Can't serialize object of type " + obj.getClass());
  33. }
  34. }
  35. @Override
  36. public Object fromBinaryJava(byte[] bytes, Class<?> manifest) { // always decodes the bytes as a TwoPhaseSet
  37. return twoPhaseSetFromBinary(bytes);
  38. }
  39. protected TwoPhaseSetMessages.TwoPhaseSet2 twoPhaseSetToProto(TwoPhaseSet twoPhaseSet) { // writes each non-empty GSet as an optional bytes field
  40. Builder b = TwoPhaseSetMessages.TwoPhaseSet2.newBuilder();
  41. if (!twoPhaseSet.adds.isEmpty())
  42. b.setAdds(otherMessageToProto(twoPhaseSet.adds).toByteString());
  43. if (!twoPhaseSet.removals.isEmpty())
  44. b.setRemovals(otherMessageToProto(twoPhaseSet.removals).toByteString());
  45. return b.build();
  46. }
  47. @SuppressWarnings("unchecked") // otherMessageFromBinary returns an untyped object that is cast back to GSet<String>
  48. protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) { // rebuilds both GSets; a field that is absent stays an empty GSet
  49. try {
  50. TwoPhaseSetMessages.TwoPhaseSet2 msg = TwoPhaseSetMessages.TwoPhaseSet2.parseFrom(bytes);
  51. GSet<String> adds = GSet.create();
  52. if (msg.hasAdds()) adds = (GSet<String>) otherMessageFromBinary(msg.getAdds().toByteArray());
  53. GSet<String> removals = GSet.create();
  54. if (msg.hasRemovals())
  55. removals = (GSet<String>) otherMessageFromBinary(msg.getRemovals().toByteArray()); // fixed: was assigned to adds, which overwrote the adds and left removals empty
  56. return new TwoPhaseSet(adds, removals);
  57. } catch (Exception e) {
  58. throw new RuntimeException(e.getMessage(), e); // any failure is rethrown unchecked with the original exception as cause
  59. }
  60. }
  61. }



条目可以配置为持久的,即存储在每个节点的本地磁盘上。下一次启动replicator时,即当 Actor 系统重新启动时,将加载存储的数据。这意味着只要旧集群中的至少一个节点参与到新集群中,数据就可以生存。持久条目的键配置为:

  1. akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]



  1. akka.cluster.distributed-data.durable.keys = ["*"]

「LMDB」是默认的存储实现。通过实现akka.cluster.ddata.DurableStore中描述的 Actor 协议并为新实现定义akka.cluster.distributed-data.durable.store-actor-class属性,可以将其替换为另一个实现。


  1. # Directory of LMDB file. There are two options:
  2. # 1. A relative or absolute path to a directory that ends with 'ddata'
  3. # the full name of the directory will contain name of the ActorSystem
  4. # and its remote port.
  5. # 2. Otherwise the path is used as is, as a relative or absolute path to
  6. # a directory.
  7. akka.cluster.distributed-data.durable.lmdb.dir = "ddata"

在生产环境中运行时,你可能希望将目录配置为特定路径(alt 2),因为默认目录包含 Actor 系统的远程端口以使名称唯一。如果使用动态分配的端口(0),则每次都会不同,并且不会加载以前存储的数据。

使数据持久化有性能成本。默认情况下,在发送UpdateSuccess回复之前,每个更新都会刷新到磁盘。如果想获得更好的性能,并且可以接受 JVM 崩溃时丢失最后几次写入的风险,你可以启用写后模式(write behind mode)。这样,更改会先累积一段时间,然后再写入 LMDB 并刷新到磁盘。当对同一个键执行多次写入时,启用写后模式特别有效,因为每个键只有最后一个值会被序列化和存储。即使 JVM 崩溃,丢失写入的风险也很小,因为数据通常会根据给定的WriteConsistency立即复制到其他节点。

  1. akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms


当为持久数据修剪 CRDT 垃圾时,有一个重要的警告。如果一个从未修剪过的旧数据条目被注入,并在修剪(pruning)标记被删除后与现有数据合并,则该值将不正确。标记的生存时间由配置akka.cluster.distributed-data.durable.remove-pruning-marker-after定义,以天为单位。如果具有持久数据的节点没有参与修剪(例如,它被关闭了),并且在这段时间之后才重新启动,就可能出现这种情况。具有持久数据的节点的停止时间不应超过此持续时间,如果在此持续时间之后再次加入,则应首先手动(从lmdb目录中)删除其数据。


CRDT的一个问题是,某些数据类型会累积历史记录(垃圾)。例如,GCounter跟踪每个节点的一个计数器。如果已经从一个节点更新了GCounter,它将永远关联该节点的标识符。对于添加和删除了许多群集节点的长时间运行的系统来说,这可能成为一个问题。为了解决这个问题,Replicator执行与已从集群中删除的节点相关联的数据修剪。需要修剪的数据类型必须实现RemovedNodePruning特性。有关详细信息,请参阅Replicator的 API 文档。


在「Akka Distributed Data Samples with Java」中包含一些有趣的样例和教程。

  • 低延迟投票服务
  • 高可用购物车
  • 分布式服务注册表
  • 复制缓存
  • 复制指标




它不适用于大数据。顶级条目数不应超过 100000 条。当一个新节点添加到集群中时,所有这些条目都会被传输(gossiped)到新节点。条目被分割成块,所有现有节点在gossip中协作,但传输所有条目需要一段时间(数十秒),这意味着顶级条目不能太多。当前建议的限制为 100000。如果需要的话,我们将能够改进这一点,但是设计仍然不打算用于数十亿个条目。



了解有关 CRDT 的更多信息



  1. # Settings for the DistributedData extension
  2. akka.cluster.distributed-data {
  3. # Actor name of the Replicator actor, /system/ddataReplicator
  4. name = ddataReplicator
  5. # Replicas are running on members tagged with this role.
  6. # All members are used if undefined or empty.
  7. role = ""
  8. # How often the Replicator should send out gossip information
  9. gossip-interval = 2 s
  10. # How often the subscribers will be notified of changes, if any
  11. notify-subscribers-interval = 500 ms
  12. # Maximum number of entries to transfer in one gossip message when synchronizing
  13. # the replicas. Next chunk will be transferred in next round of gossip.
  14. max-delta-elements = 1000
  15. # The id of the dispatcher to use for Replicator actors. If not specified
  16. # default dispatcher is used.
  17. # If specified you need to define the settings of the actual dispatcher.
  18. use-dispatcher = ""
  19. # How often the Replicator checks for pruning of data associated with
  20. # removed cluster nodes. If this is set to 'off' the pruning feature will
  21. # be completely disabled.
  22. pruning-interval = 120 s
  23. # How long it takes to spread the data to all other replica nodes.
  24. # This is used when initiating and completing the pruning process of data associated
  25. # with removed cluster nodes. The time measurement is stopped when any replica is
  26. # unreachable, but it's still recommended to configure this with a certain margin.
  27. # It should be in the magnitude of minutes even though typical dissemination time
  28. # is shorter (grows logarithmically with the number of nodes). There is no advantage in
  29. # setting this too low. Setting it to a large value will delay the pruning process.
  30. max-pruning-dissemination = 300 s
  31. # The markers indicating that pruning has been performed for a removed node are kept
  32. # for this time and thereafter removed. If an old data entry that was never pruned is somehow
  33. # injected and merged with existing data after this time the value will not be correct.
  34. # This would be possible (although unlikely) in the case of a long network partition.
  35. # It should be in the magnitude of hours. For durable data it is configured by
  36. # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'.
  37. pruning-marker-time-to-live = 6 h
  38. # Serialized Write and Read messages are cached when they are sent to
  39. # several nodes. If no further activity they are removed from the cache
  40. # after this duration.
  41. serializer-cache-time-to-live = 10s
  42. # Settings for delta-CRDT
  43. delta-crdt {
  44. # enable or disable delta-CRDT replication
  45. enabled = on
  46. # Some complex deltas grow in size for each update and above this
  47. # threshold such deltas are discarded and sent as full state instead.
  48. # This is number of elements or similar size hint, not size in bytes.
  49. max-delta-size = 200
  50. }
  51. durable {
  52. # List of keys that are durable. Prefix matching is supported by using * at the
  53. # end of a key.
  54. keys = []
  55. # The markers indicating that pruning has been performed for a removed node are kept
  56. # for this time and thereafter removed. If an old data entry that was never pruned is
  57. # injected and merged with existing data after this time the value will not be correct.
  58. # This would be possible if a replica with durable data didn't participate in the pruning
  59. # (e.g. it was shut down) and later started after this time. A durable replica should not
  60. # be stopped for a longer time than this duration and if it is joining again after this
  61. # duration its data should first be manually removed (from the lmdb directory).
  62. # It should be in the magnitude of days. Note that there is a corresponding setting
  63. # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
  64. pruning-marker-time-to-live = 10 d
  65. # Fully qualified class name of the durable store actor. It must be a subclass
  66. # of akka.actor.Actor and handle the protocol defined in
  67. # akka.cluster.ddata.DurableStore. The class must have a constructor with
  68. # com.typesafe.config.Config parameter.
  69. store-actor-class = akka.cluster.ddata.LmdbDurableStore
  70. use-dispatcher = akka.cluster.distributed-data.durable.pinned-store
  71. pinned-store {
  72. executor = thread-pool-executor
  73. type = PinnedDispatcher
  74. }
  75. # Config for the LmdbDurableStore
  76. lmdb {
  77. # Directory of LMDB file. There are two options:
  78. # 1. A relative or absolute path to a directory that ends with 'ddata'
  79. # the full name of the directory will contain name of the ActorSystem
  80. # and its remote port.
  81. # 2. Otherwise the path is used as is, as a relative or absolute path to
  82. # a directory.
  83. #
  84. # When running in production you may want to configure this to a specific
  85. # path (alt 2), since the default directory contains the remote port of the
  86. # actor system to make the name unique. If using a dynamically assigned
  87. # port (0) it will be different each time and the previously stored data
  88. # will not be loaded.
  89. dir = "ddata"
  90. # Size in bytes of the memory mapped file.
  91. map-size = 100 MiB
  92. # Accumulate changes before storing improves performance with the
  93. # risk of losing the last writes if the JVM crashes.
  94. # The interval is by default set to 'off' to write each update immediately.
  95. # Enabling write behind by specifying a duration, e.g. 200ms, is especially
  96. # efficient when performing many writes to the same key, because it is only
  97. # the last value for each key that will be serialized and stored.
  98. # write-behind-interval = 200 ms
  99. write-behind-interval = off
  100. }
  101. }
  102. }

