Akka 扩展

如果你想向 Akka 添加特性,有一个非常优雅但功能强大的机制来实现这一点。它被称为 Akka 扩展(Extensions),由两个基本组件组成:ExtensionExtensionId

每个ActorSystem只加载一次扩展,由 Akka 管理。你可以选择按需加载扩展,也可以通过 Akka 配置在ActorSystem创建时加载扩展。有关如何实现这一点的详细信息,请参阅下面的“从配置中加载”部分。

  • 警告:由于扩展是一种钩住 Akka 本身的方法,因此扩展的实现者需要确保其扩展的线程安全。

建立扩展

现在,让我们创建一个示例扩展,用于计算某些事情的发生次数。

首先,我们定义Extension应该做什么:

  1. import akka.actor.*;
  2. import java.util.concurrent.atomic.AtomicLong;
  3. static class CountExtensionImpl implements Extension {
  4. // Since this Extension is a shared instance
  5. // per ActorSystem we need to be threadsafe
  6. private final AtomicLong counter = new AtomicLong(0);
  7. // This is the operation this Extension provides
  8. public long increment() {
  9. return counter.incrementAndGet();
  10. }
  11. }

然后,我们需要为扩展创建一个ExtensionId,这样我们就可以获取它。

  1. import akka.actor.*;
  2. import java.util.concurrent.atomic.AtomicLong;
  3. static class CountExtension extends AbstractExtensionId<CountExtensionImpl>
  4. implements ExtensionIdProvider {
  5. // This will be the identifier of our CountExtension
  6. public static final CountExtension CountExtensionProvider = new CountExtension();
  7. private CountExtension() {}
  8. // The lookup method is required by ExtensionIdProvider,
  9. // so we return ourselves here, this allows us
  10. // to configure our extension to be loaded when
  11. // the ActorSystem starts up
  12. public CountExtension lookup() {
  13. return CountExtension.CountExtensionProvider; // The public static final
  14. }
  15. // This method will be called by Akka
  16. // to instantiate our Extension
  17. public CountExtensionImpl createExtension(ExtendedActorSystem system) {
  18. return new CountExtensionImpl();
  19. }
  20. }

现在我们需要做的就是实际使用它:

  1. // typically you would use static import of the
  2. // CountExtension.CountExtensionProvider field
  3. CountExtension.CountExtensionProvider.get(system).increment();

或者从 Akka Actor 的内部:

  1. static class MyActor extends AbstractActor {
  2. @Override
  3. public Receive createReceive() {
  4. return receiveBuilder()
  5. .matchAny(
  6. msg -> {
  7. // typically you would use static import of the
  8. // CountExtension.CountExtensionProvider field
  9. CountExtension.CountExtensionProvider.get(getContext().getSystem()).increment();
  10. })
  11. .build();
  12. }
  13. }

就这些了!

从配置中加载

为了能够从 Akka 的配置中加载扩展,必须在提供给ActorSystem的配置的akka.extensions部分中添加ExtensionIdExtensionIdProvider实现的 FQCN。

  1. akka {
  2. extensions = ["docs.extension.ExtensionDocTest.CountExtension"]
  3. }

适用性

一切都是可能的!顺便问一下,你知道 Akka Typed Actor、序列化和其他特性是作为 Akka 扩展实现的吗?

应用程序特定设置

该「配置」可用于特定于应用程序的设置。一个好的实践是将这些设置放在扩展中。

配置示例:

  1. myapp {
  2. db {
  3. uri = "mongodb://example1.com:27017,example2.com:27017"
  4. }
  5. circuit-breaker {
  6. timeout = 30 seconds
  7. }
  8. }

Extension示例:

  1. import akka.actor.Extension;
  2. import akka.actor.AbstractExtensionId;
  3. import akka.actor.ExtensionIdProvider;
  4. import akka.actor.ActorSystem;
  5. import akka.actor.ExtendedActorSystem;
  6. import com.typesafe.config.Config;
  7. import java.util.concurrent.TimeUnit;
  8. import java.time.Duration;
  9. static class SettingsImpl implements Extension {
  10. public final String DB_URI;
  11. public final Duration CIRCUIT_BREAKER_TIMEOUT;
  12. public SettingsImpl(Config config) {
  13. DB_URI = config.getString("myapp.db.uri");
  14. CIRCUIT_BREAKER_TIMEOUT =
  15. Duration.ofMillis(
  16. config.getDuration("myapp.circuit-breaker.timeout", TimeUnit.MILLISECONDS));
  17. }
  18. }
  19. static class Settings extends AbstractExtensionId<SettingsImpl> implements ExtensionIdProvider {
  20. public static final Settings SettingsProvider = new Settings();
  21. private Settings() {}
  22. public Settings lookup() {
  23. return Settings.SettingsProvider;
  24. }
  25. public SettingsImpl createExtension(ExtendedActorSystem system) {
  26. return new SettingsImpl(system.settings().config());
  27. }
  28. }

使用它:

  1. static class MyActor extends AbstractActor {
  2. // typically you would use static import of the Settings.SettingsProvider field
  3. final SettingsImpl settings = Settings.SettingsProvider.get(getContext().getSystem());
  4. Connection connection = connect(settings.DB_URI, settings.CIRCUIT_BREAKER_TIMEOUT);
  5. }

库扩展

第三方库可以通过将其附加到其reference.conf中的akka.library-extensions来注册它的扩展,以便在 Actor 系统启动时自动加载。

  1. akka.library-extensions += "docs.extension.ExampleExtension"

由于无法有选择地删除此类扩展,因此应小心使用,并且仅在用户不希望禁用此类扩展或不支持禁用此类子功能的情况下使用。这一点很重要的一个例子是在测试中。

  • 警告:永远不要分配akka.library-extensions= ["Extension"],因为这样会破坏库扩展机制并使行为依赖于类路径顺序。

英文原文链接Akka Extensions.