指南:如何编写工作流

学习如何开发和编写工作流

注意

Dapr工作流目前处于beta阶段。 查看已知限制 1.14.1.

本文简要概述了如何创作由 Dapr 工作流引擎执行的工作流。

注意

如果你还没有,请尝试使用工作流快速入门快速了解如何使用工作流。

将工作流作为代码编写

Dapr 工作流逻辑使用通用编程语言实现,使您可以:

  • 使用你喜欢的编程语言(无需学习新的 DSL 或 YAML 模式)。
  • 可以访问该语言的标准库。
  • 构建您自己的库和抽象。
  • 使用调试器并检查局部变量。
  • 为工作流编写单元测试,就像应用程序逻辑的其他部分一样。

Dapr sidecar 不加载任何工作流定义。 相反,Sidecar 只是驱动工作流的执行,让所有工作流活动成为应用程序的一部分。

编写工作流活动

工作流活动是工作流中的基本工作单元,是在业务流程中编排的任务。

定义您希望工作流执行的工作流活动。 活动是一个函数定义,可以接受输入和输出。 下面的示例创建了一个计数器(活动)称为 hello_act,通知用户当前的计数器值。 hello_act 是一个从名为 WorkflowActivityContext 的类派生出来的函数。

  1. def hello_act(ctx: WorkflowActivityContext, input):
  2. global counter
  3. counter += input
  4. print(f'New counter value is: {counter}!', flush=True)

查看上下文中的hello_act工作流活动。

定义您希望工作流执行的工作流活动。 活动被包装在实现工作流活动的WorkflowActivityContext类中。

  1. export default class WorkflowActivityContext {
  2. private readonly _innerContext: ActivityContext;
  3. constructor(innerContext: ActivityContext) {
  4. if (!innerContext) {
  5. throw new Error("ActivityContext cannot be undefined");
  6. }
  7. this._innerContext = innerContext;
  8. }
  9. public getWorkflowInstanceId(): string {
  10. return this._innerContext.orchestrationId;
  11. }
  12. public getWorkflowActivityId(): number {
  13. return this._innerContext.taskId;
  14. }
  15. }

查看工作流活动的上下文。

定义您希望工作流执行的工作流活动。 活动是一个类的定义,可以有输入和输出。 活动还参与依赖注入,如绑定到 Dapr 客户端。

以下示例中调用的活动包括:

  • NotifyActivity: 接收新订单通知。
  • ReserveInventoryActivity:检查是否有足够的库存来满足新订单。
  • ProcessPaymentActivity: 处理订单的付款。 包括NotifyActivity来发送成功订单的通知。

NotifyActivity

  1. public class NotifyActivity : WorkflowActivity<Notification, object>
  2. {
  3. //...
  4. public NotifyActivity(ILoggerFactory loggerFactory)
  5. {
  6. this.logger = loggerFactory.CreateLogger<NotifyActivity>();
  7. }
  8. //...
  9. }

查看完整的 NotifyActivity.cs 工作流活动示例。

ReserveInventoryActivity

  1. public class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
  2. {
  3. //...
  4. public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
  5. {
  6. this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
  7. this.client = client;
  8. }
  9. //...
  10. }

查看完整的 ReserveInventoryActivity.cs 工作流活动示例。

ProcessPaymentActivity

  1. public class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
  2. {
  3. //...
  4. public ProcessPaymentActivity(ILoggerFactory loggerFactory)
  5. {
  6. this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
  7. }
  8. //...
  9. }

查看完整的 ProcessPaymentActivity.cs 工作流活动示例。

