Workflow patterns

Write different types of workflow patterns

Dapr Workflows simplify complex, stateful coordination requirements in microservice architectures. The following sections describe several application patterns that can benefit from Dapr Workflows.

Task chaining

In the task chaining pattern, multiple steps in a workflow are run in succession, and the output of one step may be passed as the input to the next step. Task chaining workflows typically involve creating a sequence of operations that need to be performed on some data, such as filtering, transforming, and reducing.

In some cases, the steps of the workflow may need to be orchestrated across multiple microservices. For increased reliability and scalability, you’re also likely to use queues to trigger the various steps.

Diagram showing how the task chaining workflow pattern works

While the pattern is simple, there are many complexities hidden in the implementation. For example:

  • What happens if one of the microservices is unavailable for an extended period of time?
  • Can failed steps be automatically retried?
  • If not, how do you facilitate the rollback of previously completed steps, if applicable?
  • Implementation details aside, is there a way to visualize the workflow so that other engineers can understand what it does and how it works?

Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example.

    // Exponential backoff retry policy that survives long outages
    var retryPolicy = TaskOptions.FromRetryPolicy(new RetryPolicy(
        maxNumberOfAttempts: 10,
        firstRetryInterval: TimeSpan.FromMinutes(1),
        backoffCoefficient: 2.0,
        maxRetryInterval: TimeSpan.FromHours(1)));

    // Task failures are surfaced as ordinary .NET exceptions
    try
    {
        var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryPolicy);
        var result2 = await context.CallActivityAsync<byte[]>("Step2", result1, retryPolicy);
        var result3 = await context.CallActivityAsync<long[]>("Step3", result2, retryPolicy);
        var result4 = await context.CallActivityAsync<Guid[]>("Step4", result3, retryPolicy);
        return string.Join(", ", result4);
    }
    catch (TaskFailedException)
    {
        // Retries expired - apply custom compensation logic
        await context.CallActivityAsync<long[]>("MyCompensation", options: retryPolicy);
        throw;
    }

Note

In the example above, "Step1", "Step2", "MyCompensation", etc. represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example.

As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture.

Behind the scenes, the Dapr Workflow runtime:

  • Takes care of executing the workflow and ensuring that it runs to completion.
  • Saves progress automatically.
  • Automatically resumes the workflow from the last completed step if the workflow process itself fails for any reason.
  • Enables error handling to be expressed naturally in your target programming language, allowing you to implement compensation logic easily.
  • Provides built-in retry configuration primitives to simplify the process of configuring complex retry policies for individual steps in the workflow.
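
To get a feel for what a retry policy like the one in the example implies, the following is a minimal sketch that computes the wait before each retry. It is not the runtime's implementation: the `RetrySchedule` class and its `Delays` method are hypothetical, and the formula (first interval multiplied by the backoff coefficient once per retry, capped at the maximum interval) is an assumption about how exponential backoff is typically applied.

```csharp
using System;
using System.Collections.Generic;

public static class RetrySchedule
{
    // Assumed formula: delay(k) = firstRetryInterval * backoffCoefficient^k,
    // capped at maxRetryInterval, for k = 0 .. maxNumberOfAttempts - 2.
    public static List<TimeSpan> Delays(
        int maxNumberOfAttempts,
        TimeSpan firstRetryInterval,
        double backoffCoefficient,
        TimeSpan maxRetryInterval)
    {
        var delays = new List<TimeSpan>();

        // N attempts leave room for N - 1 retries, hence N - 1 delays.
        for (int retry = 0; retry < maxNumberOfAttempts - 1; retry++)
        {
            double ticks = firstRetryInterval.Ticks * Math.Pow(backoffCoefficient, retry);

            // Compare before casting so very large values cannot overflow.
            delays.Add(ticks >= maxRetryInterval.Ticks
                ? maxRetryInterval
                : TimeSpan.FromTicks((long)ticks));
        }

        return delays;
    }
}
```

Under that assumption, the values from the example (10 attempts, 1 minute first interval, coefficient 2.0, 1 hour maximum) would give waits of 1, 2, 4, 8, 16, and 32 minutes, followed by 60 minutes for each of the last three retries.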

Fan-out/fan-in

In the fan-out/fan-in design pattern, you execute multiple tasks simultaneously across potentially multiple workers, wait for them to finish, and perform some aggregation on the result.

Diagram showing how the fan-out/fan-in workflow pattern works

In addition to the challenges mentioned in the previous pattern, there are several important questions to consider when implementing the fan-out/fan-in pattern manually:

  • How do you control the degree of parallelism?
  • How do you know when to trigger subsequent aggregation steps?
  • What if the number of parallel steps is dynamic?

Dapr Workflow provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example:

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

    // Schedule the parallel tasks, but don't wait for them to complete yet.
    var parallelTasks = new List<Task<int>>(workBatch.Length);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("ProcessWorkItem", workBatch[i]);
        parallelTasks.Add(task);
    }

    // Everything is scheduled. Wait here until all parallel tasks have completed.
    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and publish the result.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("PostResults", sum);

The key takeaways from this example are:

  • The fan-out/fan-in pattern can be expressed as a simple function using ordinary programming constructs
  • The number of parallel tasks can be static or dynamic
  • The workflow itself is capable of aggregating the results of parallel executions

While not shown in the example, it’s possible to go further and limit the degree of concurrency using simple, language-specific constructs. Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks.
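
One way to limit the degree of concurrency is to keep a bounded set of in-flight tasks and wait for any one of them to finish before scheduling the next. The following is a minimal sketch of that idea, not an official API: `ThrottledFanOut` and `SumWithLimitAsync` are hypothetical names, and the `startTask` delegate stands in for a call such as `context.CallActivityAsync<int>("ProcessWorkItem", item)`. It assumes activity tasks can be awaited with `Task.WhenAny` like ordinary .NET tasks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottledFanOut
{
    // Starts one task per item, keeps at most maxParallel tasks in flight,
    // and returns the sum of all results.
    public static async Task<int> SumWithLimitAsync<T>(
        IReadOnlyList<T> items,
        Func<T, Task<int>> startTask,
        int maxParallel)
    {
        if (maxParallel < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxParallel));
        }

        var inFlight = new HashSet<Task<int>>();
        int sum = 0;

        foreach (T item in items)
        {
            // At the limit: wait for any one task to finish before starting another.
            if (inFlight.Count >= maxParallel)
            {
                Task<int> finished = await Task.WhenAny(inFlight);
                inFlight.Remove(finished);
                sum += await finished;
            }

            inFlight.Add(startTask(item));
        }

        // Fan-in: wait for whatever is still running and aggregate it.
        foreach (int result in await Task.WhenAll(inFlight))
        {
            sum += result;
        }

        return sum;
    }
}
```

The aggregation step is unchanged from the unthrottled version; only the scheduling loop differs, which keeps the fan-in logic easy to follow.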

Async HTTP APIs

Asynchronous HTTP APIs are typically implemented using the Asynchronous Request-Reply pattern. Implementing this pattern traditionally involves the following:

  1. A client sends a request to an HTTP API endpoint (the start API)
  2. The start API writes a message to a backend queue, which triggers the start of a long-running operation
  3. Immediately after scheduling the backend operation, the start API returns an HTTP 202 response to the client with an identifier that can be used to poll for status
  4. The status API queries a database that contains the status of the long-running operation
  5. The client repeatedly polls the status API either until some timeout expires or it receives a “completion” response

The end-to-end flow is illustrated in the following diagram.

Diagram showing how the async request response pattern works

The challenge with implementing the asynchronous request-reply pattern is that it involves the use of multiple APIs and state stores. It also involves implementing the protocol correctly so that the client knows how to automatically poll for status and knows when the operation is complete.

The Dapr workflow HTTP API supports the asynchronous request-reply pattern out of the box, without requiring you to write any code or do any state management.

The following curl commands illustrate how the workflow APIs support this pattern.

    curl -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/12345678/start -d '{"input":{"Name":"Paperclips","Quantity":1,"TotalCost":9.95}}'

The previous command will result in the following response JSON:

    {"instance_id":"12345678"}

The HTTP client can then construct the status query URL using the workflow instance ID and poll it repeatedly until it sees the “COMPLETED”, “FAILED”, or “TERMINATED” runtime status in the payload.

    curl http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/12345678

The following is an example of what an in-progress workflow status might look like.

    {
      "WFInfo": {
        "instance_id": "12345678"
      },
      "start_time": "2023-02-05T00:32:05Z",
      "metadata": {
        "dapr.workflow.custom_status": "",
        "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}",
        "dapr.workflow.last_updated": "2023-02-05T00:32:18Z",
        "dapr.workflow.name": "OrderProcessingWorkflow",
        "dapr.workflow.runtime_status": "RUNNING"
      }
    }

As you can see from the previous example, the workflow’s runtime status is RUNNING, which lets the client know that it should continue polling.

If the workflow has completed, the status might look as follows.

    {
      "WFInfo": {
        "instance_id": "12345678"
      },
      "start_time": "2023-02-05T00:32:05Z",
      "metadata": {
        "dapr.workflow.custom_status": "",
        "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}",
        "dapr.workflow.last_updated": "2023-02-05T00:32:23Z",
        "dapr.workflow.name": "OrderProcessingWorkflow",
        "dapr.workflow.output": "{\"Processed\":true}",
        "dapr.workflow.runtime_status": "COMPLETED"
      }
    }

As you can see from the previous example, the runtime status of the workflow is now COMPLETED, which means the client can stop polling for updates.
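
The client side of this polling loop can be sketched in a few lines of C#. This is an illustrative sketch rather than an official client: `WorkflowStatusPoller` and its members are hypothetical names, the JSON shape is taken from the status payloads shown above, and the set of terminal statuses (COMPLETED, FAILED, TERMINATED) is an assumption, since only RUNNING and COMPLETED appear in those payloads.

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class WorkflowStatusPoller
{
    // Extracts metadata["dapr.workflow.runtime_status"] from a status payload.
    public static string ReadRuntimeStatus(string statusJson)
    {
        using (JsonDocument doc = JsonDocument.Parse(statusJson))
        {
            return doc.RootElement
                .GetProperty("metadata")
                .GetProperty("dapr.workflow.runtime_status")
                .GetString() ?? "";
        }
    }

    // Assumed terminal statuses; adjust to match your runtime version.
    public static bool IsTerminal(string status)
    {
        return status == "COMPLETED" || status == "FAILED" || status == "TERMINATED";
    }

    // Polls statusUrl until a terminal status is seen or the timeout expires.
    public static async Task<string> WaitForCompletionAsync(
        HttpClient http, string statusUrl, TimeSpan pollInterval, TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;

        while (true)
        {
            string body = await http.GetStringAsync(statusUrl);
            string status = ReadRuntimeStatus(body);
            if (IsTerminal(status))
            {
                return status;
            }

            if (DateTime.UtcNow + pollInterval > deadline)
            {
                throw new TimeoutException("Workflow did not finish before the timeout.");
            }

            await Task.Delay(pollInterval);
        }
    }
}
```

A caller would pass the same status URL used in the curl command above, together with a polling interval and an overall timeout suited to the expected workflow duration.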

Monitor

The monitor pattern is a recurring process that typically:

  1. Checks the status of a system
  2. Takes some action based on that status, for example, sends a notification
  3. Sleeps for some period of time
  4. Repeats

The following diagram provides a rough illustration of this pattern.

Diagram showing how the monitor pattern works

Depending on the business needs, there may be a single monitor or there may be multiple monitors, one for each business entity (for example, a stock). Furthermore, the amount of time to sleep may need to change, depending on the circumstances. These requirements make using cron-based scheduling systems impractical.

Dapr Workflow supports this pattern natively by allowing you to implement eternal workflows. Rather than writing infinite while-loops (which is an anti-pattern), Dapr Workflow exposes a continue-as-new API that workflow authors can use to restart a workflow function from the beginning with a new input.

    public override async Task<object> RunAsync(WorkflowContext context, MyEntityState myEntityState)
    {
        TimeSpan nextSleepInterval;

        var status = await context.CallActivityAsync<string>("GetStatus");
        if (status == "healthy")
        {
            myEntityState.IsHealthy = true;

            // Check less frequently when in a healthy state
            nextSleepInterval = TimeSpan.FromMinutes(60);
        }
        else
        {
            if (myEntityState.IsHealthy)
            {
                myEntityState.IsHealthy = false;
                await context.CallActivityAsync("SendAlert", myEntityState);
            }

            // Check more frequently when in an unhealthy state
            nextSleepInterval = TimeSpan.FromMinutes(5);
        }

        // Put the workflow to sleep until the determined time
        await context.CreateTimer(nextSleepInterval);

        // Restart from the beginning with the updated state
        context.ContinueAsNew(myEntityState);
        return null;
    }

This example assumes you have a predefined MyEntityState class with a boolean IsHealthy property.

A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling continue-as-new.
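
The control flow behind continue-as-new can be illustrated without the Dapr SDK. The following is a simplified simulation, not runtime code: `MonitorInput`, `ContinueAsNewSketch`, `RunGeneration`, and `Drive` are hypothetical names, and the `ChecksRemaining` counter is an invented way to show graceful termination. Each generation runs once with its input and either returns a new input (the analogue of calling continue-as-new) or returns null to end the monitor.

```csharp
using System;
using System.Collections.Generic;

public sealed class MonitorInput
{
    public bool IsHealthy { get; set; }
    public int ChecksRemaining { get; set; }
}

public static class ContinueAsNewSketch
{
    // One generation of the monitor. Returns the input for the next generation,
    // or null to stop (the analogue of not calling continue-as-new).
    public static MonitorInput RunGeneration(MonitorInput input, string status, List<string> alerts)
    {
        bool healthy = status == "healthy";

        // Alert only on the transition from healthy to unhealthy.
        if (!healthy && input.IsHealthy)
        {
            alerts.Add("alert: entity became unhealthy");
        }

        int remaining = input.ChecksRemaining - 1;
        if (remaining <= 0)
        {
            return null;
        }

        return new MonitorInput { IsHealthy = healthy, ChecksRemaining = remaining };
    }

    // Stand-in for the runtime: restarts the function with each new input
    // and returns how many generations ran.
    public static int Drive(MonitorInput first, IEnumerable<string> statuses, List<string> alerts)
    {
        int generations = 0;
        MonitorInput current = first;

        foreach (string status in statuses)
        {
            if (current == null)
            {
                break;
            }

            generations++;
            current = RunGeneration(current, status, alerts);
        }

        return generations;
    }
}
```

Because every generation starts from a fresh input instead of looping inside one function call, the state carried forward stays small and explicit, which is the property that continue-as-new is meant to provide.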

Note

This pattern can also be expressed using actors and reminders. The difference is that this workflow is expressed as a single function with inputs and state stored in local variables. Workflows can also execute a sequence of actions with stronger reliability guarantees, if necessary.

Next steps

Workflow architecture >>

Last modified February 8, 2023: Update workflow-patterns.md (4f0b0fa2)