How to: Author a workflow

Learn how to develop and author workflows

Note

Dapr Workflow is currently in beta. See known limitations for 1.14.1.

This article provides a high-level overview of how to author workflows that are executed by the Dapr Workflow engine.

Note

If you haven’t already, try out the workflow quickstart for a quick walk-through on how to use workflows.

Author workflows as code

Dapr Workflow logic is implemented using general purpose programming languages, allowing you to:

  • Use your preferred programming language (no need to learn a new DSL or YAML schema).
  • Have access to the language’s standard libraries.
  • Build your own libraries and abstractions.
  • Use debuggers and examine local variables.
  • Write unit tests for your workflows, just like any other part of your application logic.

The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar simply drives the execution of the workflows, leaving all the workflow activities to be part of the application.
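Because a workflow is ordinary application code, it can be exercised without a running sidecar. The sketch below assumes a Python workflow written as a generator, in the style of the Python examples later in this article. `FakeContext`, `FakeTask`, `drive`, `notify`, and `order_workflow` are hypothetical names made up for illustration; none of them are part of the Dapr SDK.

```python
from dataclasses import dataclass


@dataclass
class FakeTask:
    """Stands in for the task object a real workflow context would return."""
    activity_name: str
    input: object


class FakeContext:
    """Hypothetical test double; not part of any Dapr SDK."""

    def call_activity(self, activity, *, input=None):
        return FakeTask(activity.__name__, input)


def notify(ctx, message):
    """Placeholder activity; never actually executed by the test driver."""
    return f"notified: {message}"


def order_workflow(ctx, order_id):
    # The workflow only orchestrates: it schedules an activity and uses its result.
    result = yield ctx.call_activity(notify, input=f"order {order_id} received")
    return result.upper()


def drive(workflow, ctx, wf_input, activity_results):
    """Step the generator by hand, feeding it canned activity results."""
    gen = workflow(ctx, wf_input)
    scheduled = []
    try:
        task = next(gen)
        for result in activity_results:
            scheduled.append(task)
            task = gen.send(result)
    except StopIteration as stop:
        return scheduled, stop.value
    raise AssertionError("workflow yielded more tasks than results were supplied")


scheduled, output = drive(order_workflow, FakeContext(), 42, ["ok"])
print(scheduled[0].activity_name, scheduled[0].input, output)
```

The test asserts on which activities were scheduled and on the workflow's return value, so the orchestration logic is checked without performing any real I/O.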

Write the workflow activities

Workflow activities are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process.

Define the workflow activities you’d like your workflow to perform. In Python, an activity is a function that takes a WorkflowActivityContext and an input, and can return an output. The following example defines an activity called hello_act that adds its input to a global counter and prints the new counter value.

    def hello_act(ctx: WorkflowActivityContext, input):
        global counter
        counter += input
        print(f'New counter value is: {counter}!', flush=True)

See the hello_act workflow activity in context.
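An activity can also return a value for the workflow to use. The following is a minimal sketch of that input-to-output shape, assuming a dictionary-based request and an in-memory stock table. `StubActivityContext`, `STOCK`, and `reserve_inventory_act` are hypothetical names for illustration and are not taken from the SDK.

```python
class StubActivityContext:
    """Hypothetical stand-in for the SDK's activity context (illustration only)."""

    def __init__(self, workflow_id, task_id):
        self.workflow_id = workflow_id
        self.task_id = task_id


# Hypothetical in-memory stock table; a real activity might query a state store.
STOCK = {"paperclips": 100}


def reserve_inventory_act(ctx, request):
    """Activity sketch: takes a request dict as input, returns a result dict."""
    available = STOCK.get(request["item"], 0)
    success = available >= request["quantity"]
    if success:
        STOCK[request["item"]] = available - request["quantity"]
    return {"success": success, "remaining": STOCK.get(request["item"], 0)}


ctx = StubActivityContext(workflow_id="order-1", task_id=0)
print(reserve_inventory_act(ctx, {"item": "paperclips", "quantity": 30}))
```

Because the activity is a plain function, it can be called directly in a test with a stub context, as the last two lines do.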

Define the workflow activities you’d like your workflow to perform. In the JavaScript SDK, each activity receives a WorkflowActivityContext, which wraps the underlying activity context and exposes the workflow instance ID and the activity (task) ID.

    export default class WorkflowActivityContext {
      private readonly _innerContext: ActivityContext;

      constructor(innerContext: ActivityContext) {
        if (!innerContext) {
          throw new Error("ActivityContext cannot be undefined");
        }
        this._innerContext = innerContext;
      }

      public getWorkflowInstanceId(): string {
        return this._innerContext.orchestrationId;
      }

      public getWorkflowActivityId(): number {
        return this._innerContext.taskId;
      }
    }

See the workflow activity in context.

Define the workflow activities you’d like your workflow to perform. In .NET, activities are class definitions that can take inputs and return outputs. Activities also participate in dependency injection, so they can, for example, receive a Dapr client through their constructor.

The activities called in the example below are:

  • NotifyActivity: Receive notification of a new order.
  • ReserveInventoryActivity: Check for sufficient inventory to meet the new order.
  • ProcessPaymentActivity: Process payment for the order. Includes NotifyActivity to send notification of successful order.

NotifyActivity

    public class NotifyActivity : WorkflowActivity<Notification, object>
    {
        //...

        public NotifyActivity(ILoggerFactory loggerFactory)
        {
            this.logger = loggerFactory.CreateLogger<NotifyActivity>();
        }

        //...
    }

See the full NotifyActivity.cs workflow activity example.

ReserveInventoryActivity

    public class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
    {
        //...

        public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
        {
            this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
            this.client = client;
        }

        //...
    }

See the full ReserveInventoryActivity.cs workflow activity example.

ProcessPaymentActivity

    public class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
    {
        //...

        public ProcessPaymentActivity(ILoggerFactory loggerFactory)
        {
            this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
        }

        //...
    }

See the full ProcessPaymentActivity.cs workflow activity example.

Define the workflow activities you’d like your workflow to perform. In Java, each activity is a class that implements the WorkflowActivity interface, such as the public DemoWorkflowActivity class below.

    @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
    public class DemoWorkflowActivity implements WorkflowActivity {

      @Override
      public DemoActivityOutput run(WorkflowActivityContext ctx) {
        Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
        logger.info("Starting Activity: " + ctx.getName());

        var message = ctx.getInput(DemoActivityInput.class).getMessage();
        var newMessage = message + " World!, from Activity";
        logger.info("Message Received from input: " + message);
        logger.info("Sending message to output: " + newMessage);

        logger.info("Sleeping for 5 seconds to simulate long running operation...");
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }

        logger.info("Activity finished");

        var output = new DemoActivityOutput(message, newMessage);
        logger.info("Activity returned: " + output);

        return output;
      }
    }

See the Java SDK workflow activity example in context.

Define each workflow activity you’d like your workflow to perform. In Go, an activity is a function that takes a ctx workflow.ActivityContext parameter and returns any and error. The activity input can be unmarshalled from the context with ctx.GetInput.

    func TestActivity(ctx workflow.ActivityContext) (any, error) {
        var input int
        if err := ctx.GetInput(&input); err != nil {
            return "", err
        }

        // Do something here
        return "result", nil
    }

See the Go SDK workflow activity example in context.

Write the workflow

Next, register and call the activities in a workflow.

The hello_world_wf function takes a DaprWorkflowContext and an input. Its yield statements do the heavy lifting of the workflow: each one hands a task back to the workflow engine, such as an activity call or a wait for an external event, and the function resumes when that task completes.

    def hello_world_wf(ctx: DaprWorkflowContext, input):
        print(f'{input}')
        yield ctx.call_activity(hello_act, input=1)
        yield ctx.call_activity(hello_act, input=10)
        yield ctx.wait_for_external_event("event1")
        yield ctx.call_activity(hello_act, input=100)
        yield ctx.call_activity(hello_act, input=1000)

See the hello_world_wf workflow in context.
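To see why a workflow like hello_world_wf stops at wait_for_external_event, it can help to drive a generator by hand. The following toy driver is only a sketch of the idea and is not how the Dapr engine is implemented: `SketchContext`, `MiniEngine`, `ActivityTask`, `EventTask`, `record`, and `approval_workflow` are all hypothetical names, and activities run inline instead of being scheduled through a sidecar.

```python
class ActivityTask:
    def __init__(self, fn, input):
        self.fn, self.input = fn, input


class EventTask:
    def __init__(self, name):
        self.name = name


class SketchContext:
    """Hypothetical context exposing two calls that mirror the example above."""

    def call_activity(self, fn, *, input=None):
        return ActivityTask(fn, input)

    def wait_for_external_event(self, name):
        return EventTask(name)


class MiniEngine:
    """Toy driver: runs activities inline and parks the workflow on events."""

    def __init__(self, workflow, wf_input):
        self._gen = workflow(SketchContext(), wf_input)
        self.waiting_for = None  # name of the event the workflow is parked on
        self.done = False
        self._advance(None, first=True)

    def _advance(self, value, first=False):
        try:
            task = next(self._gen) if first else self._gen.send(value)
            # Keep running activities until the workflow asks for an event.
            while isinstance(task, ActivityTask):
                task = self._gen.send(task.fn(None, task.input))
            self.waiting_for = task.name
        except StopIteration:
            self.waiting_for = None
            self.done = True

    def raise_event(self, name, data=None):
        # Only an event the workflow is currently waiting on resumes it.
        if name == self.waiting_for:
            self._advance(data)


log = []


def record(ctx, value):
    log.append(value)
    return value


def approval_workflow(ctx, wf_input):
    yield ctx.call_activity(record, input="before event")
    decision = yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(record, input=f"after event: {decision}")


engine = MiniEngine(approval_workflow, None)
print(log, engine.waiting_for)
engine.raise_event("event1", "approved")
print(log, engine.done)
```

The workflow function itself never blocks. Each yield returns control to the driver, which decides when to resume it.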

Next, register the workflow with the WorkflowRuntime class and start the workflow runtime.

    export default class WorkflowRuntime {
      //..

      // Register workflow implementation for handling orchestrations
      public registerWorkflow(workflow: TWorkflow): WorkflowRuntime {
        const name = getFunctionName(workflow);
        const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
          const workflowContext = new WorkflowContext(ctx);
          return workflow(workflowContext, input);
        };
        this.worker.addNamedOrchestrator(name, workflowWrapper);
        return this;
      }

      // Register workflow activities
      public registerActivity<TInput, TOutput>(fn: TWorkflowActivity<TInput, TOutput>): WorkflowRuntime {
        const name = getFunctionName(fn);
        const activityWrapper = (ctx: ActivityContext, input: TInput): TOutput => {
          const wfActivityContext = new WorkflowActivityContext(ctx);
          return fn(wfActivityContext, input);
        };
        this.worker.addNamedActivity(name, activityWrapper);
        return this;
      }

      // Start the workflow runtime processing items and block.
      public async start() {
        await this.worker.start();
      }
    }

See the WorkflowRuntime in context.

The OrderProcessingWorkflow class is derived from a base class called Workflow with input and output parameter types. It also includes a RunAsync method that does the heavy lifting of the workflow and calls the workflow activities.

    class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
    {
        public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
        {
            //...

            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));

            //...

            InventoryResult result = await context.CallActivityAsync<InventoryResult>(
                nameof(ReserveInventoryActivity),
                new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
            //...

            await context.CallActivityAsync(
                nameof(ProcessPaymentActivity),
                new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));

            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Order {orderId} processed successfully!"));

            // End the workflow with a success result
            return new OrderResult(Processed: true);
        }
    }

See the full workflow example in OrderProcessingWorkflow.cs.

Next, register the workflow with the WorkflowRuntimeBuilder and start the workflow runtime.

    public class DemoWorkflowWorker {

      public static void main(String[] args) throws Exception {

        // Register the Workflow with the builder.
        WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
        builder.registerActivity(DemoWorkflowActivity.class);

        // Build and then start the workflow runtime pulling and executing tasks
        try (WorkflowRuntime runtime = builder.build()) {
          System.out.println("Start workflow runtime");
          runtime.start();
        }

        System.exit(0);
      }
    }

See the Java SDK workflow in context.

Define your workflow function with the parameter ctx *workflow.WorkflowContext and return any and error. Invoke your defined activities from within your workflow.

    func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
        var input int
        if err := ctx.GetInput(&input); err != nil {
            return nil, err
        }
        var output string
        if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
            return nil, err
        }
        if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
            return nil, err
        }

        // Propagate a timer failure instead of silently returning nil.
        if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
            return nil, err
        }
        return output, nil
    }

See the Go SDK workflow in context.

Write the application

Finally, compose the application using the workflow.

In the following example, for a basic Python hello world application using the Python SDK, your project code would include:

  • The DaprClient class from the dapr.clients package, which provides the workflow management API calls.
  • The dapr.ext.workflow extension package, from which the example imports:
    • WorkflowRuntime, used to register the workflow and its activities and to start the runtime.
    • DaprWorkflowContext, the context passed to workflow functions.
    • WorkflowActivityContext, the context passed to activity functions.
  • API calls. In the example below, these calls start, pause, resume, raise an event on, purge, and terminate the workflow.

    from time import sleep

    from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
    from dapr.clients import DaprClient
    from dapr.clients.exceptions import DaprInternalError

    # ...

    def main():
        with DaprClient() as d:
            host = settings.DAPR_RUNTIME_HOST
            port = settings.DAPR_GRPC_PORT
            workflowRuntime = WorkflowRuntime(host, port)
            workflowRuntime.register_workflow(hello_world_wf)
            workflowRuntime.register_activity(hello_act)
            workflowRuntime.start()

            # Start workflow
            print("==========Start Counter Increase as per Input:==========")
            start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                                          workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
            print(f"start_resp {start_resp.instance_id}")

            # ...

            # Pause workflow
            d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")

            # Resume workflow
            d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")

            sleep(1)

            # Raise workflow event
            d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                                   event_name=eventName, event_data=eventData)

            sleep(5)

            # Purge workflow
            d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            try:
                getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            except DaprInternalError as err:
                if nonExistentIDError in err._message:
                    print("Instance Successfully Purged")

            # Kick off another workflow for termination purposes
            start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                                          workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
            print(f"start_resp {start_resp.instance_id}")

            # Terminate workflow
            d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            sleep(1)
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")

            # Purge workflow
            d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            try:
                getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            except DaprInternalError as err:
                if nonExistentIDError in err._message:
                    print("Instance Successfully Purged")

            workflowRuntime.shutdown()

    if __name__ == '__main__':
        main()
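The Python example above uses fixed sleep calls to give the workflow time to change state. One alternative, sketched here under the assumption that status can be read through any zero-argument callable, is to poll until the expected status appears. `wait_for_status` is a hypothetical helper, not an SDK function.

```python
import time


def wait_for_status(get_status, expected, timeout=10.0, interval=0.2):
    """Poll get_status() until it returns `expected` or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == expected:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"status was {status!r}, expected {expected!r}")
        time.sleep(interval)
```

With the client from the example, get_status could be a lambda that returns d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent).runtime_status. Polling trades a little extra code for tests that neither wait longer than needed nor fail when the workflow is slower than the fixed delay.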

The following example is the DaprWorkflowClient class from the JavaScript SDK, which your application uses to make workflow API calls. In the example below, these calls start, terminate, get the state of, wait on, raise an event on, and purge a workflow.

    import { TaskHubGrpcClient } from "@microsoft/durabletask-js";
    import { WorkflowState } from "./WorkflowState";
    import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index";
    import { TWorkflow } from "../../types/workflow/Workflow.type";
    import { getFunctionName } from "../internal";
    import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";

    /** DaprWorkflowClient class defines client operations for managing workflow instances. */
    export default class DaprWorkflowClient {
      private readonly _innerClient: TaskHubGrpcClient;

      /** Initialize a new instance of the DaprWorkflowClient.
       */
      constructor(options: Partial<WorkflowClientOptions> = {}) {
        const grpcEndpoint = generateEndpoint(options);
        options.daprApiToken = getDaprApiToken(options);
        this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options);
      }

      private buildInnerClient(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcClient {
        let innerOptions = options?.grpcOptions;
        if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
          innerOptions = {
            ...innerOptions,
            interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
          };
        }
        return new TaskHubGrpcClient(hostAddress, innerOptions);
      }

      /**
       * Schedule a new workflow using the DurableTask client.
       */
      public async scheduleNewWorkflow(
        workflow: TWorkflow | string,
        input?: any,
        instanceId?: string,
        startAt?: Date,
      ): Promise<string> {
        if (typeof workflow === "string") {
          return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt);
        }
        return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt);
      }

      /**
       * Terminate the workflow associated with the provided instance id.
       *
       * @param {string} workflowInstanceId - Workflow instance id to terminate.
       * @param {any} output - The optional output to set for the terminated workflow instance.
       */
      public async terminateWorkflow(workflowInstanceId: string, output: any) {
        await this._innerClient.terminateOrchestration(workflowInstanceId, output);
      }

      /**
       * Fetch workflow instance metadata from the configured durable store.
       */
      public async getWorkflowState(
        workflowInstanceId: string,
        getInputsAndOutputs: boolean,
      ): Promise<WorkflowState | undefined> {
        const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs);
        if (state !== undefined) {
          return new WorkflowState(state);
        }
      }

      /**
       * Waits for a workflow to start running
       */
      public async waitForWorkflowStart(
        workflowInstanceId: string,
        fetchPayloads = true,
        timeoutInSeconds = 60,
      ): Promise<WorkflowState | undefined> {
        const state = await this._innerClient.waitForOrchestrationStart(
          workflowInstanceId,
          fetchPayloads,
          timeoutInSeconds,
        );
        if (state !== undefined) {
          return new WorkflowState(state);
        }
      }

      /**
       * Waits for a workflow to complete running
       */
      public async waitForWorkflowCompletion(
        workflowInstanceId: string,
        fetchPayloads = true,
        timeoutInSeconds = 60,
      ): Promise<WorkflowState | undefined> {
        const state = await this._innerClient.waitForOrchestrationCompletion(
          workflowInstanceId,
          fetchPayloads,
          timeoutInSeconds,
        );
        if (state !== undefined) {
          return new WorkflowState(state);
        }
      }

      /**
       * Sends an event notification message to an awaiting workflow instance
       */
      public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
        // Await the call so that a failure surfaces to the caller.
        await this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
      }

      /**
       * Purges the workflow instance state from the workflow state store.
       */
      public async purgeWorkflow(workflowInstanceId: string): Promise<boolean> {
        const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId);
        if (purgeResult !== undefined) {
          return purgeResult.deletedInstanceCount > 0;
        }
        return false;
      }

      /**
       * Closes the inner DurableTask client and shutdown the GRPC channel.
       */
      public async stop() {
        await this._innerClient.stop();
      }
    }

In the following Program.cs example, for a basic ASP.NET order processing application using the .NET SDK, your project code would include:

  • A NuGet package called Dapr.Workflow to receive the .NET SDK capabilities
  • A builder with an extension method called AddDaprWorkflow
    • This will allow you to register workflows and workflow activities (tasks that workflows can schedule)
  • HTTP API calls
    • One for submitting a new order
    • One for checking the status of an existing order

    using Dapr.Workflow;
    //...

    // Dapr Workflows are registered as part of the service configuration
    builder.Services.AddDaprWorkflow(options =>
    {
        // Note that it's also possible to register a lambda function as the workflow
        // or activity implementation instead of a class.
        options.RegisterWorkflow<OrderProcessingWorkflow>();

        // These are the activities that get invoked by the workflow(s).
        options.RegisterActivity<NotifyActivity>();
        options.RegisterActivity<ReserveInventoryActivity>();
        options.RegisterActivity<ProcessPaymentActivity>();
    });

    WebApplication app = builder.Build();

    // POST starts new order workflow instance
    app.MapPost("/orders", async (DaprWorkflowClient client, [FromBody] OrderPayload orderInfo) =>
    {
        if (orderInfo?.Name == null)
        {
            return Results.BadRequest(new
            {
                message = "Order data was missing from the request",
                example = new OrderPayload("Paperclips", 99.95),
            });
        }

        //...
    });

    // GET fetches state for order workflow to report status
    app.MapGet("/orders/{orderId}", async (string orderId, DaprWorkflowClient client) =>
    {
        WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
        if (!state.Exists)
        {
            return Results.NotFound($"No order with ID = '{orderId}' was found.");
        }

        var httpResponsePayload = new
        {
            details = state.ReadInputAs<OrderPayload>(),
            status = state.RuntimeStatus.ToString(),
            result = state.ReadOutputAs<OrderResult>(),
        };

        //...
    }).WithName("GetOrderInfoEndpoint");

    app.Run();

As in the following example, a hello-world application using the Java SDK and Dapr Workflow would include:

  • A Java package called io.dapr.workflows.client to receive the Java SDK client capabilities.
  • An import of io.dapr.workflows.Workflow
  • The DemoWorkflow class which extends Workflow
  • Creating the workflow with input and output.
  • API calls. In the example below, these calls start and call the workflow activities.

    package io.dapr.examples.workflows;

    import com.microsoft.durabletask.CompositeTaskFailedException;
    import com.microsoft.durabletask.Task;
    import com.microsoft.durabletask.TaskCanceledException;
    import io.dapr.workflows.Workflow;
    import io.dapr.workflows.WorkflowStub;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;

    /**
     * Implementation of the DemoWorkflow for the server side.
     */
    public class DemoWorkflow extends Workflow {
      @Override
      public WorkflowStub create() {
        return ctx -> {
          ctx.getLogger().info("Starting Workflow: " + ctx.getName());
          // ...
          ctx.getLogger().info("Calling Activity...");
          var input = new DemoActivityInput("Hello Activity!");
          var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
          // ...
        };
      }
    }

See the full Java SDK workflow example in context.

As in the following example, a hello-world application using the Go SDK and Dapr Workflow would include:

  • A Go package called client to receive the Go SDK client capabilities.
  • The TestWorkflow method
  • Creating the workflow with input and output.
  • API calls. In the example below, these calls start and call the workflow activities.

    package main

    import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/dapr/go-sdk/client"
        "github.com/dapr/go-sdk/workflow"
    )

    var stage = 0

    const (
        workflowComponent = "dapr"
    )

    func main() {
        w, err := workflow.NewWorker()
        if err != nil {
            log.Fatal(err)
        }

        fmt.Println("Worker initialized")

        if err := w.RegisterWorkflow(TestWorkflow); err != nil {
            log.Fatal(err)
        }
        fmt.Println("TestWorkflow registered")

        if err := w.RegisterActivity(TestActivity); err != nil {
            log.Fatal(err)
        }
        fmt.Println("TestActivity registered")

        // Start workflow runner
        if err := w.Start(); err != nil {
            log.Fatal(err)
        }
        fmt.Println("runner started")

        daprClient, err := client.NewClient()
        if err != nil {
            log.Fatalf("failed to initialise client: %v", err)
        }
        defer daprClient.Close()
        ctx := context.Background()

        // Start workflow test
        respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
            WorkflowName:      "TestWorkflow",
            Options:           nil,
            Input:             1,
            SendRawInput:      false,
        })
        if err != nil {
            log.Fatalf("failed to start workflow: %v", err)
        }
        fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)

        // Pause workflow test
        err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to pause workflow: %v", err)
        }

        respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to get workflow: %v", err)
        }

        if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
            log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
        }

        fmt.Printf("workflow paused\n")

        // Resume workflow test
        err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to resume workflow: %v", err)
        }

        respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to get workflow: %v", err)
        }

        if respGet.RuntimeStatus != workflow.StatusRunning.String() {
            log.Fatalf("workflow not running")
        }

        fmt.Println("workflow resumed")

        fmt.Printf("stage: %d\n", stage)

        // Raise Event Test
        err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
            EventName:         "testEvent",
            EventData:         "testData",
            SendRawData:       false,
        })
        if err != nil {
            fmt.Printf("failed to raise event: %v", err)
        }

        fmt.Println("workflow event raised")

        time.Sleep(time.Second) // allow workflow to advance

        fmt.Printf("stage: %d\n", stage)

        respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to get workflow: %v", err)
        }

        fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)

        // Purge workflow test
        err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to purge workflow: %v", err)
        }

        // After a purge, fetching the workflow is expected to fail.
        respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err == nil || respGet != nil {
            log.Fatal("failed to purge workflow")
        }

        fmt.Println("workflow purged")

        fmt.Printf("stage: %d\n", stage)

        // Terminate workflow test
        respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
            WorkflowName:      "TestWorkflow",
            Options:           nil,
            Input:             1,
            SendRawInput:      false,
        })
        if err != nil {
            log.Fatalf("failed to start workflow: %v", err)
        }

        fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)

        err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to terminate workflow: %v", err)
        }

        respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to get workflow: %v", err)
        }
        if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
            log.Fatal("failed to terminate workflow")
        }

        fmt.Println("workflow terminated")

        err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err != nil {
            log.Fatalf("failed to purge workflow: %v", err)
        }

        respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
            InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
            WorkflowComponent: workflowComponent,
        })
        if err == nil || respGet != nil {
            log.Fatalf("failed to purge workflow: %v", err)
        }

        fmt.Println("workflow purged")

        stage = 0
        fmt.Println("workflow client test")

        wfClient, err := workflow.NewClient()
        if err != nil {
            log.Fatalf("[wfclient] failed to initialize: %v", err)
        }

        id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
        if err != nil {
            log.Fatalf("[wfclient] failed to start workflow: %v", err)
        }

        fmt.Printf("[wfclient] started workflow with id: %s\n", id)

        metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
        if err != nil {
            log.Fatalf("[wfclient] failed to get workflow: %v", err)
        }

        fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())

        if stage != 1 {
            log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
        }

        fmt.Printf("[wfclient] stage: %d\n", stage)

        // raise event
        if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
            log.Fatalf("[wfclient] failed to raise event: %v", err)
        }

        fmt.Println("[wfclient] event raised")

        // Sleep to allow the workflow to advance
        time.Sleep(time.Second)

        if stage != 2 {
            log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
        }

        fmt.Printf("[wfclient] stage: %d\n", stage)

        // stop workflow
        if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
            log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
        }

        fmt.Println("[wfclient] workflow terminated")

        if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
            log.Fatalf("[wfclient] failed to purge workflow: %v", err)
        }

        fmt.Println("[wfclient] workflow purged")

        // stop workflow runtime
        if err := w.Shutdown(); err != nil {
            log.Fatalf("failed to shutdown runtime: %v", err)
        }

        fmt.Println("workflow worker successfully shutdown")
    }

    func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
        var input int
        if err := ctx.GetInput(&input); err != nil {
            return nil, err
        }
        var output string
        if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
            return nil, err
        }

        err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
        if err != nil {
            return nil, err
        }

        if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
            return nil, err
        }

        return output, nil
    }

    func TestActivity(ctx workflow.ActivityContext) (any, error) {
        var input int
        if err := ctx.GetInput(&input); err != nil {
            return "", err
        }

        stage += input

        return fmt.Sprintf("Stage: %d", stage), nil
    }

See the full Go SDK workflow example in context.

Important

Because replay-based workflows may re-execute the workflow function multiple times, put logic that performs I/O or interacts with external systems inside activities. The workflow method itself should only orchestrate those activities.
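The following is a deliberately simplified model of replay, offered as a sketch and not as the Dapr engine's actual implementation. It assumes a workflow written as a Python generator that yields (activity, input) tuples, and a plain list standing in for the persisted history. `execute_once`, `charge`, `payment_workflow`, `history`, and `calls` are hypothetical names for illustration.

```python
def execute_once(workflow, wf_input, history):
    """One replay pass over the workflow. Returns (finished, output)."""
    gen = workflow(wf_input)
    step = 0
    try:
        task = next(gen)
        while True:
            if step < len(history):
                # Replayed step: reuse the recorded result, do NOT re-run the activity.
                result = history[step]
            else:
                # New step: run the activity once, record its result, and stop.
                fn, arg = task
                history.append(fn(arg))
                return False, None
            step += 1
            task = gen.send(result)
    except StopIteration as stop:
        return True, stop.value


calls = []  # records every real activity execution


def charge(amount):
    calls.append(amount)
    return f"charged {amount}"


def payment_workflow(order_total):
    first = yield (charge, order_total)
    second = yield (charge, order_total * 2)
    return [first, second]


history = []
passes = 0
finished, output = False, None
while not finished:
    finished, output = execute_once(payment_workflow, 10, history)
    passes += 1

print(passes, calls, output)
```

In this model the workflow body runs from the top on every pass, while each activity runs only once because its result is read back from history. Any side effect placed directly in the workflow function would be repeated on every pass, which is why such logic belongs in activities.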

Next steps

Now that you’ve authored a workflow, learn how to manage it.

Manage workflows >>

Last modified October 11, 2024: Fixed typo (#4389) (fe17926)