定义您希望工作流执行的工作流活动。 活动被包装在公共的DemoWorkflowActivity类中,该类实现了工作流活动。

  1. @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
  2. public class DemoWorkflowActivity implements WorkflowActivity {
  3. @Override
  4. public DemoActivityOutput run(WorkflowActivityContext ctx) {
  5. Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
  6. logger.info("Starting Activity: " + ctx.getName());
  7. var message = ctx.getInput(DemoActivityInput.class).getMessage();
  8. var newMessage = message + " World!, from Activity";
  9. logger.info("Message Received from input: " + message);
  10. logger.info("Sending message to output: " + newMessage);
  11. logger.info("Sleeping for 5 seconds to simulate long running operation...");
  12. try {
  13. TimeUnit.SECONDS.sleep(5);
  14. } catch (InterruptedException e) {
  15. throw new RuntimeException(e);
  16. }
  17. logger.info("Activity finished");
  18. var output = new DemoActivityOutput(message, newMessage);
  19. logger.info("Activity returned: " + output);
  20. return output;
  21. }
  22. }

查看上下文中的Java SDK工作流活动示例。

定义您希望工作流执行的每个工作流活动。 可以使用ctx.GetInput从上下文中解组活动输入。 活动应该被定义为接受一个 ctx workflow.ActivityContext 参数并返回一个接口和错误。

  1. func TestActivity(ctx workflow.ActivityContext) (any, error) {
  2. var input int
  3. if err := ctx.GetInput(&input); err != nil {
  4. return "", err
  5. }
  6. // Do something here
  7. return "result", nil
  8. }

查看Go SDK工作流活动示例的上下文。

编写工作流

接下来,在工作流中注册并调用活动。

hello_world_wf 函数是从一个名为 DaprWorkflowContext 的类派生出来的,具有输入和输出参数类型。 它还包括一个“yield”语句,用于执行工作流的繁重工作并调用工作流活动。

  1. def hello_world_wf(ctx: DaprWorkflowContext, input):
  2. print(f'{input}')
  3. yield ctx.call_activity(hello_act, input=1)
  4. yield ctx.call_activity(hello_act, input=10)
  5. yield ctx.wait_for_external_event("event1")
  6. yield ctx.call_activity(hello_act, input=100)
  7. yield ctx.call_activity(hello_act, input=1000)

查看上下文中的hello_world_wf工作流。

接下来,使用 WorkflowRuntime 类注册工作流并启动工作流运行时。

  1. export default class WorkflowRuntime {
  2. //..
  3. // Register workflow implementation for handling orchestrations
  4. public registerWorkflow(workflow: TWorkflow): WorkflowRuntime {
  5. const name = getFunctionName(workflow);
  6. const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
  7. const workflowContext = new WorkflowContext(ctx);
  8. return workflow(workflowContext, input);
  9. };
  10. this.worker.addNamedOrchestrator(name, workflowWrapper);
  11. return this;
  12. }
  13. // Register workflow activities
  14. public registerActivity(fn: TWorkflowActivity<TInput, TOutput>): WorkflowRuntime {
  15. const name = getFunctionName(fn);
  16. const activityWrapper = (ctx: ActivityContext, intput: TInput): TOutput => {
  17. const wfActivityContext = new WorkflowActivityContext(ctx);
  18. return fn(wfActivityContext, intput);
  19. };
  20. this.worker.addNamedActivity(name, activityWrapper);
  21. return this;
  22. }
  23. // Start the workflow runtime processing items and block.
  24. public async start() {
  25. await this.worker.start();
  26. }
  27. }

查看上下文中的WorkflowRuntime。

OrderProcessingWorkflow 类是从一个名为 Workflow 的基类派生出来的,具有输入和输出参数类型。 它还包括一个RunAsync方法,用于执行工作流的繁重工作并调用工作流活动。

  1. class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
  2. {
  3. public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
  4. {
  5. //...
  6. await context.CallActivityAsync(
  7. nameof(NotifyActivity),
  8. new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));
  9. //...
  10. InventoryResult result = await context.CallActivityAsync<InventoryResult>(
  11. nameof(ReserveInventoryActivity),
  12. new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
  13. //...
  14. await context.CallActivityAsync(
  15. nameof(ProcessPaymentActivity),
  16. new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));
  17. await context.CallActivityAsync(
  18. nameof(NotifyActivity),
  19. new Notification($"Order {orderId} processed successfully!"));
  20. // End the workflow with a success result
  21. return new OrderResult(Processed: true);
  22. }
  23. }

