Quickstart: Workflow

Get started with the Dapr Workflow building block

Note

Dapr Workflow is currently in beta. See known limitations for 1.12.0.

Let’s take a look at the Dapr Workflow building block. In this Quickstart, you’ll create a simple console application to demonstrate Dapr’s workflow programming model and the workflow management APIs.

In this guide, you’ll:

  • Run the order-processor application.
  • Start the workflow and watch the workflow activites/tasks execute.
  • Review the workflow logic and the workflow activities and how they’re represented in the code.

Workflow - 图1

The order-processor console app starts and manages the order_processing_workflow, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks:

  • notify_activity: Utilizes a logger to print out messages throughout the workflow. These messages notify you when:
    • You have insufficient inventory
    • Your payment couldn’t be processed, etc.
  • process_payment_activity: Processes and authorizes the payment.
  • verify_inventory_activity: Checks the state store to ensure there is enough inventory present for purchase.
  • update_inventory_activity: Removes the requested items from the state store and updates the store with the new remaining inventory value.
  • request_approval_activity: Seeks approval from the manager if payment is greater than 50,000 USD.

Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

  1. git clone https://github.com/dapr/quickstarts.git

In a new terminal window, navigate to the order-processor directory:

  1. cd workflows/python/sdk/order-processor

Install the Dapr Python SDK package:

  1. pip3 install -r requirements.txt

Step 3: Run the order processor app

In the terminal, start the order processor app alongside a Dapr sidecar:

  1. dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py

Note: Since Python3.exe is not defined in Windows, you may need to use python app.py instead of python3 app.py.

This starts the order-processor app with unique workflow ID and runs the workflow activities.

Expected output:

  1. == APP == Starting order workflow, purchasing 10 of cars
  2. == APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items...
  3. == APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 !
  4. == APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars
  5. == APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase
  6. == APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars
  7. == APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval
  8. == APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved!
  9. == APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD
  10. == APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully
  11. == APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars
  12. == APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock
  13. == APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed!
  14. == APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED
  15. == APP == Workflow completed! Result: Completed
  16. == APP == Purchase of item is Completed

(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

  1. docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - 图2

What happened?

When you ran dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py:

  1. A unique order ID for the workflow is generated (in the above example, f4e1926e-3721-478d-be8a-f5bebd1995da) and the workflow is scheduled.
  2. The NotifyActivity workflow activity sends a notification saying an order for 10 cars has been received.
  3. The ReserveInventoryActivity workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock.
  4. Your workflow starts and notifies you of its status.
  5. The ProcessPaymentActivity workflow activity begins processing payment for order f4e1926e-3721-478d-be8a-f5bebd1995da and confirms if successful.
  6. The UpdateInventoryActivity workflow activity updates the inventory with the current available cars after the order has been processed.
  7. The NotifyActivity workflow activity sends a notification saying that order f4e1926e-3721-478d-be8a-f5bebd1995da has completed.
  8. The workflow terminates as completed.

order-processor/app.py

In the application’s program file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered
  1. class WorkflowConsoleApp:
  2. def main(self):
  3. # Register workflow and activities
  4. workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT)
  5. workflowRuntime.register_workflow(order_processing_workflow)
  6. workflowRuntime.register_activity(notify_activity)
  7. workflowRuntime.register_activity(requst_approval_activity)
  8. workflowRuntime.register_activity(verify_inventory_activity)
  9. workflowRuntime.register_activity(process_payment_activity)
  10. workflowRuntime.register_activity(update_inventory_activity)
  11. workflowRuntime.start()
  12. print("==========Begin the purchase of item:==========", flush=True)
  13. item_name = default_item_name
  14. order_quantity = 10
  15. total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
  16. order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)
  17. # Start Workflow
  18. print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
  19. start_resp = daprClient.start_workflow(workflow_component=workflow_component,
  20. workflow_name=workflow_name,
  21. input=order)
  22. _id = start_resp.instance_id
  23. def prompt_for_approval(daprClient: DaprClient):
  24. daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
  25. event_name="manager_approval", event_data={'approval': True})
  26. approval_seeked = False
  27. start_time = datetime.now()
  28. while True:
  29. time_delta = datetime.now() - start_time
  30. state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
  31. if not state:
  32. print("Workflow not found!") # not expected
  33. elif state.runtime_status == "Completed" or\
  34. state.runtime_status == "Failed" or\
  35. state.runtime_status == "Terminated":
  36. print(f'Workflow completed! Result: {state.runtime_status}', flush=True)
  37. break
  38. if time_delta.total_seconds() >= 10:
  39. state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
  40. if total_cost > 50000 and (
  41. state.runtime_status != "Completed" or
  42. state.runtime_status != "Failed" or
  43. state.runtime_status != "Terminated"
  44. ) and not approval_seeked:
  45. approval_seeked = True
  46. threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start()
  47. print("Purchase of item is ", state.runtime_status, flush=True)
  48. def restock_inventory(self, daprClient: DaprClient, baseInventory):
  49. for key, item in baseInventory.items():
  50. print(f'item: {item}')
  51. item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\
  52. "per_item_cost": {item.per_item_cost}}}'
  53. daprClient.save_state("statestore-actors", key, item_str)
  54. if __name__ == '__main__':
  55. app = WorkflowConsoleApp()
  56. app.main()

