Workflow patterns

Write different types of workflow patterns

Dapr Workflows simplify complex, stateful coordination requirements in microservice architectures. The following sections describe several application patterns that can benefit from Dapr Workflows.

Task chaining

In the task chaining pattern, multiple steps in a workflow are run in succession, and the output of one step may be passed as the input to the next step. Task chaining workflows typically involve creating a sequence of operations that need to be performed on some data, such as filtering, transforming, and reducing.

Diagram showing how the task chaining workflow pattern works

In some cases, the steps of the workflow may need to be orchestrated across multiple microservices. For increased reliability and scalability, you’re also likely to use queues to trigger the various steps.

While the pattern is simple, there are many complexities hidden in the implementation. For example:

  • What happens if one of the microservices is unavailable for an extended period of time?
  • Can failed steps be automatically retried?
  • If not, how do you facilitate the rollback of previously completed steps, if applicable?
  • Implementation details aside, is there a way to visualize the workflow so that other engineers can understand what it does and how it works?

Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example.

```python
import dapr.ext.workflow as wf

def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    try:
        result1 = yield ctx.call_activity(step1, input=wf_input)
        result2 = yield ctx.call_activity(step2, input=result1)
        result3 = yield ctx.call_activity(step3, input=result2)
    except Exception as e:
        yield ctx.call_activity(error_handler, input=str(e))
        raise
    return [result1, result2, result3]

def step1(ctx, activity_input):
    print(f'Step 1: Received input: {activity_input}.')
    # Do some work
    return activity_input + 1

def step2(ctx, activity_input):
    print(f'Step 2: Received input: {activity_input}.')
    # Do some work
    return activity_input * 2

def step3(ctx, activity_input):
    print(f'Step 3: Received input: {activity_input}.')
    # Do some work
    return activity_input ** 2

def error_handler(ctx, error):
    print(f'Executing error handler: {error}.')
    # Do some compensating work
```

Note: Workflow retry policies will be available in a future version of the Python SDK.

```typescript
import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr";

async function start() {
  // Update the gRPC client and worker to use a local address and port
  const daprHost = "localhost";
  const daprPort = "50001";

  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  const hello = async (_: WorkflowActivityContext, name: string) => {
    return `Hello ${name}!`;
  };

  const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
    const cities: string[] = [];

    const result1 = yield ctx.callActivity(hello, "Tokyo");
    cities.push(result1);
    const result2 = yield ctx.callActivity(hello, "Seattle");
    cities.push(result2);
    const result3 = yield ctx.callActivity(hello, "London");
    cities.push(result3);

    return cities;
  };

  workflowRuntime.registerWorkflow(sequence).registerActivity(hello);

  // Wrap the worker startup in a try-catch block to handle any errors during startup
  try {
    await workflowRuntime.start();
    console.log("Workflow runtime started successfully");
  } catch (error) {
    console.error("Error starting workflow runtime:", error);
  }

  // Schedule a new orchestration
  try {
    const id = await workflowClient.scheduleNewWorkflow(sequence);
    console.log(`Orchestration scheduled with ID: ${id}`);

    // Wait for orchestration completion
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
    console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("Error scheduling or waiting for orchestration:", error);
  }

  await workflowRuntime.stop();
  await workflowClient.stop();

  // Stop the Dapr sidecar
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
```
```csharp
// Exponential backoff retry policy that survives long outages
var retryOptions = new WorkflowTaskOptions
{
    RetryPolicy = new WorkflowRetryPolicy(
        firstRetryInterval: TimeSpan.FromMinutes(1),
        backoffCoefficient: 2.0,
        maxRetryInterval: TimeSpan.FromHours(1),
        maxNumberOfAttempts: 10),
};

try
{
    var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryOptions);
    var result2 = await context.CallActivityAsync<byte[]>("Step2", result1, retryOptions);
    var result3 = await context.CallActivityAsync<long[]>("Step3", result2, retryOptions);
    return string.Join(", ", result3);
}
catch (TaskFailedException) // Task failures are surfaced as TaskFailedException
{
    // Retries expired - apply custom compensation logic
    await context.CallActivityAsync<long[]>("MyCompensation", options: retryOptions);
    throw;
}
```

Note In the example above, "Step1", "Step2", "Step3", and "MyCompensation" represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example.

```java
public class ChainWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            StringBuilder sb = new StringBuilder();
            String wfInput = ctx.getInput(String.class);
            String result1 = ctx.callActivity("Step1", wfInput, String.class).await();
            String result2 = ctx.callActivity("Step2", result1, String.class).await();
            String result3 = ctx.callActivity("Step3", result2, String.class).await();
            String result = sb.append(result1).append(',').append(result2).append(',').append(result3).toString();
            ctx.complete(result);
        };
    }
}

class Step1 implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        Logger logger = LoggerFactory.getLogger(Step1.class);
        logger.info("Starting Activity: " + ctx.getName());
        // Do some work
        return null;
    }
}

class Step2 implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        Logger logger = LoggerFactory.getLogger(Step2.class);
        logger.info("Starting Activity: " + ctx.getName());
        // Do some work
        return null;
    }
}

class Step3 implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        Logger logger = LoggerFactory.getLogger(Step3.class);
        logger.info("Starting Activity: " + ctx.getName());
        // Do some work
        return null;
    }
}
```
```go
func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	var result1 int
	if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result1); err != nil {
		return nil, err
	}
	var result2 int
	if err := ctx.CallActivity(Step2, workflow.ActivityInput(result1)).Await(&result2); err != nil {
		return nil, err
	}
	var result3 int
	if err := ctx.CallActivity(Step3, workflow.ActivityInput(result2)).Await(&result3); err != nil {
		return nil, err
	}
	return []int{result1, result2, result3}, nil
}

func Step1(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("Step 1: Received input: %d", input)
	return input + 1, nil
}

func Step2(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("Step 2: Received input: %d", input)
	return input * 2, nil
}

func Step3(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("Step 3: Received input: %d", input)
	return int(math.Pow(float64(input), 2)), nil
}
```

As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture.

Behind the scenes, the Dapr Workflow runtime:

  • Takes care of executing the workflow and ensuring that it runs to completion.
  • Saves progress automatically.
  • Automatically resumes the workflow from the last completed step if the workflow process itself fails for any reason.
  • Enables error handling to be expressed naturally in your target programming language, allowing you to implement compensation logic easily.
  • Provides built-in retry configuration primitives to simplify the process of configuring complex retry policies for individual steps in the workflow.

Fan-out/fan-in

In the fan-out/fan-in design pattern, you execute multiple tasks simultaneously across potentially multiple workers, wait for them to finish, and perform some aggregation on the result.

Diagram showing how the fan-out/fan-in workflow pattern works

In addition to the challenges mentioned in the previous pattern, there are several important questions to consider when implementing the fan-out/fan-in pattern manually:

  • How do you control the degree of parallelism?
  • How do you know when to trigger subsequent aggregation steps?
  • What if the number of parallel steps is dynamic?

Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example:

```python
import time
from typing import List

import dapr.ext.workflow as wf

def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    # get a batch of N work items to process in parallel
    work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)

    # schedule N parallel tasks to process the work items and wait for all to complete
    parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
    outputs = yield wf.when_all(parallel_tasks)

    # aggregate the results and send them to another activity
    total = sum(outputs)
    yield ctx.call_activity(process_results, input=total)

def get_work_batch(ctx, batch_size: int) -> List[int]:
    return [i + 1 for i in range(batch_size)]

def process_work_item(ctx, work_item: int) -> int:
    print(f'Processing work item: {work_item}.')
    time.sleep(5)
    result = work_item * 2
    print(f'Work item {work_item} processed. Result: {result}.')
    return result

def process_results(ctx, final_result: int):
    print(f'Final result: {final_result}.')
```
```typescript
import {
  Task,
  DaprWorkflowClient,
  WorkflowActivityContext,
  WorkflowContext,
  WorkflowRuntime,
  TWorkflow,
} from "@dapr/dapr";

// Wrap the entire code in an immediately-invoked async function
async function start() {
  // Update the gRPC client and worker to use a local address and port
  const daprHost = "localhost";
  const daprPort = "50001";

  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  function getRandomInt(min: number, max: number): number {
    return Math.floor(Math.random() * (max - min + 1)) + min;
  }

  async function getWorkItemsActivity(_: WorkflowActivityContext): Promise<string[]> {
    const count: number = getRandomInt(2, 10);
    console.log(`generating ${count} work items...`);
    const workItems: string[] = Array.from({ length: count }, (_, i) => `work item ${i}`);
    return workItems;
  }

  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  async function processWorkItemActivity(context: WorkflowActivityContext, item: string): Promise<number> {
    console.log(`processing work item: ${item}`);

    // Simulate some work that takes a variable amount of time
    const sleepTime = Math.random() * 5000;
    await sleep(sleepTime);

    // Return a result for the given work item, which is also a random number in this case
    // For more information about random numbers in workflow please check
    // https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp#random-numbers
    return Math.floor(Math.random() * 11);
  }

  const workflow: TWorkflow = async function* (ctx: WorkflowContext): any {
    const tasks: Task<any>[] = [];
    const workItems = yield ctx.callActivity(getWorkItemsActivity);
    for (const workItem of workItems) {
      tasks.push(ctx.callActivity(processWorkItemActivity, workItem));
    }
    const results: number[] = yield ctx.whenAll(tasks);
    const sum: number = results.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
    return sum;
  };

  workflowRuntime.registerWorkflow(workflow);
  workflowRuntime.registerActivity(getWorkItemsActivity);
  workflowRuntime.registerActivity(processWorkItemActivity);

  // Wrap the worker startup in a try-catch block to handle any errors during startup
  try {
    await workflowRuntime.start();
    console.log("Worker started successfully");
  } catch (error) {
    console.error("Error starting worker:", error);
  }

  // Schedule a new orchestration
  try {
    const id = await workflowClient.scheduleNewWorkflow(workflow);
    console.log(`Orchestration scheduled with ID: ${id}`);

    // Wait for orchestration completion
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
    console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("Error scheduling or waiting for orchestration:", error);
  }

  // Stop the worker and client
  await workflowRuntime.stop();
  await workflowClient.stop();

  // Stop the Dapr sidecar
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
```
```csharp
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

// Schedule the parallel tasks, but don't wait for them to complete yet.
var parallelTasks = new List<Task<int>>(workBatch.Length);
for (int i = 0; i < workBatch.Length; i++)
{
    Task<int> task = context.CallActivityAsync<int>("ProcessWorkItem", workBatch[i]);
    parallelTasks.Add(task);
}

// Everything is scheduled. Wait here until all parallel tasks have completed.
await Task.WhenAll(parallelTasks);

// Aggregate all N outputs and publish the result.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("PostResults", sum);
```
```java
public class FaninoutWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            // Get a list of N work items to process in parallel.
            Object[] workBatch = ctx.callActivity("GetWorkBatch", Object[].class).await();

            // Schedule the parallel tasks, but don't wait for them to complete yet.
            List<Task<Integer>> tasks = Arrays.stream(workBatch)
                    .map(workItem -> ctx.callActivity("ProcessWorkItem", workItem, int.class))
                    .collect(Collectors.toList());

            // Everything is scheduled. Wait here until all parallel tasks have completed.
            List<Integer> results = ctx.allOf(tasks).await();

            // Aggregate all N outputs and publish the result.
            int sum = results.stream().mapToInt(Integer::intValue).sum();
            ctx.complete(sum);
        };
    }
}
```
```go
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return 0, err
	}
	var workBatch []int
	if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
		return 0, err
	}
	parallelTasks := workflow.NewTaskSlice(len(workBatch))
	for i, workItem := range workBatch {
		parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
	}
	var outputs int
	for _, task := range parallelTasks {
		var output int
		err := task.Await(&output)
		if err == nil {
			outputs += output
		} else {
			return 0, err
		}
	}
	if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
		return 0, err
	}
	return 0, nil
}

func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
	var batchSize int
	if err := ctx.GetInput(&batchSize); err != nil {
		return 0, err
	}
	batch := make([]int, batchSize)
	for i := 0; i < batchSize; i++ {
		batch[i] = i
	}
	return batch, nil
}

func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
	var workItem int
	if err := ctx.GetInput(&workItem); err != nil {
		return 0, err
	}
	fmt.Printf("Processing work item: %d\n", workItem)
	time.Sleep(time.Second * 5)
	result := workItem * 2
	fmt.Printf("Work item %d processed. Result: %d\n", workItem, result)
	return result, nil
}

func ProcessResults(ctx workflow.ActivityContext) (any, error) {
	var finalResult int
	if err := ctx.GetInput(&finalResult); err != nil {
		return 0, err
	}
	fmt.Printf("Final result: %d\n", finalResult)
	return finalResult, nil
}
```

The key takeaways from this example are:

  • The fan-out/fan-in pattern can be expressed as a simple function using ordinary programming constructs
  • The number of parallel tasks can be static or dynamic
  • The workflow itself is capable of aggregating the results of parallel executions

Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks.

It’s possible to go further and limit the degree of concurrency using simple, language-specific constructs. The sample code below illustrates how to restrict the degree of fan-out to just 5 concurrent activity executions:

```csharp
// Revisiting the earlier example...
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

const int MaxParallelism = 5;
var results = new List<int>();
var inFlightTasks = new HashSet<Task<int>>();
foreach (var workItem in workBatch)
{
    if (inFlightTasks.Count >= MaxParallelism)
    {
        var finishedTask = await Task.WhenAny(inFlightTasks);
        results.Add(finishedTask.Result);
        inFlightTasks.Remove(finishedTask);
    }

    inFlightTasks.Add(context.CallActivityAsync<int>("ProcessWorkItem", workItem));
}

results.AddRange(await Task.WhenAll(inFlightTasks));

var sum = results.Sum(t => t);
await context.CallActivityAsync("PostResults", sum);
```

Limiting the degree of concurrency in this way can be useful for limiting contention against shared resources. For example, if the activities need to call into external resources that have their own concurrency limits, like databases or external APIs, it can be useful to ensure that no more than a specified number of activities call that resource concurrently.
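Outside of a workflow context, the same sliding-window technique can be sketched with plain Python asyncio. This is an illustrative sketch, not part of any Dapr SDK; the `fan_out` and `process_work_item` names are hypothetical, and a semaphore stands in for the explicit in-flight task set used in the C# example above:

```python
import asyncio

async def process_work_item(item: int, sem: asyncio.Semaphore) -> int:
    # The semaphore caps how many work items run concurrently.
    async with sem:
        await asyncio.sleep(0.01)  # stand-in for real work
        return item * 2

async def fan_out(work_batch: list[int], max_parallelism: int = 5) -> int:
    sem = asyncio.Semaphore(max_parallelism)
    tasks = [asyncio.create_task(process_work_item(i, sem)) for i in work_batch]
    results = await asyncio.gather(*tasks)
    return sum(results)

print(asyncio.run(fan_out(list(range(10)))))  # prints 90
```

At most `max_parallelism` items are in flight at any moment, which mirrors the `MaxParallelism` guard in the workflow code above.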

Async HTTP APIs

Asynchronous HTTP APIs are typically implemented using the Asynchronous Request-Reply pattern. Implementing this pattern traditionally involves the following:

  1. A client sends a request to an HTTP API endpoint (the start API)
  2. The start API writes a message to a backend queue, which triggers the start of a long-running operation
  3. Immediately after scheduling the backend operation, the start API returns an HTTP 202 response to the client with an identifier that can be used to poll for status
  4. The status API queries a database that contains the status of the long-running operation
  5. The client repeatedly polls the status API either until some timeout expires or it receives a “completion” response

The end-to-end flow is illustrated in the following diagram.

Diagram showing how the async request response pattern works

The challenge with implementing the asynchronous request-reply pattern is that it involves the use of multiple APIs and state stores. It also involves implementing the protocol correctly so that the client knows how to automatically poll for status and can detect when the operation is complete.

The Dapr workflow HTTP API supports the asynchronous request-reply pattern out of the box, without requiring you to write any code or do any state management.

The following curl commands illustrate how the workflow APIs support this pattern.

```bash
curl -X POST http://localhost:3500/v1.0-beta1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 -d '{"Name":"Paperclips","Quantity":1,"TotalCost":9.95}'
```

The previous command will result in the following response JSON:

```json
{"instanceID":"12345678"}
```

The HTTP client can then construct the status query URL using the workflow instance ID and poll it repeatedly until it sees a terminal status such as “COMPLETED”, “FAILED”, or “TERMINATED” in the payload.

```bash
curl http://localhost:3500/v1.0-beta1/workflows/dapr/12345678
```

The following is an example of what an in-progress workflow status might look like.

```json
{
  "instanceID": "12345678",
  "workflowName": "OrderProcessingWorkflow",
  "createdAt": "2023-05-03T23:22:11.143069826Z",
  "lastUpdatedAt": "2023-05-03T23:22:22.460025267Z",
  "runtimeStatus": "RUNNING",
  "properties": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}"
  }
}
```

As you can see from the previous example, the workflow’s runtime status is RUNNING, which lets the client know that it should continue polling.

If the workflow has completed, the status might look as follows.

```json
{
  "instanceID": "12345678",
  "workflowName": "OrderProcessingWorkflow",
  "createdAt": "2023-05-03T23:30:11.381146313Z",
  "lastUpdatedAt": "2023-05-03T23:30:52.923870615Z",
  "runtimeStatus": "COMPLETED",
  "properties": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}",
    "dapr.workflow.output": "{\"Processed\":true}"
  }
}
```

As you can see from the previous example, the runtime status of the workflow is now COMPLETED, which means the client can stop polling for updates.
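The polling protocol described above can be wrapped in a small client helper. The following is an illustrative sketch, not part of any Dapr SDK: the endpoint URL matches the curl examples above, the terminal-status set matches the status payloads shown, and the `fetch` parameter is injectable so the loop can be exercised without a running sidecar:

```python
import json
import time
from urllib.request import urlopen

# Runtime statuses after which the client can stop polling.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "TERMINATED"}

def is_terminal(status: str) -> bool:
    return status in TERMINAL_STATUSES

def poll_workflow(instance_id: str, fetch=None, interval: float = 1.0, timeout: float = 30.0) -> dict:
    """Poll the workflow status endpoint until a terminal status or timeout."""
    def default_fetch(iid: str) -> dict:
        # Same endpoint as the curl status example above.
        url = f"http://localhost:3500/v1.0-beta1/workflows/dapr/{iid}"
        with urlopen(url) as resp:
            return json.loads(resp.read())

    fetch = fetch or default_fetch
    deadline = time.monotonic() + timeout
    while True:
        state = fetch(instance_id)
        if is_terminal(state.get("runtimeStatus", "")):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"workflow {instance_id} still running after {timeout}s")
        time.sleep(interval)
```

In practice you might also add exponential backoff between polls rather than a fixed interval.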

Monitor

The monitor pattern is a recurring process that typically:

  1. Checks the status of a system
  2. Takes some action based on that status (for example, sending a notification)
  3. Sleeps for some period of time
  4. Repeats

The following diagram provides a rough illustration of this pattern.

Diagram showing how the monitor pattern works

Depending on the business needs, there may be a single monitor or there may be multiple monitors, one for each business entity (for example, a stock). Furthermore, the amount of time to sleep may need to change, depending on the circumstances. These requirements make using cron-based scheduling systems impractical.

Dapr Workflow supports this pattern natively by allowing you to implement eternal workflows. Rather than writing infinite while-loops (which is an anti-pattern), Dapr Workflow exposes a continue-as-new API that workflow authors can use to restart a workflow function from the beginning with a new input.

```python
from dataclasses import dataclass
from datetime import timedelta
import random

import dapr.ext.workflow as wf

@dataclass
class JobStatus:
    job_id: str
    is_healthy: bool

def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
    # poll a status endpoint associated with this job
    status = yield ctx.call_activity(check_status, input=job)
    if not ctx.is_replaying:
        print(f"Job '{job.job_id}' is {status}.")

    if status == "healthy":
        job.is_healthy = True
        next_sleep_interval = 60  # check less frequently when healthy
    else:
        if job.is_healthy:
            job.is_healthy = False
            yield ctx.call_activity(send_alert, input=f"Job '{job.job_id}' is unhealthy!")
        next_sleep_interval = 5  # check more frequently when unhealthy

    yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(minutes=next_sleep_interval))

    # restart from the beginning with a new JobStatus input
    ctx.continue_as_new(job)

def check_status(ctx, _) -> str:
    return random.choice(["healthy", "unhealthy"])

def send_alert(ctx, message: str):
    print(f'*** Alert: {message}')
```
```typescript
const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext): any {
  let duration;
  const status = yield ctx.callActivity(checkStatusActivity);
  if (status === "healthy") {
    // Check less frequently when in a healthy state
    // set duration to 1 hour
    duration = 60 * 60;
  } else {
    yield ctx.callActivity(alertActivity, "job unhealthy");
    // Check more frequently when in an unhealthy state
    // set duration to 5 minutes
    duration = 5 * 60;
  }

  // Put the workflow to sleep until the determined time
  yield ctx.createTimer(duration);

  // Restart from the beginning with the updated state
  ctx.continueAsNew();
};
```
```csharp
public override async Task<object> RunAsync(WorkflowContext context, MyEntityState myEntityState)
{
    TimeSpan nextSleepInterval;

    var status = await context.CallActivityAsync<string>("GetStatus");
    if (status == "healthy")
    {
        myEntityState.IsHealthy = true;

        // Check less frequently when in a healthy state
        nextSleepInterval = TimeSpan.FromMinutes(60);
    }
    else
    {
        if (myEntityState.IsHealthy)
        {
            myEntityState.IsHealthy = false;
            await context.CallActivityAsync("SendAlert", myEntityState);
        }

        // Check more frequently when in an unhealthy state
        nextSleepInterval = TimeSpan.FromMinutes(5);
    }

    // Put the workflow to sleep until the determined time
    await context.CreateTimer(nextSleepInterval);

    // Restart from the beginning with the updated state
    context.ContinueAsNew(myEntityState);
    return null;
}
```

This example assumes you have a predefined MyEntityState class with a boolean IsHealthy property.

```java
public class MonitorWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            Duration nextSleepInterval;

            var status = ctx.callActivity(DemoWorkflowStatusActivity.class.getName(), DemoStatusActivityOutput.class).await();
            var isHealthy = status.getIsHealthy();
            if (isHealthy) {
                // Check less frequently when in a healthy state
                nextSleepInterval = Duration.ofMinutes(60);
            } else {
                ctx.callActivity(DemoWorkflowAlertActivity.class.getName()).await();

                // Check more frequently when in an unhealthy state
                nextSleepInterval = Duration.ofMinutes(5);
            }

            // Put the workflow to sleep until the determined time
            try {
                ctx.createTimer(nextSleepInterval);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            // Restart from the beginning with the updated state
            ctx.continueAsNew();
        };
    }
}
```
```go
type JobStatus struct {
	JobID     string `json:"job_id"`
	IsHealthy bool   `json:"is_healthy"`
}

func StatusMonitorWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var sleepInterval time.Duration

	var job JobStatus
	if err := ctx.GetInput(&job); err != nil {
		return "", err
	}
	var status string
	if err := ctx.CallActivity(CheckStatus, workflow.ActivityInput(job)).Await(&status); err != nil {
		return "", err
	}
	if status == "healthy" {
		job.IsHealthy = true
		sleepInterval = time.Minute * 60
	} else {
		if job.IsHealthy {
			job.IsHealthy = false
			err := ctx.CallActivity(SendAlert, workflow.ActivityInput(fmt.Sprintf("Job '%s' is unhealthy!", job.JobID))).Await(nil)
			if err != nil {
				return "", err
			}
		}
		sleepInterval = time.Minute * 5
	}
	if err := ctx.CreateTimer(sleepInterval).Await(nil); err != nil {
		return "", err
	}
	ctx.ContinueAsNew(job, false)
	return "", nil
}

func CheckStatus(ctx workflow.ActivityContext) (any, error) {
	statuses := []string{"healthy", "unhealthy"}
	return statuses[rand.Intn(len(statuses))], nil
}

func SendAlert(ctx workflow.ActivityContext) (any, error) {
	var message string
	if err := ctx.GetInput(&message); err != nil {
		return "", err
	}
	fmt.Printf("*** Alert: %s", message)
	return "", nil
}
```

A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling continue-as-new.

Note

This pattern can also be expressed using actors and reminders. The difference is that this workflow is expressed as a single function with inputs and state stored in local variables. Workflows can also execute a sequence of actions with stronger reliability guarantees, if necessary.
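The alert-on-transition decision logic shared by the monitor examples above can be factored into plain functions and tested without a workflow runtime. The `next_state` and `simulate` helpers below are illustrative sketches, not part of any SDK, and use the same 60-minute/5-minute intervals as the examples:

```python
def next_state(is_healthy: bool, status: str):
    """One iteration of the monitor's decision logic.

    Returns (new_is_healthy, sleep_minutes, alert_fired). An alert fires
    only on a healthy -> unhealthy transition, not on every unhealthy check.
    """
    if status == "healthy":
        return True, 60, False
    alert = is_healthy  # only alert when the state flips to unhealthy
    return False, 5, alert

def simulate(statuses):
    """Run the monitor logic over a sequence of observed statuses."""
    is_healthy, alerts, intervals = True, 0, []
    for status in statuses:
        is_healthy, sleep_minutes, alert = next_state(is_healthy, status)
        alerts += alert
        intervals.append(sleep_minutes)
    return alerts, intervals

# Two consecutive unhealthy checks trigger only one alert.
print(simulate(["healthy", "unhealthy", "unhealthy", "healthy"]))  # prints (1, [60, 5, 5, 60])
```

Keeping this logic in pure functions makes the interesting part of the monitor easy to unit-test, while the workflow itself only handles timers and continue-as-new.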

External system interaction

In some cases, a workflow may need to pause and wait for an external system to perform some action. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API.

Another very common scenario is when a workflow needs to pause and wait for a human, for example when approving a purchase order. Dapr Workflow supports this event pattern via the external events feature.

Here’s an example workflow for a purchase order involving a human:

  1. A workflow is triggered when a purchase order is received.
  2. A rule in the workflow determines that a human needs to perform some action. For example, the purchase order cost exceeds a certain auto-approval threshold.
  3. The workflow sends a notification requesting a human action. For example, it sends an email with an approval link to a designated approver.
  4. The workflow pauses and waits for the human to either approve or reject the order by clicking on a link.
  5. If the approval isn’t received within the specified time, the workflow resumes and performs some compensation logic, such as canceling the order.

The following diagram illustrates this flow.

Diagram showing how the external system interaction pattern works with a human involved

The following example code shows how this pattern can be implemented using Dapr Workflow.

  1. from dataclasses import dataclass
  2. from datetime import timedelta
  3. import dapr.ext.workflow as wf
  4. @dataclass
  5. class Order:
  6. cost: float
  7. product: str
  8. quantity: int
  9. def __str__(self):
  10. return f'{self.product} ({self.quantity})'
  11. @dataclass
  12. class Approval:
  13. approver: str
  14. @staticmethod
  15. def from_dict(dict):
  16. return Approval(**dict)
  17. def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
  18. # Orders under $1000 are auto-approved
  19. if order.cost < 1000:
  20. return "Auto-approved"
  21. # Orders of $1000 or more require manager approval
  22. yield ctx.call_activity(send_approval_request, input=order)
  23. # Approvals must be received within 24 hours or they will be canceled.
  24. approval_event = ctx.wait_for_external_event("approval_received")
  25. timeout_event = ctx.create_timer(timedelta(hours=24))
  26. winner = yield wf.when_any([approval_event, timeout_event])
  27. if winner == timeout_event:
  28. return "Cancelled"
  29. # The order was approved
  30. yield ctx.call_activity(place_order, input=order)
  31. approval_details = Approval.from_dict(approval_event.get_result())
  32. return f"Approved by '{approval_details.approver}'"
  33. def send_approval_request(_, order: Order) -> None:
  34. print(f'*** Sending approval request for order: {order}')
  35. def place_order(_, order: Order) -> None:
  36. print(f'*** Placing order: {order}')
  1. import {
  2. Task,
  3. DaprWorkflowClient,
  4. WorkflowActivityContext,
  5. WorkflowContext,
  6. WorkflowRuntime,
  7. TWorkflow,
  8. } from "@dapr/dapr";
  9. import * as readlineSync from "readline-sync";
  10. // Wrap the entire code in an immediately-invoked async function
  11. async function start() {
  12. class Order {
  13. cost: number;
  14. product: string;
  15. quantity: number;
  16. constructor(cost: number, product: string, quantity: number) {
  17. this.cost = cost;
  18. this.product = product;
  19. this.quantity = quantity;
  20. }
  21. }
  22. function sleep(ms: number): Promise<void> {
  23. return new Promise((resolve) => setTimeout(resolve, ms));
  24. }
  25. // Update the gRPC client and worker to use a local address and port
  26. const daprHost = "localhost";
  27. const daprPort = "50001";
  28. const workflowClient = new DaprWorkflowClient({
  29. daprHost,
  30. daprPort,
  31. });
  32. const workflowRuntime = new WorkflowRuntime({
  33. daprHost,
  34. daprPort,
  35. });
  36. // Activity function that sends an approval request to the manager
  37. const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => {
  38. // Simulate some work that takes an amount of time
  39. await sleep(3000);
  40. console.log(`Sending approval request for order: ${order.product}`);
  41. };
  42. // Activity function that places an order
  43. const placeOrder = async (_: WorkflowActivityContext, order: Order) => {
  44. console.log(`Placing order: ${order.product}`);
  45. };
  46. // Orchestrator function that represents a purchase order workflow
  47. const purchaseOrderWorkflow: TWorkflow = async function* (ctx: WorkflowContext, order: Order): any {
  48. // Orders under $1000 are auto-approved
  49. if (order.cost < 1000) {
  50. return "Auto-approved";
  51. }
  52. // Orders of $1000 or more require manager approval
  53. yield ctx.callActivity(sendApprovalRequest, order);
  54. // Approvals must be received within 24 hours or they will be canceled.
  55. const tasks: Task<any>[] = [];
  56. const approvalEvent = ctx.waitForExternalEvent("approval_received");
  57. const timeoutEvent = ctx.createTimer(24 * 60 * 60);
  58. tasks.push(approvalEvent);
  59. tasks.push(timeoutEvent);
  60. const winner = yield ctx.whenAny(tasks);
  61. if (winner == timeoutEvent) {
  62. return "Cancelled";
  63. }
  64. yield ctx.callActivity(placeOrder, order);
  65. const approvalDetails = approvalEvent.getResult();
  66. return `Approved by ${approvalDetails.approver}`;
  67. };
  68. workflowRuntime
  69. .registerWorkflow(purchaseOrderWorkflow)
  70. .registerActivity(sendApprovalRequest)
  71. .registerActivity(placeOrder);
  72. // Wrap the worker startup in a try-catch block to handle any errors during startup
  73. try {
  74. await workflowRuntime.start();
  75. console.log("Worker started successfully");
  76. } catch (error) {
  77. console.error("Error starting worker:", error);
  78. }
  79. // Schedule a new orchestration
  80. try {
  81. const cost = readlineSync.questionInt("Cost of your order:");
  82. const approver = readlineSync.question("Approver of your order:");
  83. const timeout = readlineSync.questionInt("Timeout for your order in seconds:");
  84. const order = new Order(cost, "MyProduct", 1);
  85. const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order);
  86. console.log(`Orchestration scheduled with ID: ${id}`);
  87. // prompt for approval asynchronously
  88. promptForApproval(approver, workflowClient, id);
  89. // Wait for orchestration completion
  90. const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2);
  91. console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  92. } catch (error) {
  93. console.error("Error scheduling or waiting for orchestration:", error);
  94. }
  95. // stop worker and client
  96. await workflowRuntime.stop();
  97. await workflowClient.stop();
  98. // stop the dapr side car
  99. process.exit(0);
  100. }
  101. async function promptForApproval(approver: string, workflowClient: DaprWorkflowClient, id: string) {
  102. if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) {
  103. const approvalEvent = { approver: approver };
  104. await workflowClient.raiseEvent(id, "approval_received", approvalEvent);
  105. } else {
  106. return "Order rejected";
  107. }
  108. }
  109. start().catch((e) => {
  110. console.error(e);
  111. process.exit(1);
  112. });
  1. public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
  2. {
  3. // ...(other steps)...
  4. // Require orders over a certain threshold to be approved
  5. if (order.TotalCost > OrderApprovalThreshold)
  6. {
  7. try
  8. {
  9. // Request human approval for this order
  10. await context.CallActivityAsync(nameof(RequestApprovalActivity), order);
  11. // Pause and wait for a human to approve the order
  12. ApprovalResult approvalResult = await context.WaitForExternalEventAsync<ApprovalResult>(
  13. eventName: "ManagerApproval",
  14. timeout: TimeSpan.FromDays(3));
  15. if (approvalResult == ApprovalResult.Rejected)
  16. {
  17. // The order was rejected, end the workflow here
  18. return new OrderResult(Processed: false);
  19. }
  20. }
  21. catch (TaskCanceledException)
  22. {
  23. // An approval timeout results in automatic order cancellation
  24. return new OrderResult(Processed: false);
  25. }
  26. }
  27. // ...(other steps)...
  28. // End the workflow with a success result
  29. return new OrderResult(Processed: true);
  30. }

Note

In the example above, RequestApprovalActivity is the name of a workflow activity to invoke and ApprovalResult is an enumeration defined by the workflow app. For brevity, these definitions were left out of the example code.

  1. public class ExternalSystemInteractionWorkflow extends Workflow {
  2. @Override
  3. public WorkflowStub create() {
  4. return ctx -> {
  5. // ...other steps...
  6. Integer orderCost = ctx.getInput(int.class);
  7. // Require orders over a certain threshold to be approved
  8. if (orderCost > ORDER_APPROVAL_THRESHOLD) {
  9. try {
  10. // Request human approval for this order
  11. ctx.callActivity("RequestApprovalActivity", orderCost, Void.class).await();
  12. // Pause and wait for a human to approve the order
  13. boolean approved = ctx.waitForExternalEvent("ManagerApproval", Duration.ofDays(3), boolean.class).await();
  14. if (!approved) {
  15. // The order was rejected, end the workflow here
  16. ctx.complete("Process reject");
  17. }
  18. } catch (TaskCanceledException e) {
  19. // An approval timeout results in automatic order cancellation
  20. ctx.complete("Process cancel");
  21. }
  22. }
  23. // ...other steps...
  24. // End the workflow with a success result
  25. ctx.complete("Process approved");
  26. };
  27. }
  28. }
  1. type Order struct {
  2. Cost float64 `json:"cost"`
  3. Product string `json:"product"`
  4. Quantity int `json:"quantity"`
  5. }
  6. type Approval struct {
  7. Approver string `json:"approver"`
  8. }
  9. func PurchaseOrderWorkflow(ctx *workflow.WorkflowContext) (any, error) {
  10. var order Order
  11. if err := ctx.GetInput(&order); err != nil {
  12. return "", err
  13. }
  14. // Orders under $1000 are auto-approved
  15. if order.Cost < 1000 {
  16. return "Auto-approved", nil
  17. }
  18. // Orders of $1000 or more require manager approval
  19. if err := ctx.CallActivity(SendApprovalRequest, workflow.ActivityInput(order)).Await(nil); err != nil {
  20. return "", err
  21. }
  22. // Approvals must be received within 24 hours or they will be cancelled
  23. var approval Approval
  24. if err := ctx.WaitForExternalEvent("approval_received", time.Hour*24).Await(&approval); err != nil {
  25. // Assume a timeout has taken place; in any case, an error occurred.
  26. return "error/cancelled", err
  27. }
  28. // The order was approved
  29. if err := ctx.CallActivity(PlaceOrder, workflow.ActivityInput(order)).Await(nil); err != nil {
  30. return "", err
  31. }
  32. return fmt.Sprintf("Approved by %s", approval.Approver), nil
  33. }
  34. func SendApprovalRequest(ctx workflow.ActivityContext) (any, error) {
  35. var order Order
  36. if err := ctx.GetInput(&order); err != nil {
  37. return "", err
  38. }
  39. fmt.Printf("*** Sending approval request for order: %v\n", order)
  40. return "", nil
  41. }
  42. func PlaceOrder(ctx workflow.ActivityContext) (any, error) {
  43. var order Order
  44. if err := ctx.GetInput(&order); err != nil {
  45. return "", err
  46. }
  47. fmt.Printf("*** Placing order: %v\n", order)
  48. return "", nil
  49. }
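Orchestrators like the ones above are deterministic generator functions, so their branching logic can be unit-tested without a running sidecar. A minimal sketch of that idea, using a simplified stand-in for the purchase-order workflow (the stub context and the `run` helper are illustrative, not part of the Dapr SDK, and the 24-hour timeout race is reduced to a boolean for brevity):

```python
# Simplified stand-in for the purchase-order orchestrator: same control flow,
# but duck-typed so a test harness can drive it (illustrative, not SDK code).
def purchase_order(ctx, cost: float):
    if cost < 1000:
        return "Auto-approved"
    yield ctx.call_activity("send_approval_request", input=cost)
    approved = yield ctx.wait_for_external_event("approval_received")
    return "Approved" if approved else "Cancelled"

class StubCtx:
    """Records what the workflow asks for instead of talking to a sidecar."""
    def call_activity(self, name, input=None):
        return ("activity", name)
    def wait_for_external_event(self, name):
        return ("event", name)

def run(cost: float, approved: bool = True):
    """Drive the generator to completion, answering the external event."""
    gen = purchase_order(StubCtx(), cost)
    try:
        step = next(gen)
        while True:
            step = gen.send(approved if step[0] == "event" else None)
    except StopIteration as stop:
        return stop.value
```

Because the workflow never performs I/O itself and only describes the tasks it wants scheduled, the harness can substitute any answers it likes for activities and external events, which is the same mechanism the real runtime uses when replaying history.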

The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the raise event workflow management API, as shown in the following example:

  1. from dapr.clients import DaprClient
  2. from dataclasses import asdict
  3. with DaprClient() as d:
  4. d.raise_workflow_event(
  5. instance_id=instance_id,
  6. workflow_component="dapr",
  7. event_name="approval_received",
  8. event_data=asdict(Approval("Jane Doe")))
  1. import { DaprClient } from "@dapr/dapr";
  2. public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
  3. this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
  4. }
  1. // Raise the workflow event to the waiting workflow
  2. await daprClient.RaiseWorkflowEventAsync(
  3. instanceId: orderId,
  4. workflowComponent: "dapr",
  5. eventName: "ManagerApproval",
  6. eventData: ApprovalResult.Approved);
  1. System.out.println("**SendExternalMessage: RestartEvent**");
  2. client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
  1. func raiseEvent() {
  2. daprClient, err := client.NewClient()
  3. if err != nil {
  4. log.Fatalf("failed to initialize the client")
  5. }
  6. err = daprClient.RaiseEventWorkflowBeta1(context.Background(), &client.RaiseEventWorkflowRequest{
  7. InstanceID: "instance_id",
  8. WorkflowComponent: "dapr",
  9. EventName: "approval_received",
  10. EventData: Approval{
  11. Approver: "Jane Doe",
  12. },
  13. })
  14. if err != nil {
  15. log.Fatalf("failed to raise event on workflow")
  16. }
  17. log.Println("raised an event on specified workflow")
  18. }

External events don’t have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API.
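A sketch of that payment scenario: a pub/sub subscriber handler translates the incoming message into a workflow event. The event shape, the `payment_received` event name, and the injected client are assumptions made for illustration; in a real application the client would be a DaprClient and the handler would be registered as a Dapr topic subscription:

```python
from dataclasses import dataclass, asdict

@dataclass
class PaymentReceived:
    order_id: str   # doubles as the workflow instance ID in this sketch
    amount: float

def on_payment_received(event: PaymentReceived, dapr_client) -> str:
    """Pub/sub handler: resume the waiting workflow by raising an external event."""
    dapr_client.raise_workflow_event(
        instance_id=event.order_id,
        workflow_component="dapr",
        event_name="payment_received",
        event_data=asdict(event),
    )
    return event.order_id
```

Injecting the client keeps the handler testable; the `raise_workflow_event` call itself mirrors the Python raise-event example shown earlier.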

Next steps

Workflow architecture >>

Last modified October 11, 2024: Fixed typo (#4389) (fe17926)