查看完整的 OrderProcessingWorkflow.cs 工作流示例。

接下来,使用 WorkflowRuntimeBuilder 注册工作流并启动工作流运行时。

  1. public class DemoWorkflowWorker {
  2. public static void main(String[] args) throws Exception {
  3. // Register the Workflow with the builder.
  4. WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
  5. builder.registerActivity(DemoWorkflowActivity.class);
  6. // Build and then start the workflow runtime pulling and executing tasks
  7. try (WorkflowRuntime runtime = builder.build()) {
  8. System.out.println("Start workflow runtime");
  9. runtime.start();
  10. }
  11. System.exit(0);
  12. }
  13. }

查看上下文中的Java SDK工作流。

使用参数 ctx *workflow.WorkflowContext 定义您的工作流函数,并返回任何结果和错误。 从您的工作流中调用您定义的活动。

  1. func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
  2. var input int
  3. if err := ctx.GetInput(&input); err != nil {
  4. return nil, err
  5. }
  6. var output string
  7. if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
  8. return nil, err
  9. }
  10. if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
  11. return nil, err
  12. }
  13. if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
  14. return nil, nil
  15. }
  16. return output, nil
  17. }

查看上下文中的Go SDK工作流。

编写应用程序

最后,使用工作流编写应用程序。

在下面的示例中,对于使用 Python SDK 的基本 Python hello world 应用程序,您的项目代码将包括:

  • 一个名为 DaprClient 的 Python 包,用于接收 Python SDK 功能。
  • 调用具有扩展的构建器:
  • API 调用 在下面的示例中,这些调用包括启动、暂停、恢复、清除和终止工作流。
  1. from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
  2. from dapr.clients import DaprClient
  3. # ...
  4. def main():
  5. with DaprClient() as d:
  6. host = settings.DAPR_RUNTIME_HOST
  7. port = settings.DAPR_GRPC_PORT
  8. workflowRuntime = WorkflowRuntime(host, port)
  9. workflowRuntime = WorkflowRuntime()
  10. workflowRuntime.register_workflow(hello_world_wf)
  11. workflowRuntime.register_activity(hello_act)
  12. workflowRuntime.start()
  13. # Start workflow
  14. print("==========Start Counter Increase as per Input:==========")
  15. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  16. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  17. print(f"start_resp {start_resp.instance_id}")
  18. # ...
  19. # Pause workflow
  20. d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  21. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  22. print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
  23. # Resume workflow
  24. d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  25. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  26. print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
  27. sleep(1)
  28. # Raise workflow
  29. d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
  30. event_name=eventName, event_data=eventData)
  31. sleep(5)
  32. # Purge workflow
  33. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  34. try:
  35. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  36. except DaprInternalError as err:
  37. if nonExistentIDError in err._message:
  38. print("Instance Successfully Purged")
  39. # Kick off another workflow for termination purposes
  40. start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
  41. workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
  42. print(f"start_resp {start_resp.instance_id}")
  43. # Terminate workflow
  44. d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  45. sleep(1)
  46. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  47. print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
  48. # Purge workflow
  49. d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  50. try:
  51. getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
  52. except DaprInternalError as err:
  53. if nonExistentIDError in err._message:
  54. print("Instance Successfully Purged")
  55. workflowRuntime.shutdown()
  56. if __name__ == '__main__':
  57. main()

