How to: Author a workflow
Learn how to develop and author workflows
Note
Dapr Workflow is currently in beta. See known limitations for 1.12.0.
This article provides a high-level overview of how to author workflows that are executed by the Dapr Workflow engine.
Note
If you haven’t already, try out the workflow quickstart for a quick walk-through on how to use workflows.
Author workflows as code
Dapr Workflow logic is implemented using general purpose programming languages, allowing you to:
- Use your preferred programming language (no need to learn a new DSL or YAML schema).
- Have access to the language’s standard libraries.
- Build your own libraries and abstractions.
- Use debuggers and examine local variables.
- Write unit tests for your workflows, just like any other part of your application logic.
The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar simply drives the execution of the workflows, leaving all the workflow activities to be part of the application.
Write the workflow activities
Workflow activities are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process.
Define the workflow activities you’d like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called hello_act
that notifies users of the current counter value. hello_act
is a function derived from a class called WorkflowActivityContext
.
def hello_act(ctx: WorkflowActivityContext, input):
global counter
counter += input
print(f'New counter value is: {counter}!', flush=True)
See the hello_act workflow activity in context.
Define the workflow activities you’d like your workflow to perform. Activities are a class definition and can take inputs and outputs. Activities also participate in dependency injection, like binding to a Dapr client.
The activities called in the example below are:
NotifyActivity
: Receive notification of a new order.ReserveInventoryActivity
: Check for sufficient inventory to meet the new order.ProcessPaymentActivity
: Process payment for the order. IncludesNotifyActivity
to send notification of successful order.
NotifyActivity
public class NotifyActivity : WorkflowActivity<Notification, object>
{
//...
public NotifyActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<NotifyActivity>();
}
//...
}
See the full NotifyActivity.cs workflow activity example.
ReserveInventoryActivity
public class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
//...
public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
this.client = client;
}
//...
}
See the full ReserveInventoryActivity.cs workflow activity example.
ProcessPaymentActivity
public class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
{
//...
public ProcessPaymentActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
}
//...
}
See the full ProcessPaymentActivity.cs workflow activity example.
Define the workflow activities you’d like your workflow to perform. Activities are wrapped in the public DemoWorkflowActivity
class, which implements the workflow activities.
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class DemoWorkflowActivity implements WorkflowActivity {
@Override
public DemoActivityOutput run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
logger.info("Starting Activity: " + ctx.getName());
var message = ctx.getInput(DemoActivityInput.class).getMessage();
var newMessage = message + " World!, from Activity";
logger.info("Message Received from input: " + message);
logger.info("Sending message to output: " + newMessage);
logger.info("Sleeping for 5 seconds to simulate long running operation...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("Activity finished");
var output = new DemoActivityOutput(message, newMessage);
logger.info("Activity returned: " + output);
return output;
}
}
See the Java SDK workflow activity example in context.
Write the workflow
Next, register and call the activites in a workflow.
The hello_world_wf
function is derived from a class called DaprWorkflowContext
with input and output parameter types. It also includes a yield
statement that does the heavy lifting of the workflow and calls the workflow activities.
def hello_world_wf(ctx: DaprWorkflowContext, input):
print(f'{input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.wait_for_external_event("event1")
yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
See the hello_world_wf workflow in context.
The OrderProcessingWorkflow
class is derived from a base class called Workflow
with input and output parameter types. It also includes a RunAsync
method that does the heavy lifting of the workflow and calls the workflow activities.
class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
//...
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));
//...
InventoryResult result = await context.CallActivityAsync<InventoryResult>(
nameof(ReserveInventoryActivity),
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
//...
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} processed successfully!"));
// End the workflow with a success result
return new OrderResult(Processed: true);
}
}
See the full workflow example in OrderProcessingWorkflow.cs.
Next, register the workflow with the WorkflowRuntimeBuilder
and start the workflow runtime.
public class DemoWorkflowWorker {
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
builder.registerActivity(DemoWorkflowActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
System.exit(0);
}
}
See the Java SDK workflow in context.
Write the application
Finally, compose the application using the workflow.
In the following example, for a basic Python hello world application using the Python SDK, your project code would include:
- A Python package called
DaprClient
to receive the Python SDK capabilities. - A builder with extensions called:
WorkflowRuntime
: Allows you to register workflows and workflow activitiesDaprWorkflowContext
: Allows you to create workflowsWorkflowActivityContext
: Allows you to create workflow activities
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient
# ...
def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
# Start workflow
print("==========Start Counter Increase as per Input:==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# ...
# Pause workflow
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
# Resume workflow
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
sleep(1)
# Raise workflow
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
sleep(5)
# Purge workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")
# Kick off another workflow for termination purposes
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# Terminate workflow
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
# Purge workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")
workflowRuntime.shutdown()
if __name__ == '__main__':
main()
In the following Program.cs example, for a basic ASP.NET order processing application using the .NET SDK, your project code would include:
- A NuGet package called
Dapr.Workflow
to receive the .NET SDK capabilities - A builder with an extension method called
AddDaprWorkflow
- This will allow you to register workflows and workflow activities (tasks that workflows can schedule)
- HTTP API calls
- One for submitting a new order
- One for checking the status of an existing order
using Dapr.Workflow;
//...
// Dapr Workflows are registered as part of the service configuration
builder.Services.AddDaprWorkflow(options =>
{
// Note that it's also possible to register a lambda function as the workflow
// or activity implementation instead of a class.
options.RegisterWorkflow<OrderProcessingWorkflow>();
// These are the activities that get invoked by the workflow(s).
options.RegisterActivity<NotifyActivity>();
options.RegisterActivity<ReserveInventoryActivity>();
options.RegisterActivity<ProcessPaymentActivity>();
});
WebApplication app = builder.Build();
// POST starts new order workflow instance
app.MapPost("/orders", async (DaprWorkflowClient client, [FromBody] OrderPayload orderInfo) =>
{
if (orderInfo?.Name == null)
{
return Results.BadRequest(new
{
message = "Order data was missing from the request",
example = new OrderPayload("Paperclips", 99.95),
});
}
//...
});
// GET fetches state for order workflow to report status
app.MapGet("/orders/{orderId}", async (string orderId, DaprWorkflowClient client) =>
{
WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
if (!state.Exists)
{
return Results.NotFound($"No order with ID = '{orderId}' was found.");
}
var httpResponsePayload = new
{
details = state.ReadInputAs<OrderPayload>(),
status = state.RuntimeStatus.ToString(),
result = state.ReadOutputAs<OrderResult>(),
};
//...
}).WithName("GetOrderInfoEndpoint");
app.Run();
As in the following example, a hello-world application using the Java SDK and Dapr Workflow would include:
- A Java package called
io.dapr.workflows.client
to receive the Java SDK client capabilities. - An import of
io.dapr.workflows.Workflow
- The
DemoWorkflow
class which extendsWorkflow
- Creating the workflow with input and output.
- API calls. In the example below, these calls start and call the workflow activities.
package io.dapr.examples.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
/**
* Implementation of the DemoWorkflow for the server side.
*/
public class DemoWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
// ...
ctx.getLogger().info("Calling Activity...");
var input = new DemoActivityInput("Hello Activity!");
var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
// ...
};
}
}
See the full Java SDK workflow example in context.
Important
Because of how replay-based workflows execute, you’ll write logic that does things like I/O and interacting with systems inside activities. Meanwhile, the workflow method is just for orchestrating those activities.
Next steps
Now that you’ve authored a workflow, learn how to manage it.
Related links
- Workflow overview
- Workflow API reference
- Try out the full SDK examples:
Last modified October 12, 2023: Update config.toml (#3826) (0ffc2e7)