Akka扩展

注:本节未经校验,如有问题欢迎提issue

如果想要为Akka添加特性,有一个非常优美而且强大的工具,称为 Akka 扩展。它由两部分组成: ExtensionExtensionId.

Extensions 在每个 ActorSystem 中只会加载一次, 并被Akka所管理。 你可以选择按需加载你的Extension或是在 ActorSystem 创建时通过Akka配置来加载。 关于这些细节,见下文 “从配置中加载” 的部分.

警告

由于扩展是hook到Akka自身的,所以扩展的实现者需要保证自己扩展的线程安全性。

构建一个扩展

现在我们来创建一个扩展示例,它的功能是对某件事发生的次数进行统计。

首先定义 Extension 的功能:

  1. import akka.actor.Extension
  2. class CountExtensionImpl extends Extension {
  3. //Since this Extension is a shared instance
  4. // per ActorSystem we need to be threadsafe
  5. private val counter = new AtomicLong(0)
  6. //This is the operation this Extension provides
  7. def increment() = counter.incrementAndGet()
  8. }

然后需要为扩展指定一个 ExtensionId,这样我们可以获取它的实例.

  1. import akka.actor.ActorSystem
  2. import akka.actor.ExtensionId
  3. import akka.actor.ExtensionIdProvider
  4. import akka.actor.ExtendedActorSystem
  5. object CountExtension
  6. extends ExtensionId[CountExtensionImpl]
  7. with ExtensionIdProvider {
  8. //The lookup method is required by ExtensionIdProvider,
  9. // so we return ourselves here, this allows us
  10. // to configure our extension to be loaded when
  11. // the ActorSystem starts up
  12. override def lookup = CountExtension
  13. //This method will be called by Akka
  14. // to instantiate our Extension
  15. override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl
  16. /**
  17. * Java API: retrieve the Count extension for the given system.
  18. */
  19. override def get(system: ActorSystem): CountExtensionImpl = super.get(system)
  20. }

好了!然后我们就可以使用它了:

  1. CountExtension(system).increment

或者在Akka Actor中使用:

  1. class MyActor extends Actor {
  2. def receive = {
  3. case someMessage =>
  4. CountExtension(context.system).increment()
  5. }
  6. }

你也可以将扩展藏在 trait 里:

  1. trait Counting { self: Actor =>
  2. def increment() = CountExtension(context.system).increment()
  3. }
  4. class MyCounterActor extends Actor with Counting {
  5. def receive = {
  6. case someMessage => increment()
  7. }
  8. }

这样就搞定了!

从配置中加载

为了能够从Akka配置中加载扩展,你必须在为ActorSystem提供的配置文件中的 akka.extensions 部分加上 ExtensionIdExtensionIdProvider实现类的完整路径。

  1. akka {
  2. extensions = ["docs.extension.CountExtension"]
  3. }

实用性

充分发挥你的想象力,天空才是极限! 顺便提一下,你知道 Akka的Typed Actor, Serialization和其它一些特性都是以Akka扩展的形式实现的吗?

应用特定设置

可以用 Configuration 来指定应用特有的设置。将这些设置放在一个扩展里是一个好习惯。

配置示例:

  1. myapp {
  2. db {
  3. uri = "mongodb://example1.com:27017,example2.com:27017"
  4. }
  5. circuit-breaker {
  6. timeout = 30 seconds
  7. }
  8. }

Extension的实现:

  1. import akka.actor.ActorSystem
  2. import akka.actor.Extension
  3. import akka.actor.ExtensionId
  4. import akka.actor.ExtensionIdProvider
  5. import akka.actor.ExtendedActorSystem
  6. import scala.concurrent.duration.Duration
  7. import com.typesafe.config.Config
  8. import java.util.concurrent.TimeUnit
  9. class SettingsImpl(config: Config) extends Extension {
  10. val DbUri: String = config.getString("myapp.db.uri")
  11. val CircuitBreakerTimeout: Duration =
  12. Duration(config.getMilliseconds("myapp.circuit-breaker.timeout"),
  13. TimeUnit.MILLISECONDS)
  14. }
  15. object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider {
  16. override def lookup = Settings
  17. override def createExtension(system: ExtendedActorSystem) =
  18. new SettingsImpl(system.settings.config)
  19. /**
  20. * Java API: retrieve the Settings extension for the given system.
  21. */
  22. override def get(system: ActorSystem): SettingsImpl = super.get(system)
  23. }

使用它:

  1. class MyActor extends Actor {
  2. val settings = Settings(context.system)
  3. val connection = connect(settings.DbUri, settings.CircuitBreakerTimeout)