下面的示例是使用JavaScript SDK的基本JavaScript应用程序。 就像这个例子一样,您的项目代码将包括:

  • 调用具有扩展的构建器:
  • API 调用 在下面的示例中,这些调用启动、终止、获取状态、暂停、恢复、触发事件和清除工作流程。
  1. import { TaskHubGrpcClient } from "@microsoft/durabletask-js";
  2. import { WorkflowState } from "./WorkflowState";
  3. import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index";
  4. import { TWorkflow } from "../../types/workflow/Workflow.type";
  5. import { getFunctionName } from "../internal";
  6. import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";
  7. /** DaprWorkflowClient class defines client operations for managing workflow instances. */
  8. export default class DaprWorkflowClient {
  9. private readonly _innerClient: TaskHubGrpcClient;
  10. /** Initialize a new instance of the DaprWorkflowClient.
  11. */
  12. constructor(options: Partial<WorkflowClientOptions> = {}) {
  13. const grpcEndpoint = generateEndpoint(options);
  14. options.daprApiToken = getDaprApiToken(options);
  15. this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options);
  16. }
  17. private buildInnerClient(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcClient {
  18. let innerOptions = options?.grpcOptions;
  19. if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
  20. innerOptions = {
  21. ...innerOptions,
  22. interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
  23. };
  24. }
  25. return new TaskHubGrpcClient(hostAddress, innerOptions);
  26. }
  27. /**
  28. * Schedule a new workflow using the DurableTask client.
  29. */
  30. public async scheduleNewWorkflow(
  31. workflow: TWorkflow | string,
  32. input?: any,
  33. instanceId?: string,
  34. startAt?: Date,
  35. ): Promise<string> {
  36. if (typeof workflow === "string") {
  37. return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt);
  38. }
  39. return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt);
  40. }
  41. /**
  42. * Terminate the workflow associated with the provided instance id.
  43. *
  44. * @param {string} workflowInstanceId - Workflow instance id to terminate.
  45. * @param {any} output - The optional output to set for the terminated workflow instance.
  46. */
  47. public async terminateWorkflow(workflowInstanceId: string, output: any) {
  48. await this._innerClient.terminateOrchestration(workflowInstanceId, output);
  49. }
  50. /**
  51. * Fetch workflow instance metadata from the configured durable store.
  52. */
  53. public async getWorkflowState(
  54. workflowInstanceId: string,
  55. getInputsAndOutputs: boolean,
  56. ): Promise<WorkflowState | undefined> {
  57. const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs);
  58. if (state !== undefined) {
  59. return new WorkflowState(state);
  60. }
  61. }
  62. /**
  63. * Waits for a workflow to start running
  64. */
  65. public async waitForWorkflowStart(
  66. workflowInstanceId: string,
  67. fetchPayloads = true,
  68. timeoutInSeconds = 60,
  69. ): Promise<WorkflowState | undefined> {
  70. const state = await this._innerClient.waitForOrchestrationStart(
  71. workflowInstanceId,
  72. fetchPayloads,
  73. timeoutInSeconds,
  74. );
  75. if (state !== undefined) {
  76. return new WorkflowState(state);
  77. }
  78. }
  79. /**
  80. * Waits for a workflow to complete running
  81. */
  82. public async waitForWorkflowCompletion(
  83. workflowInstanceId: string,
  84. fetchPayloads = true,
  85. timeoutInSeconds = 60,
  86. ): Promise<WorkflowState | undefined> {
  87. const state = await this._innerClient.waitForOrchestrationCompletion(
  88. workflowInstanceId,
  89. fetchPayloads,
  90. timeoutInSeconds,
  91. );
  92. if (state != undefined) {
  93. return new WorkflowState(state);
  94. }
  95. }
  96. /**
  97. * Sends an event notification message to an awaiting workflow instance
  98. */
  99. public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
  100. this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
  101. }
  102. /**
  103. * Purges the workflow instance state from the workflow state store.
  104. */
  105. public async purgeWorkflow(workflowInstanceId: string): Promise<boolean> {
  106. const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId);
  107. if (purgeResult !== undefined) {
  108. return purgeResult.deletedInstanceCount > 0;
  109. }
  110. return false;
  111. }
  112. /**
  113. * Closes the inner DurableTask client and shutdown the GRPC channel.
  114. */
  115. public async stop() {
  116. await this._innerClient.stop();
  117. }
  118. }