order-processor/workflow.py

In workflow.py, the workflow is defined as a class with all of its associated tasks (determined by workflow activities).

  1. def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload):
  2. """Defines the order processing workflow.
  3. When the order is received, the inventory is checked to see if there is enough inventory to
  4. fulfill the order. If there is enough inventory, the payment is processed and the inventory is
  5. updated. If there is not enough inventory, the order is rejected.
  6. If the total order is greater than $50,000, the order is sent to a manager for approval.
  7. """
  8. order_id = ctx.instance_id
  9. order_payload=json.loads(order_payload_str)
  10. yield ctx.call_activity(notify_activity,
  11. input=Notification(message=('Received order ' +order_id+ ' for '
  12. +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}'
  13. +' at $'+f'{order_payload["total_cost"]}' +' !')))
  14. result = yield ctx.call_activity(verify_inventory_activity,
  15. input=InventoryRequest(request_id=order_id,
  16. item_name=order_payload["item_name"],
  17. quantity=order_payload["quantity"]))
  18. if not result.success:
  19. yield ctx.call_activity(notify_activity,
  20. input=Notification(message='Insufficient inventory for '
  21. +f'{order_payload["item_name"]}'+'!'))
  22. return OrderResult(processed=False)
  23. if order_payload["total_cost"] > 50000:
  24. yield ctx.call_activity(requst_approval_activity, input=order_payload)
  25. approval_task = ctx.wait_for_external_event("manager_approval")
  26. timeout_event = ctx.create_timer(timedelta(seconds=200))
  27. winner = yield when_any([approval_task, timeout_event])
  28. if winner == timeout_event:
  29. yield ctx.call_activity(notify_activity,
  30. input=Notification(message='Payment for order '+order_id
  31. +' has been cancelled due to timeout!'))
  32. return OrderResult(processed=False)
  33. approval_result = yield approval_task
  34. if approval_result["approval"]:
  35. yield ctx.call_activity(notify_activity, input=Notification(
  36. message=f'Payment for order {order_id} has been approved!'))
  37. else:
  38. yield ctx.call_activity(notify_activity, input=Notification(
  39. message=f'Payment for order {order_id} has been rejected!'))
  40. return OrderResult(processed=False)
  41. yield ctx.call_activity(process_payment_activity, input=PaymentRequest(
  42. request_id=order_id, item_being_purchased=order_payload["item_name"],
  43. amount=order_payload["total_cost"], quantity=order_payload["quantity"]))
  44. try:
  45. yield ctx.call_activity(update_inventory_activity,
  46. input=PaymentRequest(request_id=order_id,
  47. item_being_purchased=order_payload["item_name"],
  48. amount=order_payload["total_cost"],
  49. quantity=order_payload["quantity"]))
  50. except Exception:
  51. yield ctx.call_activity(notify_activity,
  52. input=Notification(message=f'Order {order_id} Failed!'))
  53. return OrderResult(processed=False)
  54. yield ctx.call_activity(notify_activity, input=Notification(
  55. message=f'Order {order_id} has completed!'))
  56. return OrderResult(processed=True)

