如何:在 Dapr 中启用和使用 Actor可重入性

进一步了解 Actor可重入性

虚拟actor模式的一个核心原则是actor执行的单线程特性。 在没有重入的情况下,Dapr 运行时会锁定所有 actor 请求。 在第一个请求完成之前,第二个请求无法启动。 这意味着一个 actor 不能调用它自己,也不能让另一个 actor 调用它,即使它是同一个调用链的一部分。

重入功能允许来自同一链或上下文的请求重新进入已锁定的 actor ,从而解决了这一问题。 这在以下情况下非常有用:

  • Actor 想调用自己的方法
  • Actors 在工作流程中用于执行工作,然后回调到协调 Actors。

可重入性的调用链示例如下:

  1. Actor A -> Actor A
  2. ActorA -> Actor B -> Actor A

有了重入功能,你就可以执行更复杂的 actor 调用,而无需牺牲虚拟 actor 的单线程行为。

Diagram showing reentrancy for a coordinator workflow actor calling worker actors or an actor calling an method on itself

maxStackDepth 参数设置一个值,该值控制对同一参与者进行多少次可重入调用。 默认情况下,此值设置为32,这在大多数情况下已足够。

配置 actor 运行时以启用重入功能

可重入 actor 必须提供适当的配置。 与其他 actor 配置元素类似,这是由 actor 端点 GET /dapr/config 完成的。

  1. public class Startup
  2. {
  3. public void ConfigureServices(IServiceCollection services)
  4. {
  5. services.AddSingleton<BankService>();
  6. services.AddActors(options =>
  7. {
  8. options.Actors.RegisterActor<DemoActor>();
  9. options.ReentrancyConfig = new Dapr.Actors.ActorReentrancyConfig()
  10. {
  11. Enabled = true,
  12. MaxStackDepth = 32,
  13. };
  14. });
  15. }
  16. }
  1. import { CommunicationProtocolEnum, DaprClient, DaprServer } from "@dapr/dapr";
  2. // Configure the actor runtime with the DaprClientOptions.
  3. const clientOptions = {
  4. actor: {
  5. reentrancy: {
  6. enabled: true,
  7. maxStackDepth: 32,
  8. },
  9. },
  10. };
  1. from fastapi import FastAPI
  2. from dapr.ext.fastapi import DaprActor
  3. from dapr.actor.runtime.config import ActorRuntimeConfig, ActorReentrancyConfig
  4. from dapr.actor.runtime.runtime import ActorRuntime
  5. from demo_actor import DemoActor
  6. reentrancyConfig = ActorReentrancyConfig(enabled=True)
  7. config = ActorRuntimeConfig(reentrancy=reentrancyConfig)
  8. ActorRuntime.set_actor_config(config)
  9. app = FastAPI(title=f'{DemoActor.__name__}Service')
  10. actor = DaprActor(app)
  11. @app.on_event("startup")
  12. async def startup_event():
  13. # Register DemoActor
  14. await actor.register_actor(DemoActor)
  15. @app.get("/MakeExampleReentrantCall")
  16. def do_something_reentrant():
  17. # invoke another actor here, reentrancy will be handled automatically
  18. return

这是一个用 Golang 编写的 actor 配置代码片段,通过 HTTP API 提供重入配置。 可重入性尚未包含在 Go SDK 中。

  1. type daprConfig struct {
  2. Entities []string `json:"entities,omitempty"`
  3. ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"`
  4. ActorScanInterval string `json:"actorScanInterval,omitempty"`
  5. DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"`
  6. DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"`
  7. Reentrancy config.ReentrancyConfig `json:"reentrancy,omitempty"`
  8. }
  9. var daprConfigResponse = daprConfig{
  10. []string{defaultActorType},
  11. actorIdleTimeout,
  12. actorScanInterval,
  13. drainOngoingCallTimeout,
  14. drainRebalancedActors,
  15. config.ReentrancyConfig{Enabled: true, MaxStackDepth: &maxStackDepth},
  16. }
  17. func configHandler(w http.ResponseWriter, r *http.Request) {
  18. w.Header().Set("Content-Type", "application/json")
  19. w.WriteHeader(http.StatusOK)
  20. json.NewEncoder(w).Encode(daprConfigResponse)
  21. }

处理重入请求

重入请求的关键是Dapr-Reentrancy-Id请求头。 该标头的值用于将请求与其调用链进行匹配,并允许它们绕过 actor 锁。

Dapr 运行时会为任何指定了重入配置的 actor 请求生成该标头。 一旦生成,它就会被用来锁定 actor ,并且必须传递给以后的所有请求。 下面是一个代理处理重入请求的示例:

  1. func reentrantCallHandler(w http.ResponseWriter, r *http.Request) {
  2. /*
  3. * Omitted.
  4. */
  5. req, _ := http.NewRequest("PUT", url, bytes.NewReader(nextBody))
  6. reentrancyID := r.Header.Get("Dapr-Reentrancy-Id")
  7. req.Header.Add("Dapr-Reentrancy-Id", reentrancyID)
  8. client := http.Client{}
  9. resp, err := client.Do(req)
  10. /*
  11. * Omitted.
  12. */
  13. }

例子

观看这个视频,了解如何使用Actor可重入性。

下一步

Actors in the Dapr SDKs

相关链接