在下面的 Program.cs 示例中,对于使用 .NET SDK 的基本 ASP.NET 订单处理应用程序,您的项目代码将包括:

  • 一个名为 Dapr.Workflow 的 NuGet 包,用于接收 .NET SDK 的功能
  • 一个带有名为 AddDaprWorkflow 的扩展方法的构建器
    • 这将允许您注册工作流和工作流活动(工作流可以调度的任务)。
  • HTTP API 调用
    • 一个用于提交新订单
    • 一个用于检查现有订单的状态
  1. using Dapr.Workflow;
  2. //...
  3. // Dapr Workflows are registered as part of the service configuration
  4. builder.Services.AddDaprWorkflow(options =>
  5. {
  6. // Note that it's also possible to register a lambda function as the workflow
  7. // or activity implementation instead of a class.
  8. options.RegisterWorkflow<OrderProcessingWorkflow>();
  9. // These are the activities that get invoked by the workflow(s).
  10. options.RegisterActivity<NotifyActivity>();
  11. options.RegisterActivity<ReserveInventoryActivity>();
  12. options.RegisterActivity<ProcessPaymentActivity>();
  13. });
  14. WebApplication app = builder.Build();
  15. // POST starts new order workflow instance
  16. app.MapPost("/orders", async (DaprWorkflowClient client, [FromBody] OrderPayload orderInfo) =>
  17. {
  18. if (orderInfo?.Name == null)
  19. {
  20. return Results.BadRequest(new
  21. {
  22. message = "Order data was missing from the request",
  23. example = new OrderPayload("Paperclips", 99.95),
  24. });
  25. }
  26. //...
  27. });
  28. // GET fetches state for order workflow to report status
  29. app.MapGet("/orders/{orderId}", async (string orderId, DaprWorkflowClient client) =>
  30. {
  31. WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
  32. if (!state.Exists)
  33. {
  34. return Results.NotFound($"No order with ID = '{orderId}' was found.");
  35. }
  36. var httpResponsePayload = new
  37. {
  38. details = state.ReadInputAs<OrderPayload>(),
  39. status = state.RuntimeStatus.ToString(),
  40. result = state.ReadOutputAs<OrderResult>(),
  41. };
  42. //...
  43. }).WithName("GetOrderInfoEndpoint");
  44. app.Run();

如下面的示例所示,使用Java SDK和Dapr Workflow的hello-world应用程序将包括:

  • 一个名为 io.dapr.workflows.client 的 Java 包,用于接收 Java SDK 客户端的功能。
  • 导入 io.dapr.workflows.Workflow
  • 扩展WorkflowDemoWorkflow
  • 使用输入和输出创建工作流。
  • API 调用 在下面的示例中,这些调用会启动并调用工作流活动。
  1. package io.dapr.examples.workflows;
  2. import com.microsoft.durabletask.CompositeTaskFailedException;
  3. import com.microsoft.durabletask.Task;
  4. import com.microsoft.durabletask.TaskCanceledException;
  5. import io.dapr.workflows.Workflow;
  6. import io.dapr.workflows.WorkflowStub;
  7. import java.time.Duration;
  8. import java.util.Arrays;
  9. import java.util.List;
  10. /**
  11. * Implementation of the DemoWorkflow for the server side.
  12. */
  13. public class DemoWorkflow extends Workflow {
  14. @Override
  15. public WorkflowStub create() {
  16. return ctx -> {
  17. ctx.getLogger().info("Starting Workflow: " + ctx.getName());
  18. // ...
  19. ctx.getLogger().info("Calling Activity...");
  20. var input = new DemoActivityInput("Hello Activity!");
  21. var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
  22. // ...
  23. };
  24. }
  25. }

查看上下文中的完整Java SDK工作流示例。