The order-processor console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of four workflow activities, or tasks:

  • NotifyActivity: Utilizes a logger to print out messages throughout the workflow
  • ReserveInventoryActivity: Checks the state store to ensure that there is enough inventory for the purchase
  • ProcessPaymentActivity: Processes and authorizes the payment
  • UpdateInventoryActivity: Removes the requested items from the state store and updates the store with the new remaining inventory value

Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

  1. git clone https://github.com/dapr/quickstarts.git

In a new terminal window, navigate to the order-processor directory:

  1. cd workflows/csharp/sdk/order-processor

Step 3: Run the order processor app

In the terminal, start the order processor app alongside a Dapr sidecar:

  1. dapr run --app-id order-processor dotnet run

This starts the order-processor app with unique workflow ID and runs the workflow activities.

Expected output:

  1. == APP == Starting workflow 6d2abcc9 purchasing 10 Cars
  2. == APP == info: Microsoft.DurableTask.Client.Grpc.GrpcDurableTaskClient[40]
  3. == APP == Scheduling new OrderProcessingWorkflow orchestration with instance ID '6d2abcc9' and 47 bytes of input data.
  4. == APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
  5. == APP == Received order 6d2abcc9 for 10 Cars at $15000
  6. == APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
  7. == APP == Reserving inventory for order 6d2abcc9 of 10 Cars
  8. == APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
  9. == APP == There are: 100, Cars available for purchase
  10. == APP == Your workflow has started. Here is the status of the workflow: Dapr.Workflow.WorkflowState
  11. == APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
  12. == APP == Processing payment: 6d2abcc9 for 10 Cars at $15000
  13. == APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
  14. == APP == Payment for request ID '6d2abcc9' processed successfully
  15. == APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
  16. == APP == Checking Inventory for: Order# 6d2abcc9 for 10 Cars
  17. == APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
  18. == APP == There are now: 90 Cars left in stock
  19. == APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
  20. == APP == Order 6d2abcc9 has completed!
  21. == APP == Workflow Status: Completed

