How to: Author a workflow

Learn how to develop and author workflows

This article provides a high-level overview of how to author workflows that are executed by the Dapr Workflow engine.

Note

If you haven’t already, try out the workflow quickstart for a quick walk-through on how to use workflows.

Author workflows as code

Dapr Workflow logic is implemented using general purpose programming languages, allowing you to:

  • Use your preferred programming language (no need to learn a new DSL or YAML schema).
  • Have access to the language’s standard libraries.
  • Build your own libraries and abstractions.
  • Use debuggers and examine local variables.
  • Write unit tests for your workflows, just like any other part of your application logic.

The Dapr sidecar doesn’t load any workflow definitions. Instead, the sidecar only drives workflow execution, while the workflow and activity code remains part of your application.
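Because a workflow is ordinary code, its orchestration logic can be exercised without a sidecar. The following is a minimal sketch of that idea, not Dapr SDK code: FakeContext, notify, reserve, order_workflow, and scheduled_activities are all hypothetical names introduced here, and the fake context only records which activities the workflow asks for.

```python
from dataclasses import dataclass, field


@dataclass
class FakeContext:
    """Hypothetical stand-in for a workflow context; not part of any Dapr SDK."""
    scheduled: list = field(default_factory=list)

    def call_activity(self, activity, input=None):
        # Record the request instead of contacting a sidecar.
        self.scheduled.append((activity.__name__, input))
        return (activity, input)


def notify(ctx, input):
    return f"notified: {input}"


def reserve(ctx, input):
    return {"item": input, "reserved": True}


def order_workflow(ctx, order):
    # The workflow only decides what runs and in which order.
    yield ctx.call_activity(notify, input=f"received {order}")
    yield ctx.call_activity(reserve, input=order)
    yield ctx.call_activity(notify, input=f"completed {order}")


def scheduled_activities(workflow, order):
    """Run the generator to completion and return what it scheduled."""
    ctx = FakeContext()
    for _ in workflow(ctx, order):
        pass
    return ctx.scheduled
```

A unit test can then assert on the returned list, for example that the workflow schedules notify, reserve, and notify in that order.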

Write the workflow activities

Workflow activities are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process.

Define the workflow activities you’d like your workflow to perform. In .NET, each activity is a class that can take an input and return an output. Activities also participate in dependency injection, for example to receive a Dapr client through the constructor.

The activities called in the example below are:

  • NotifyActivity: Receive notification of a new order.
  • ReserveInventoryActivity: Check for sufficient inventory to meet the new order.
  • ProcessPaymentActivity: Process payment for the order. Includes NotifyActivity to send notification of successful order.

NotifyActivity

    public class NotifyActivity : WorkflowActivity<Notification, object>
    {
        //...
        public NotifyActivity(ILoggerFactory loggerFactory)
        {
            this.logger = loggerFactory.CreateLogger<NotifyActivity>();
        }
        //...
    }

See the full NotifyActivity.cs workflow activity example.

ReserveInventoryActivity

    public class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
    {
        //...
        public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
        {
            this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
            this.client = client;
        }
        //...
    }

See the full ReserveInventoryActivity.cs workflow activity example.

ProcessPaymentActivity

    public class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
    {
        //...
        public ProcessPaymentActivity(ILoggerFactory loggerFactory)
        {
            this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
        }
        //...
    }

See the full ProcessPaymentActivity.cs workflow activity example.

Define the workflow activities you’d like your workflow to perform. In Python, each activity is a function that can take an input and return an output. The following example defines an activity called hello_act that adds its input to a global counter and prints the new counter value. hello_act takes a WorkflowActivityContext as its first parameter.

    def hello_act(ctx: WorkflowActivityContext, input):
        global counter
        counter += input
        print(f'New counter value is: {counter}!', flush=True)

See the hello_act workflow activity in context.
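As a small sketch of an activity that both takes an input and returns an output, consider the hypothetical function below. The name reserve_inventory_act, the in-memory stock table, and the shape of the input and result dictionaries are assumptions made for illustration. The sketch does not import the Dapr SDK and passes None for the context, which it never uses.

```python
def reserve_inventory_act(ctx, input):
    """Hypothetical activity: check stock for the requested item."""
    # Illustrative in-memory stock table; a real activity would query a store.
    stock = {"paperclips": 100}
    available = stock.get(input["name"], 0)
    # The returned value becomes the activity's output.
    return {"success": available >= input["quantity"], "available": available}


# Call the activity directly, as a unit test might.
result = reserve_inventory_act(None, {"name": "paperclips", "quantity": 10})
```

Returning a value instead of mutating global state makes the activity easier to test in isolation.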

Write the workflow

Next, register and call the activities in a workflow.

The OrderProcessingWorkflow class is derived from a base class called Workflow with input and output parameter types. It also includes a RunAsync method that does the heavy lifting of the workflow and calls the workflow activities.

    class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
    {
        public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
        {
            //...
            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));
            //...
            InventoryResult result = await context.CallActivityAsync<InventoryResult>(
                nameof(ReserveInventoryActivity),
                new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
            //...
            await context.CallActivityAsync(
                nameof(ProcessPaymentActivity),
                new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));
            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Order {orderId} processed successfully!"));
            // End the workflow with a success result
            return new OrderResult(Processed: true);
        }
    }

See the full workflow example in OrderProcessingWorkflow.cs.

The hello_world_wf function takes a DaprWorkflowContext and an input as its parameters. It uses yield statements to do the heavy lifting of the workflow: each yield either calls a workflow activity or waits for an external event.

    def hello_world_wf(ctx: DaprWorkflowContext, input):
        print(f'{input}')
        yield ctx.call_activity(hello_act, input=1)
        yield ctx.call_activity(hello_act, input=10)
        yield ctx.wait_for_external_event("event1")
        yield ctx.call_activity(hello_act, input=100)
        yield ctx.call_activity(hello_act, input=1000)

See the hello_world_wf workflow in context.
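To see why yield works as the hand-off point, it can help to look at a toy driver for a generator-based workflow. This is a sketch in plain Python and not how the Dapr Workflow engine is implemented. The names call_activity, add_shipping, format_receipt, receipt_workflow, and drive are hypothetical.

```python
def call_activity(activity, input=None):
    """Hypothetical helper: describe an activity call as a plain tuple."""
    return (activity, input)


def add_shipping(amount):
    return amount + 5


def format_receipt(total):
    return f"Total due: {total}"


def receipt_workflow(amount):
    # Each yield hands a task to the driver and pauses the workflow here.
    total = yield call_activity(add_shipping, amount)
    # Execution resumes once the driver sends the activity result back in.
    receipt = yield call_activity(format_receipt, total)
    return receipt


def drive(workflow_gen):
    """Toy driver: run each yielded task and send its result back."""
    try:
        task = next(workflow_gen)
        while True:
            activity, arg = task
            task = workflow_gen.send(activity(arg))
    except StopIteration as done:
        # The generator's return value is the workflow output.
        return done.value
```

Calling drive(receipt_workflow(20)) runs both activities in order and returns the formatted receipt. In a real workflow the engine plays the role of the driver, so the workflow function never executes an activity itself.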

Write the application

Finally, compose the application using the workflow.

In the following Program.cs example, for a basic ASP.NET order processing application using the .NET SDK, your project code would include:

  • A NuGet package called Dapr.Workflow to receive the .NET SDK capabilities
  • A builder with an extension method called AddDaprWorkflow
    • This will allow you to register workflows and workflow activities (tasks that workflows can schedule)
  • HTTP API calls
    • One for submitting a new order
    • One for checking the status of an existing order

    using Dapr.Workflow;
    //...

    // Dapr Workflows are registered as part of the service configuration
    builder.Services.AddDaprWorkflow(options =>
    {
        // Note that it's also possible to register a lambda function as the workflow
        // or activity implementation instead of a class.
        options.RegisterWorkflow<OrderProcessingWorkflow>();

        // These are the activities that get invoked by the workflow(s).
        options.RegisterActivity<NotifyActivity>();
        options.RegisterActivity<ReserveInventoryActivity>();
        options.RegisterActivity<ProcessPaymentActivity>();
    });

    WebApplication app = builder.Build();

    // POST starts new order workflow instance
    app.MapPost("/orders", async (WorkflowEngineClient client, [FromBody] OrderPayload orderInfo) =>
    {
        if (orderInfo?.Name == null)
        {
            return Results.BadRequest(new
            {
                message = "Order data was missing from the request",
                example = new OrderPayload("Paperclips", 99.95),
            });
        }
        //...
    });

    // GET fetches state for order workflow to report status
    app.MapGet("/orders/{orderId}", async (string orderId, WorkflowEngineClient client) =>
    {
        WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
        if (!state.Exists)
        {
            return Results.NotFound($"No order with ID = '{orderId}' was found.");
        }

        var httpResponsePayload = new
        {
            details = state.ReadInputAs<OrderPayload>(),
            status = state.RuntimeStatus.ToString(),
            result = state.ReadOutputAs<OrderResult>(),
        };
        //...
    }).WithName("GetOrderInfoEndpoint");

    app.Run();

In the following example, for a basic Python hello world application using the Python SDK, your project code would include:

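  • The DaprClient class, imported from dapr.clients, to call the workflow APIs.
  • The following imports from dapr.ext.workflow:
    • WorkflowRuntime: Registers workflows and workflow activities and starts the runtime
    • DaprWorkflowContext: The context passed to a workflow function
    • WorkflowActivityContext: The context passed to a workflow activity function
  • API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow, and raise an event on it.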

    from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
    from dapr.clients import DaprClient
    # ...

    def main():
        with DaprClient() as d:
            host = settings.DAPR_RUNTIME_HOST
            port = settings.DAPR_GRPC_PORT
            # Create the runtime once, using the configured host and port
            workflowRuntime = WorkflowRuntime(host, port)
            workflowRuntime.register_workflow(hello_world_wf)
            workflowRuntime.register_activity(hello_act)
            workflowRuntime.start()

            # Start workflow
            print("==========Start Counter Increase as per Input:==========")
            start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                                          workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
            print(f"start_resp {start_resp.instance_id}")
            # ...

            # Pause workflow
            d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")

            # Resume workflow
            d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
            sleep(1)

            # Raise the external event that the workflow is waiting for
            d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                                   event_name=eventName, event_data=eventData)
            sleep(5)

            # Purge workflow
            d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            try:
                getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            except DaprInternalError as err:
                if nonExistentIDError in err._message:
                    print("Instance Successfully Purged")

            # Kick off another workflow for termination purposes
            start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                                          workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
            print(f"start_resp {start_resp.instance_id}")

            # Terminate workflow
            d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            sleep(1)
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")

            # Purge workflow
            d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            try:
                getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
            except DaprInternalError as err:
                if nonExistentIDError in err._message:
                    print("Instance Successfully Purged")

            workflowRuntime.shutdown()

    if __name__ == '__main__':
        main()

Important

Because replay-based workflows re-execute the workflow code, put logic that performs I/O or interacts with external systems inside activities. Use the workflow method only to orchestrate those activities.
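The following toy model illustrates the replay idea behind this guidance. It is a sketch in plain Python under simplifying assumptions, not the Dapr Workflow engine. The names charge_card, payment_workflow, and run_with_replay are hypothetical, and the history is just an in-memory list of recorded activity results.

```python
import random

executions = []  # records every real activity execution (the side effects)


def charge_card(amount):
    # Side effect plus a non-deterministic result: this belongs in an activity.
    executions.append(amount)
    return f"charged-{amount}-{random.randint(1000, 9999)}"


def payment_workflow():
    # The workflow only orchestrates; it yields a task description.
    receipt = yield (charge_card, 50)
    return receipt


def run_with_replay(workflow, history):
    """Toy replay loop: reuse recorded results, run only new activities."""
    gen = workflow()
    step = 0
    try:
        task = next(gen)
        while True:
            if step < len(history):
                result = history[step]  # replayed: the activity is not re-run
            else:
                activity, arg = task
                result = activity(arg)  # first execution, then recorded
                history.append(result)
            step += 1
            task = gen.send(result)
    except StopIteration as done:
        return done.value


history = []
first = run_with_replay(payment_workflow, history)
second = run_with_replay(payment_workflow, history)  # simulates a replay
```

In this model the second run returns the same receipt as the first and the card is charged only once, because the recorded result is reused. If the random call or the charge lived directly in the workflow function, every replay would repeat the side effect and could take a different path.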

Next steps

Now that you’ve authored a workflow, learn how to manage it.

Manage workflows >>

Last modified June 19, 2023: Merge pull request #3565 from dapr/aacrawfi/skip-secrets-close (b1763bf)