如下面的示例所示,使用 Go SDK 和 Dapr Workflow 的 hello-world 应用程序将包括:

  • 一个Go包叫做client,用于接收Go SDK客户端功能。
  • TestWorkflow 方法
  • 使用输入和输出创建工作流。
  • API 调用 在下面的示例中,这些调用会启动并调用工作流活动。
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/dapr/go-sdk/client"
  8. "github.com/dapr/go-sdk/workflow"
  9. )
  10. var stage = 0
  11. const (
  12. workflowComponent = "dapr"
  13. )
  14. func main() {
  15. w, err := workflow.NewWorker()
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. fmt.Println("Worker initialized")
  20. if err := w.RegisterWorkflow(TestWorkflow); err != nil {
  21. log.Fatal(err)
  22. }
  23. fmt.Println("TestWorkflow registered")
  24. if err := w.RegisterActivity(TestActivity); err != nil {
  25. log.Fatal(err)
  26. }
  27. fmt.Println("TestActivity registered")
  28. // Start workflow runner
  29. if err := w.Start(); err != nil {
  30. log.Fatal(err)
  31. }
  32. fmt.Println("runner started")
  33. daprClient, err := client.NewClient()
  34. if err != nil {
  35. log.Fatalf("failed to intialise client: %v", err)
  36. }
  37. defer daprClient.Close()
  38. ctx := context.Background()
  39. // Start workflow test
  40. respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
  41. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  42. WorkflowComponent: workflowComponent,
  43. WorkflowName: "TestWorkflow",
  44. Options: nil,
  45. Input: 1,
  46. SendRawInput: false,
  47. })
  48. if err != nil {
  49. log.Fatalf("failed to start workflow: %v", err)
  50. }
  51. fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
  52. // Pause workflow test
  53. err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{
  54. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  55. WorkflowComponent: workflowComponent,
  56. })
  57. if err != nil {
  58. log.Fatalf("failed to pause workflow: %v", err)
  59. }
  60. respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
  61. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  62. WorkflowComponent: workflowComponent,
  63. })
  64. if err != nil {
  65. log.Fatalf("failed to get workflow: %v", err)
  66. }
  67. if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
  68. log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
  69. }
  70. fmt.Printf("workflow paused\n")
  71. // Resume workflow test
  72. err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{
  73. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  74. WorkflowComponent: workflowComponent,
  75. })
  76. if err != nil {
  77. log.Fatalf("failed to resume workflow: %v", err)
  78. }
  79. respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
  80. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  81. WorkflowComponent: workflowComponent,
  82. })
  83. if err != nil {
  84. log.Fatalf("failed to get workflow: %v", err)
  85. }
  86. if respGet.RuntimeStatus != workflow.StatusRunning.String() {
  87. log.Fatalf("workflow not running")
  88. }
  89. fmt.Println("workflow resumed")
  90. fmt.Printf("stage: %d\n", stage)
  91. // Raise Event Test
  92. err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{
  93. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  94. WorkflowComponent: workflowComponent,
  95. EventName: "testEvent",
  96. EventData: "testData",
  97. SendRawData: false,
  98. })
  99. if err != nil {
  100. fmt.Printf("failed to raise event: %v", err)
  101. }
  102. fmt.Println("workflow event raised")
  103. time.Sleep(time.Second) // allow workflow to advance
  104. fmt.Printf("stage: %d\n", stage)
  105. respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
  106. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  107. WorkflowComponent: workflowComponent,
  108. })
  109. if err != nil {
  110. log.Fatalf("failed to get workflow: %v", err)
  111. }
  112. fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
  113. // Purge workflow test
  114. err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
  115. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  116. WorkflowComponent: workflowComponent,
  117. })
  118. if err != nil {
  119. log.Fatalf("failed to purge workflow: %v", err)
  120. }
  121. respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
  122. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  123. WorkflowComponent: workflowComponent,
  124. })
  125. if err != nil && respGet != nil {
  126. log.Fatal("failed to purge workflow")
  127. }
  128. fmt.Println("workflow purged")
  129. fmt.Printf("stage: %d\n", stage)
  130. // Terminate workflow test
  131. respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
  132. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  133. WorkflowComponent: workflowComponent,
  134. WorkflowName: "TestWorkflow",
  135. Options: nil,
  136. Input: 1,
  137. SendRawInput: false,
  138. })
  139. if err != nil {
  140. log.Fatalf("failed to start workflow: %v", err)
  141. }
  142. fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
  143. err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{
  144. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  145. WorkflowComponent: workflowComponent,
  146. })
  147. if err != nil {
  148. log.Fatalf("failed to terminate workflow: %v", err)
  149. }
  150. respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
  151. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  152. WorkflowComponent: workflowComponent,
  153. })
  154. if err != nil {
  155. log.Fatalf("failed to get workflow: %v", err)
  156. }
  157. if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
  158. log.Fatal("failed to terminate workflow")
  159. }
  160. fmt.Println("workflow terminated")
  161. err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
  162. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  163. WorkflowComponent: workflowComponent,
  164. })
  165. respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
  166. InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
  167. WorkflowComponent: workflowComponent,
  168. })
  169. if err == nil || respGet != nil {
  170. log.Fatalf("failed to purge workflow: %v", err)
  171. }
  172. fmt.Println("workflow purged")
  173. stage = 0
  174. fmt.Println("workflow client test")
  175. wfClient, err := workflow.NewClient()
  176. if err != nil {
  177. log.Fatalf("[wfclient] faield to initialize: %v", err)
  178. }
  179. id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
  180. if err != nil {
  181. log.Fatalf("[wfclient] failed to start workflow: %v", err)
  182. }
  183. fmt.Printf("[wfclient] started workflow with id: %s\n", id)
  184. metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
  185. if err != nil {
  186. log.Fatalf("[wfclient] failed to get worfklow: %v", err)
  187. }
  188. fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())
  189. if stage != 1 {
  190. log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
  191. }
  192. fmt.Printf("[wfclient] stage: %d\n", stage)
  193. // raise event
  194. if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
  195. log.Fatalf("[wfclient] failed to raise event: %v", err)
  196. }
  197. fmt.Println("[wfclient] event raised")
  198. // Sleep to allow the workflow to advance
  199. time.Sleep(time.Second)
  200. if stage != 2 {
  201. log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
  202. }
  203. fmt.Printf("[wfclient] stage: %d\n", stage)
  204. // stop workflow
  205. if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
  206. log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
  207. }
  208. fmt.Println("[wfclient] workflow terminated")
  209. if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
  210. log.Fatalf("[wfclient] failed to purge workflow: %v", err)
  211. }
  212. fmt.Println("[wfclient] workflow purged")
  213. // stop workflow runtime
  214. if err := w.Shutdown(); err != nil {
  215. log.Fatalf("failed to shutdown runtime: %v", err)
  216. }
  217. fmt.Println("workflow worker successfully shutdown")
  218. }
  219. func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
  220. var input int
  221. if err := ctx.GetInput(&input); err != nil {
  222. return nil, err
  223. }
  224. var output string
  225. if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
  226. return nil, err
  227. }
  228. err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
  229. if err != nil {
  230. return nil, err
  231. }
  232. if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
  233. return nil, err
  234. }
  235. return output, nil
  236. }
  237. func TestActivity(ctx workflow.ActivityContext) (any, error) {
  238. var input int
  239. if err := ctx.GetInput(&input); err != nil {
  240. return "", err
  241. }
  242. stage += input
  243. return fmt.Sprintf("Stage: %d", stage), nil
  244. }

在上下文中查看完整的Go SDK工作流示例。

重要

由于基于重播的工作流的执行方式,您将编写执行 I/O 和与系统交互等操作的逻辑 内部活动。 与此同时,工作流方法只是为了协调这些活动。

下一步

现在您已经编写了工作流程,请学习如何管理它。

管理工作流程 >>

相关链接