(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

  1. docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - 图3

What happened?

When you ran dapr run --app-id order-processor dotnet run:

  1. A unique order ID for the workflow is generated (in the above example, 6d2abcc9) and the workflow is scheduled.
  2. The NotifyActivity workflow activity sends a notification saying an order for 10 cars has been received.
  3. The ReserveInventoryActivity workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock.
  4. Your workflow starts and notifies you of its status.
  5. The ProcessPaymentActivity workflow activity begins processing payment for order 6d2abcc9 and confirms if successful.
  6. The UpdateInventoryActivity workflow activity updates the inventory with the current available cars after the order has been processed.
  7. The NotifyActivity workflow activity sends a notification saying that order 6d2abcc9 has completed.
  8. The workflow terminates as completed.

order-processor/Program.cs

In the application’s program file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered
  1. using Dapr.Client;
  2. using Dapr.Workflow;
  3. //...
  4. {
  5. services.AddDaprWorkflow(options =>
  6. {
  7. // Note that it's also possible to register a lambda function as the workflow
  8. // or activity implementation instead of a class.
  9. options.RegisterWorkflow<OrderProcessingWorkflow>();
  10. // These are the activities that get invoked by the workflow(s).
  11. options.RegisterActivity<NotifyActivity>();
  12. options.RegisterActivity<ReserveInventoryActivity>();
  13. options.RegisterActivity<ProcessPaymentActivity>();
  14. options.RegisterActivity<UpdateInventoryActivity>();
  15. });
  16. };
  17. //...
  18. // Generate a unique ID for the workflow
  19. string orderId = Guid.NewGuid().ToString()[..8];
  20. string itemToPurchase = "Cars";
  21. int ammountToPurchase = 10;
  22. // Construct the order
  23. OrderPayload orderInfo = new OrderPayload(itemToPurchase, 15000, ammountToPurchase);
  24. // Start the workflow
  25. Console.WriteLine("Starting workflow {0} purchasing {1} {2}", orderId, ammountToPurchase, itemToPurchase);
  26. await daprClient.StartWorkflowAsync(
  27. workflowComponent: DaprWorkflowComponent,
  28. workflowName: nameof(OrderProcessingWorkflow),
  29. input: orderInfo,
  30. instanceId: orderId);
  31. // Wait for the workflow to start and confirm the input
  32. GetWorkflowResponse state = await daprClient.WaitForWorkflowStartAsync(
  33. instanceId: orderId,
  34. workflowComponent: DaprWorkflowComponent);
  35. Console.WriteLine("Your workflow has started. Here is the status of the workflow: {0}", state.RuntimeStatus);
  36. // Wait for the workflow to complete
  37. state = await daprClient.WaitForWorkflowCompletionAsync(
  38. instanceId: orderId,
  39. workflowComponent: DaprWorkflowComponent);
  40. Console.WriteLine("Workflow Status: {0}", state.RuntimeStatus);

order-processor/Workflows/OrderProcessingWorkflow.cs

In OrderProcessingWorkflow.cs, the workflow is defined as a class with all of its associated tasks (determined by workflow activities).

  1. using Dapr.Workflow;
  2. //...
  3. class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
  4. {
  5. public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
  6. {
  7. string orderId = context.InstanceId;
  8. // Notify the user that an order has come through
  9. await context.CallActivityAsync(
  10. nameof(NotifyActivity),
  11. new Notification($"Received order {orderId} for {order.Quantity} {order.Name} at ${order.TotalCost}"));
  12. string requestId = context.InstanceId;
  13. // Determine if there is enough of the item available for purchase by checking the inventory
  14. InventoryResult result = await context.CallActivityAsync<InventoryResult>(
  15. nameof(ReserveInventoryActivity),
  16. new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
  17. // If there is insufficient inventory, fail and let the user know
  18. if (!result.Success)
  19. {
  20. // End the workflow here since we don't have sufficient inventory
  21. await context.CallActivityAsync(
  22. nameof(NotifyActivity),
  23. new Notification($"Insufficient inventory for {order.Name}"));
  24. return new OrderResult(Processed: false);
  25. }
  26. // There is enough inventory available so the user can purchase the item(s). Process their payment
  27. await context.CallActivityAsync(
  28. nameof(ProcessPaymentActivity),
  29. new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
  30. try
  31. {
  32. // There is enough inventory available so the user can purchase the item(s). Process their payment
  33. await context.CallActivityAsync(
  34. nameof(UpdateInventoryActivity),
  35. new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
  36. }
  37. catch (TaskFailedException)
  38. {
  39. // Let them know their payment was processed
  40. await context.CallActivityAsync(
  41. nameof(NotifyActivity),
  42. new Notification($"Order {orderId} Failed! You are now getting a refund"));
  43. return new OrderResult(Processed: false);
  44. }
  45. // Let them know their payment was processed
  46. await context.CallActivityAsync(
  47. nameof(NotifyActivity),
  48. new Notification($"Order {orderId} has completed!"));
  49. // End the workflow with a success result
  50. return new OrderResult(Processed: true);
  51. }
  52. }

order-processor/Activities directory

The Activities directory holds the four workflow activities used by the workflow, defined in the following files:

  • NotifyActivity.cs
  • ReserveInventoryActivity.cs
  • ProcessPaymentActivity.cs
  • UpdateInventoryActivity.cs

Watch the demo

Watch this video to walk through the Dapr Workflow .NET demo:

The order-processor console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of four workflow activities, or tasks:

  • NotifyActivity: Utilizes a logger to print out messages throughout the workflow
  • RequestApprovalActivity: Requests approval for processing payment
  • ReserveInventoryActivity: Checks the state store to ensure that there is enough inventory for the purchase
  • ProcessPaymentActivity: Processes and authorizes the payment
  • UpdateInventoryActivity: Removes the requested items from the state store and updates the store with the new remaining inventory value

Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

  1. git clone https://github.com/dapr/quickstarts.git

Navigate to the order-processor directory:

  1. cd workflows/java/sdk/order-processor

Install the dependencies:

  1. mvn clean install

Step 3: Run the order processor app

In the terminal, start the order processor app alongside a Dapr sidecar:

  1. dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp

This starts the order-processor app with unique workflow ID and runs the workflow activities.

Expected output:

  1. == APP == *** Welcome to the Dapr Workflow console app sample!
  2. == APP == *** Using this app, you can place orders that start workflows.
  3. == APP == Start workflow runtime
  4. == APP == Sep 20, 2023 3:23:05 PM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock
  5. == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
  6. == APP == ==========Begin the purchase of item:==========
  7. == APP == Starting order workflow, purchasing 10 of cars
  8. == APP == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: edceba90-9c45-4be8-ad40-60d16e060797
  9. == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.workflows.OrderProcessingWorkflow
  10. == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): edceba90-9c45-4be8-ad40-60d16e060797
  11. == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-09-20T19:23:09.755Z
  12. == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
  13. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
  14. == APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 started
  15. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserving inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars
  16. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - There are 100 cars available for purchase
  17. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserved inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars
  18. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
  19. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
  20. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Processing payment: edceba90-9c45-4be8-ad40-60d16e060797 for 10 cars at $150000
  21. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Payment for request ID 'edceba90-9c45-4be8-ad40-60d16e060797' processed successfully
  22. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updating inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars
  23. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updated inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797': there are now 90 cars left in stock
  24. == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Order completed! : edceba90-9c45-4be8-ad40-60d16e060797
  25. == APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 completed, out is: {"processed":true}

(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

  1. docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - 图4

What happened?

When you ran dapr run:

  1. A unique order ID for the workflow is generated (in the above example, edceba90-9c45-4be8-ad40-60d16e060797) and the workflow is scheduled.
  2. The NotifyActivity workflow activity sends a notification saying an order for 10 cars has been received.
  3. The ReserveInventoryActivity workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock.
  4. Once approved, your workflow starts and notifies you of its status.
  5. The ProcessPaymentActivity workflow activity begins processing payment for order edceba90-9c45-4be8-ad40-60d16e060797 and confirms if successful.
  6. The UpdateInventoryActivity workflow activity updates the inventory with the current available cars after the order has been processed.
  7. The NotifyActivity workflow activity sends a notification saying that order edceba90-9c45-4be8-ad40-60d16e060797 has completed.
  8. The workflow terminates as completed.

order-processor/WorkflowConsoleApp.java

In the application’s program file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered
  1. package io.dapr.quickstarts.workflows;
  2. import io.dapr.client.DaprClient;
  3. import io.dapr.client.DaprClientBuilder;
  4. import io.dapr.workflows.client.DaprWorkflowClient;
  5. public class WorkflowConsoleApp {
  6. private static final String STATE_STORE_NAME = "statestore-actors";
  7. // ...
  8. public static void main(String[] args) throws Exception {
  9. System.out.println("*** Welcome to the Dapr Workflow console app sample!");
  10. System.out.println("*** Using this app, you can place orders that start workflows.");
  11. // Wait for the sidecar to become available
  12. Thread.sleep(5 * 1000);
  13. // Register the OrderProcessingWorkflow and its activities with the builder.
  14. WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
  15. builder.registerActivity(NotifyActivity.class);
  16. builder.registerActivity(ProcessPaymentActivity.class);
  17. builder.registerActivity(RequestApprovalActivity.class);
  18. builder.registerActivity(ReserveInventoryActivity.class);
  19. builder.registerActivity(UpdateInventoryActivity.class);
  20. // Build the workflow runtime
  21. try (WorkflowRuntime runtime = builder.build()) {
  22. System.out.println("Start workflow runtime");
  23. runtime.start(false);
  24. }
  25. InventoryItem inventory = prepareInventoryAndOrder();
  26. DaprWorkflowClient workflowClient = new DaprWorkflowClient();
  27. try (workflowClient) {
  28. executeWorkflow(workflowClient, inventory);
  29. }
  30. }
  31. // Start the workflow runtime, pulling and executing tasks
  32. private static void executeWorkflow(DaprWorkflowClient workflowClient, InventoryItem inventory) {
  33. System.out.println("==========Begin the purchase of item:==========");
  34. String itemName = inventory.getName();
  35. int orderQuantity = inventory.getQuantity();
  36. int totalcost = orderQuantity * inventory.getPerItemCost();
  37. OrderPayload order = new OrderPayload();
  38. order.setItemName(itemName);
  39. order.setQuantity(orderQuantity);
  40. order.setTotalCost(totalcost);
  41. System.out.println("Starting order workflow, purchasing " + orderQuantity + " of " + itemName);
  42. String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order);
  43. System.out.printf("scheduled new workflow instance of OrderProcessingWorkflow with instance ID: %s%n",
  44. instanceId);
  45. // Check workflow instance start status
  46. try {
  47. workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);
  48. System.out.printf("workflow instance %s started%n", instanceId);
  49. } catch (TimeoutException e) {
  50. System.out.printf("workflow instance %s did not start within 10 seconds%n", instanceId);
  51. return;
  52. }
  53. // Check workflow instance complete status
  54. try {
  55. WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId,
  56. Duration.ofSeconds(30),
  57. true);
  58. if (workflowStatus != null) {
  59. System.out.printf("workflow instance %s completed, out is: %s %n", instanceId,
  60. workflowStatus.getSerializedOutput());
  61. } else {
  62. System.out.printf("workflow instance %s not found%n", instanceId);
  63. }
  64. } catch (TimeoutException e) {
  65. System.out.printf("workflow instance %s did not complete within 30 seconds%n", instanceId);
  66. }
  67. }
  68. private static InventoryItem prepareInventoryAndOrder() {
  69. // prepare 100 cars in inventory
  70. InventoryItem inventory = new InventoryItem();
  71. inventory.setName("cars");
  72. inventory.setPerItemCost(15000);
  73. inventory.setQuantity(100);
  74. DaprClient daprClient = new DaprClientBuilder().build();
  75. restockInventory(daprClient, inventory);
  76. // prepare order for 10 cars
  77. InventoryItem order = new InventoryItem();
  78. order.setName("cars");
  79. order.setPerItemCost(15000);
  80. order.setQuantity(10);
  81. return order;
  82. }
  83. private static void restockInventory(DaprClient daprClient, InventoryItem inventory) {
  84. String key = inventory.getName();
  85. daprClient.saveState(STATE_STORE_NAME, key, inventory).block();
  86. }
  87. }

OrderProcessingWorkflow.java

In OrderProcessingWorkflow.java, the workflow is defined as a class with all of its associated tasks (determined by workflow activities).

  1. package io.dapr.quickstarts.workflows;
  2. import io.dapr.workflows.Workflow;
  3. public class OrderProcessingWorkflow extends Workflow {
  4. @Override
  5. public WorkflowStub create() {
  6. return ctx -> {
  7. Logger logger = ctx.getLogger();
  8. String orderId = ctx.getInstanceId();
  9. logger.info("Starting Workflow: " + ctx.getName());
  10. logger.info("Instance ID(order ID): " + orderId);
  11. logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());
  12. OrderPayload order = ctx.getInput(OrderPayload.class);
  13. logger.info("Received Order: " + order.toString());
  14. OrderResult orderResult = new OrderResult();
  15. orderResult.setProcessed(false);
  16. // Notify the user that an order has come through
  17. Notification notification = new Notification();
  18. notification.setMessage("Received Order: " + order.toString());
  19. ctx.callActivity(NotifyActivity.class.getName(), notification).await();
  20. // Determine if there is enough of the item available for purchase by checking
  21. // the inventory
  22. InventoryRequest inventoryRequest = new InventoryRequest();
  23. inventoryRequest.setRequestId(orderId);
  24. inventoryRequest.setItemName(order.getItemName());
  25. inventoryRequest.setQuantity(order.getQuantity());
  26. InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(),
  27. inventoryRequest, InventoryResult.class).await();
  28. // If there is insufficient inventory, fail and let the user know
  29. if (!inventoryResult.isSuccess()) {
  30. notification.setMessage("Insufficient inventory for order : " + order.getItemName());
  31. ctx.callActivity(NotifyActivity.class.getName(), notification).await();
  32. ctx.complete(orderResult);
  33. return;
  34. }
  35. // Require orders over a certain threshold to be approved
  36. if (order.getTotalCost() > 5000) {
  37. ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(),
  38. order, ApprovalResult.class).await();
  39. if (approvalResult != ApprovalResult.Approved) {
  40. notification.setMessage("Order " + order.getItemName() + " was not approved.");
  41. ctx.callActivity(NotifyActivity.class.getName(), notification).await();
  42. ctx.complete(orderResult);
  43. return;
  44. }
  45. }
  46. // There is enough inventory available so the user can purchase the item(s).
  47. // Process their payment
  48. PaymentRequest paymentRequest = new PaymentRequest();
  49. paymentRequest.setRequestId(orderId);
  50. paymentRequest.setItemBeingPurchased(order.getItemName());
  51. paymentRequest.setQuantity(order.getQuantity());
  52. paymentRequest.setAmount(order.getTotalCost());
  53. boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(),
  54. paymentRequest, boolean.class).await();
  55. if (!isOK) {
  56. notification.setMessage("Payment failed for order : " + orderId);
  57. ctx.callActivity(NotifyActivity.class.getName(), notification).await();
  58. ctx.complete(orderResult);
  59. return;
  60. }
  61. inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(),
  62. inventoryRequest, InventoryResult.class).await();
  63. if (!inventoryResult.isSuccess()) {
  64. // If there is an error updating the inventory, refund the user
  65. // paymentRequest.setAmount(-1 * paymentRequest.getAmount());
  66. // ctx.callActivity(ProcessPaymentActivity.class.getName(),
  67. // paymentRequest).await();
  68. // Let users know their payment processing failed
  69. notification.setMessage("Order failed to update inventory! : " + orderId);
  70. ctx.callActivity(NotifyActivity.class.getName(), notification).await();
  71. ctx.complete(orderResult);
  72. return;
  73. }
  74. // Let user know their order was processed
  75. notification.setMessage("Order completed! : " + orderId);
  76. ctx.callActivity(NotifyActivity.class.getName(), notification).await();
  77. // Complete the workflow with order result is processed
  78. orderResult.setProcessed(true);
  79. ctx.complete(orderResult);
  80. };
  81. }
  82. }

activities directory

The Activities directory holds the four workflow activities used by the workflow, defined in the following files:

Tell us what you think!

We’re continuously working to improve our Quickstart examples and value your feedback. Did you find this Quickstart helpful? Do you have suggestions for improvement?

Join the discussion in our discord channel.

Next steps

Explore Dapr tutorials >>

Last modified October 12, 2023: Update config.toml (#3826) (0ffc2e7)