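Quickstart: Workflow

Get started with the Dapr Workflow building block

Note

Dapr Workflow is currently in beta. See the known limitations for 1.13.0.

Let's take a look at the Dapr Workflow building block. In this Quickstart, you'll create a simple console application that demonstrates Dapr's workflow programming model and the workflow management API.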

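In this guide, you'll:

  • Run the order-processor application.
  • Start the workflow and watch the workflow activities/tasks execute.
  • Review the workflow logic and the workflow activities, and how they're represented in the code.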

Workflow - Figure 1

Select your preferred language-specific Dapr SDK before proceeding with the Quickstart.

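The order-processor console app starts and manages the order_processing_workflow, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks:

  • notify_activity: Uses a logger to print messages throughout the workflow. These messages notify you when:
    • You have insufficient inventory
    • Your payment couldn't be processed, and so on.
  • process_payment_activity: Processes and authorizes the payment.
  • verify_inventory_activity: Checks the state store to ensure there is enough inventory available for purchase.
  • update_inventory_activity: Removes the requested items from the state store and updates the store with the new remaining inventory value.
  • request_approval_activity: Requests approval from a manager if the payment is greater than 50,000 USD.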
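
To make the inventory and approval rules above concrete, here is a minimal standard-library sketch. It is not the quickstart's code: the in-memory `state_store` dictionary stands in for the Dapr state store, the function names are hypothetical, and the per-item cost of 15000 is an assumption inferred from the sample output (10 cars at $150000).

```python
from dataclasses import dataclass

# Hypothetical in-memory stand-in for the Dapr state store used by the quickstart.
state_store = {"cars": {"quantity": 100, "per_item_cost": 15000}}


@dataclass
class InventoryResult:
    success: bool
    remaining: int


def verify_inventory(item_name: str, quantity: int) -> InventoryResult:
    """Report whether enough stock exists; does not modify the store."""
    item = state_store.get(item_name)
    if item is None:
        return InventoryResult(False, 0)
    return InventoryResult(item["quantity"] >= quantity, item["quantity"])


def update_inventory(item_name: str, quantity: int) -> InventoryResult:
    """Subtract the purchased quantity and write the new value back."""
    item = state_store.get(item_name)
    if item is None or item["quantity"] < quantity:
        raise ValueError(f"Insufficient inventory for {item_name}")
    item["quantity"] -= quantity
    return InventoryResult(True, item["quantity"])


def needs_approval(total_cost: int, threshold: int = 50000) -> bool:
    """Orders strictly above the threshold require manager approval."""
    return total_cost > threshold
```

With these assumptions, an order of 10 cars costs 150000, passes the inventory check, needs approval, and leaves 90 cars in the store after the update.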

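Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

    git clone https://github.com/dapr/quickstarts.git

In a new terminal window, navigate to the order-processor directory:

    cd workflows/python/sdk/order-processor

Install the Dapr Python SDK package:

    pip3 install -r requirements.txt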

Step 3: Run the order processor app

In the terminal, use Multi-App Run to start the order processor app alongside a Dapr sidecar:

    dapr run -f .

This starts the order-processor app with a unique workflow ID and runs the workflow activities.

Expected output:

    == APP == Starting order workflow, purchasing 10 of cars
    == APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items...
    == APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 !
    == APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars
    == APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase
    == APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars
    == APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval
    == APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved!
    == APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD
    == APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully
    == APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars
    == APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock
    == APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed!
    == APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED
    == APP == Workflow completed! Result: Completed
    == APP == Purchase of item is Completed
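
In the output above, the manager_approval event is raised shortly after approval is requested. The Python workflow shown later on this page waits for that event while a timer runs, and continues with whichever finishes first. The following is a minimal standard-library sketch of that race using asyncio. It is not the Dapr API: `wait_for_approval`, `demo`, and the short timeouts are illustrative assumptions.

```python
import asyncio


async def wait_for_approval(approval: asyncio.Event, timeout_seconds: float) -> str:
    """Return 'approved' if the event is set before the timer fires, else 'timeout'."""
    approval_task = asyncio.ensure_future(approval.wait())
    timer_task = asyncio.ensure_future(asyncio.sleep(timeout_seconds))
    done, pending = await asyncio.wait(
        {approval_task, timer_task}, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # the loser of the race is no longer needed
    return "approved" if approval_task in done else "timeout"


async def demo() -> tuple:
    approved_event = asyncio.Event()
    # Simulate a manager approving shortly after the request.
    asyncio.get_running_loop().call_later(0.01, approved_event.set)
    first = await wait_for_approval(approved_event, timeout_seconds=1.0)
    # Nobody sets this second event, so the timer wins.
    second = await wait_for_approval(asyncio.Event(), timeout_seconds=0.05)
    return first, second


results = asyncio.run(demo())
```

The first call models the approved path and the second models an approval that never arrives, which corresponds to the workflow's "cancelled due to timeout" branch.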

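(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

    docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - Figure 2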

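What happened?

When you ran dapr run -f .:

  1. A unique order ID for the workflow is generated (in the above example, f4e1926e-3721-478d-be8a-f5bebd1995da) and the workflow is scheduled.
  2. The NotifyActivity workflow activity sends a notification saying an order for 10 cars has been received.
  3. The VerifyInventoryActivity workflow activity checks the inventory data, determines whether the ordered item can be supplied, and responds with the number of cars in stock.
  4. Your workflow starts and notifies you of its status.
  5. The ProcessPaymentActivity workflow activity begins processing the payment for order f4e1926e-3721-478d-be8a-f5bebd1995da and confirms whether it succeeded.
  6. The UpdateInventoryActivity workflow activity updates the inventory with the cars currently available once the order has been processed.
  7. The NotifyActivity workflow activity sends a notification saying that order f4e1926e-3721-478d-be8a-f5bebd1995da has completed.
  8. The workflow terminates as completed.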

order-processor/app.py

In the application's program file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered

    import threading
    from datetime import datetime

    from dapr.clients import DaprClient
    from dapr.conf import settings
    from dapr.ext.workflow import WorkflowRuntime

    # Excerpt: daprClient, baseInventory, default_item_name, workflow_component,
    # workflow_name, OrderPayload, and the workflow and activity functions are
    # defined or imported elsewhere in the sample.

    class WorkflowConsoleApp:
        def main(self):
            # Register workflow and activities
            workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT)
            workflowRuntime.register_workflow(order_processing_workflow)
            workflowRuntime.register_activity(notify_activity)
            workflowRuntime.register_activity(requst_approval_activity)
            workflowRuntime.register_activity(verify_inventory_activity)
            workflowRuntime.register_activity(process_payment_activity)
            workflowRuntime.register_activity(update_inventory_activity)
            workflowRuntime.start()

            print("==========Begin the purchase of item:==========", flush=True)
            item_name = default_item_name
            order_quantity = 10

            total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
            order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)

            # Start Workflow
            print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
            start_resp = daprClient.start_workflow(workflow_component=workflow_component,
                                                   workflow_name=workflow_name,
                                                   input=order)
            _id = start_resp.instance_id

            def prompt_for_approval(daprClient: DaprClient):
                daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
                                                event_name="manager_approval", event_data={'approval': True})

            terminal_statuses = ("Completed", "Failed", "Terminated")
            approval_seeked = False
            start_time = datetime.now()
            while True:
                time_delta = datetime.now() - start_time
                state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
                if not state:
                    print("Workflow not found!")  # not expected
                elif state.runtime_status in terminal_statuses:
                    print(f'Workflow completed! Result: {state.runtime_status}', flush=True)
                    break
                if time_delta.total_seconds() >= 10:
                    state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
                    # Raise the approval event once, and only while the workflow is still running
                    if (total_cost > 50000 and not approval_seeked
                            and state.runtime_status not in terminal_statuses):
                        approval_seeked = True
                        # Pass the callable and its argument so it runs on the new thread
                        threading.Thread(target=prompt_for_approval, args=(daprClient,), daemon=True).start()

            print("Purchase of item is ", state.runtime_status, flush=True)

        def restock_inventory(self, daprClient: DaprClient, baseInventory):
            for key, item in baseInventory.items():
                print(f'item: {item}')
                item_str = (f'{{"name": "{item.item_name}", "quantity": {item.quantity}, '
                            f'"per_item_cost": {item.per_item_cost}}}')
                daprClient.save_state("statestore-actors", key, item_str)

    if __name__ == '__main__':
        app = WorkflowConsoleApp()
        app.main()
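
The status loop in app.py boils down to "poll until the workflow reaches a terminal status". A minimal sketch of that pattern follows, assuming a hypothetical `get_status` callable in place of the Dapr client call. The terminal status names come from the sample; the timeout and poll interval are illustrative additions.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"Completed", "Failed", "Terminated"}


def wait_for_terminal_status(get_status: Callable[[], str],
                             timeout_seconds: float = 30.0,
                             poll_interval: float = 0.01) -> str:
    """Poll get_status until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_interval)  # pause between polls instead of spinning
    raise TimeoutError("workflow did not reach a terminal status in time")


# Stub that reports "Running" twice, then "Completed".
_statuses = iter(["Running", "Running", "Completed"])
final_status = wait_for_terminal_status(lambda: next(_statuses))
print(f"Workflow completed! Result: {final_status}")
```

Sleeping between polls and enforcing a deadline are design choices of this sketch, intended to keep a stuck workflow from blocking the caller indefinitely.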

order-processor/workflow.py

In workflow.py, the workflow is defined as a function that contains all of its associated tasks (determined by workflow activities).

    import json
    from datetime import timedelta

    from dapr.ext.workflow import DaprWorkflowContext, when_any

    # Excerpt: the activity functions and the model classes (Notification,
    # InventoryRequest, PaymentRequest, OrderPayload, OrderResult) are defined
    # elsewhere in the sample.

    def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload):
        """Defines the order processing workflow.
        When the order is received, the inventory is checked to see if there is enough inventory to
        fulfill the order. If there is enough inventory, the payment is processed and the inventory is
        updated. If there is not enough inventory, the order is rejected.
        If the total order is greater than $50,000, the order is sent to a manager for approval.
        """
        order_id = ctx.instance_id
        order_payload = json.loads(order_payload_str)
        yield ctx.call_activity(notify_activity,
                                input=Notification(message=(
                                    f'Received order {order_id} for {order_payload["quantity"]} '
                                    f'{order_payload["item_name"]} at ${order_payload["total_cost"]} !')))
        result = yield ctx.call_activity(verify_inventory_activity,
                                         input=InventoryRequest(request_id=order_id,
                                                                item_name=order_payload["item_name"],
                                                                quantity=order_payload["quantity"]))
        if not result.success:
            yield ctx.call_activity(notify_activity,
                                    input=Notification(
                                        message=f'Insufficient inventory for {order_payload["item_name"]}!'))
            return OrderResult(processed=False)

        if order_payload["total_cost"] > 50000:
            yield ctx.call_activity(requst_approval_activity, input=order_payload)
            # Wait for the approval event, but give up after 200 seconds
            approval_task = ctx.wait_for_external_event("manager_approval")
            timeout_event = ctx.create_timer(timedelta(seconds=200))
            winner = yield when_any([approval_task, timeout_event])
            if winner == timeout_event:
                yield ctx.call_activity(notify_activity,
                                        input=Notification(
                                            message=f'Payment for order {order_id} has been cancelled due to timeout!'))
                return OrderResult(processed=False)
            approval_result = yield approval_task
            if approval_result["approval"]:
                yield ctx.call_activity(notify_activity, input=Notification(
                    message=f'Payment for order {order_id} has been approved!'))
            else:
                yield ctx.call_activity(notify_activity, input=Notification(
                    message=f'Payment for order {order_id} has been rejected!'))
                return OrderResult(processed=False)

        yield ctx.call_activity(process_payment_activity, input=PaymentRequest(
            request_id=order_id, item_being_purchased=order_payload["item_name"],
            amount=order_payload["total_cost"], quantity=order_payload["quantity"]))

        try:
            yield ctx.call_activity(update_inventory_activity,
                                    input=PaymentRequest(request_id=order_id,
                                                         item_being_purchased=order_payload["item_name"],
                                                         amount=order_payload["total_cost"],
                                                         quantity=order_payload["quantity"]))
        except Exception:
            yield ctx.call_activity(notify_activity,
                                    input=Notification(message=f'Order {order_id} Failed!'))
            return OrderResult(processed=False)

        yield ctx.call_activity(notify_activity, input=Notification(
            message=f'Order {order_id} has completed!'))
        return OrderResult(processed=True)
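
The workflow above is a Python generator: each `yield` hands a task to the runtime, and the runtime sends the result back in. The toy driver below illustrates that control flow with plain generators. It is a simplified model under stated assumptions, not the Dapr runtime: the `(name, input)` request tuples, the `run` function, and the activity names are all hypothetical.

```python
from typing import Any, Callable, Dict, Generator, List, Tuple

# Each yielded request is (activity_name, activity_input); names are illustrative.
Request = Tuple[str, Any]


def order_workflow(order: Dict[str, Any]) -> Generator[Request, Any, bool]:
    yield ("notify", f"Received order for {order['quantity']} {order['item_name']}")
    in_stock = yield ("verify_inventory", order)
    if not in_stock:
        yield ("notify", f"Insufficient inventory for {order['item_name']}!")
        return False
    yield ("process_payment", order)
    yield ("notify", "Order has completed!")
    return True


def run(workflow: Callable[..., Generator[Request, Any, bool]],
        order: Dict[str, Any],
        activities: Dict[str, Callable[[Any], Any]]) -> bool:
    """Drive the generator: execute each yielded activity and send its result back."""
    gen = workflow(order)
    try:
        name, arg = next(gen)
        while True:
            name, arg = gen.send(activities[name](arg))
    except StopIteration as stop:
        return stop.value  # the generator's return value


messages: List[str] = []
activities = {
    "notify": messages.append,
    "verify_inventory": lambda order: order["quantity"] <= 100,
    "process_payment": lambda order: True,
}
processed = run(order_workflow, {"item_name": "cars", "quantity": 10}, activities)
rejected = run(order_workflow, {"item_name": "cars", "quantity": 500}, activities)
```

Here the first order runs through every step, while the second stops after the inventory check, mirroring the early `return OrderResult(processed=False)` in the sample.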

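The order-processor console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of five workflow activities, or tasks:

  • notifyActivity: Uses a logger to print messages throughout the workflow. These messages notify the user when there is insufficient inventory, when a payment couldn't be processed, and so on.
  • reserveInventoryActivity: Checks the state store to ensure that there is enough inventory for the purchase.
  • requestApprovalActivity: Requests approval for orders over a certain threshold.
  • processPaymentActivity: Processes and authorizes the payment.
  • updateInventoryActivity: Updates the state store with the new remaining inventory value.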

Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

    git clone https://github.com/dapr/quickstarts.git

In a new terminal window, navigate to the order-processor directory:

    cd workflows/javascript/sdk/order-processor

Install the dependencies:

    cd ./javascript/sdk
    npm install
    npm run build
    cd ..

Step 3: Run the order processor app

In the terminal, use Multi-App Run to start the order processor app alongside a Dapr sidecar:

    dapr run -f .

This starts the order-processor app with a unique workflow ID and runs the workflow activities.

Expected output:

    == APP - workflowApp == == APP == Orchestration scheduled with ID: 0c332155-1e02-453a-a333-28cfc7777642
    == APP - workflowApp == == APP == Waiting 30 seconds for instance 0c332155-1e02-453a-a333-28cfc7777642 to complete...
    == APP - workflowApp == == APP == Received "Orchestrator Request" work item with instance id '0c332155-1e02-453a-a333-28cfc7777642'
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Rebuilding local state with 0 history event...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, EXECUTIONSTARTED=1]
    == APP - workflowApp == == APP == Processing order 0c332155-1e02-453a-a333-28cfc7777642...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Returning 1 action(s)
    == APP - workflowApp == == APP == Received "Activity Request" work item
    == APP - workflowApp == == APP == Received order 0c332155-1e02-453a-a333-28cfc7777642 for 10 item1 at a total cost of 100
    == APP - workflowApp == == APP == Activity notifyActivity completed with output undefined (0 chars)
    == APP - workflowApp == == APP == Received "Orchestrator Request" work item with instance id '0c332155-1e02-453a-a333-28cfc7777642'
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Rebuilding local state with 3 history event...
    == APP - workflowApp == == APP == Processing order 0c332155-1e02-453a-a333-28cfc7777642...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Returning 1 action(s)
    == APP - workflowApp == == APP == Received "Activity Request" work item
    == APP - workflowApp == == APP == Reserving inventory for 0c332155-1e02-453a-a333-28cfc7777642 of 10 item1
    == APP - workflowApp == == APP == 2024-02-16T03:15:59.498Z INFO [HTTPClient, HTTPClient] Sidecar Started
    == APP - workflowApp == == APP == There are 100 item1 in stock
    == APP - workflowApp == == APP == Activity reserveInventoryActivity completed with output {"success":true,"inventoryItem":{"perItemCost":100,"quantity":100,"itemName":"item1"}} (86 chars)
    == APP - workflowApp == == APP == Received "Orchestrator Request" work item with instance id '0c332155-1e02-453a-a333-28cfc7777642'
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Rebuilding local state with 6 history event...
    == APP - workflowApp == == APP == Processing order 0c332155-1e02-453a-a333-28cfc7777642...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Returning 1 action(s)
    == APP - workflowApp == == APP == Received "Activity Request" work item
    == APP - workflowApp == == APP == Processing payment for order item1
    == APP - workflowApp == == APP == Payment of 100 for 10 item1 processed successfully
    == APP - workflowApp == == APP == Activity processPaymentActivity completed with output true (4 chars)
    == APP - workflowApp == == APP == Received "Orchestrator Request" work item with instance id '0c332155-1e02-453a-a333-28cfc7777642'
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Rebuilding local state with 9 history event...
    == APP - workflowApp == == APP == Processing order 0c332155-1e02-453a-a333-28cfc7777642...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Returning 1 action(s)
    == APP - workflowApp == == APP == Received "Activity Request" work item
    == APP - workflowApp == == APP == Updating inventory for 0c332155-1e02-453a-a333-28cfc7777642 of 10 item1
    == APP - workflowApp == == APP == Inventory updated for 0c332155-1e02-453a-a333-28cfc7777642, there are now 90 item1 in stock
    == APP - workflowApp == == APP == Activity updateInventoryActivity completed with output {"success":true,"inventoryItem":{"perItemCost":100,"quantity":90,"itemName":"item1"}} (85 chars)
    == APP - workflowApp == == APP == Received "Orchestrator Request" work item with instance id '0c332155-1e02-453a-a333-28cfc7777642'
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Rebuilding local state with 12 history event...
    == APP - workflowApp == == APP == Processing order 0c332155-1e02-453a-a333-28cfc7777642...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Waiting for 1 task(s) and 0 event(s) to complete...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Returning 1 action(s)
    == APP - workflowApp == == APP == Received "Activity Request" work item
    == APP - workflowApp == == APP == order 0c332155-1e02-453a-a333-28cfc7777642 processed successfully!
    == APP - workflowApp == == APP == Activity notifyActivity completed with output undefined (0 chars)
    == APP - workflowApp == == APP == Received "Orchestrator Request" work item with instance id '0c332155-1e02-453a-a333-28cfc7777642'
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Rebuilding local state with 15 history event...
    == APP - workflowApp == == APP == Processing order 0c332155-1e02-453a-a333-28cfc7777642...
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
    == APP - workflowApp == == APP == Order 0c332155-1e02-453a-a333-28cfc7777642 processed successfully!
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Orchestration completed with status COMPLETED
    == APP - workflowApp == == APP == 0c332155-1e02-453a-a333-28cfc7777642: Returning 1 action(s)
    == APP - workflowApp == time="2024-02-15T21:15:59.5589687-06:00" level=info msg="0c332155-1e02-453a-a333-28cfc7777642: 'orderProcessingWorkflow' completed with a COMPLETED status." app_id=activity-sequence-workflow instance=kaibocai-devbox scope=wfengine.backend type=log ver=1.12.4
    == APP - workflowApp == == APP == Instance 0c332155-1e02-453a-a333-28cfc7777642 completed
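
The repeated "Rebuilding local state with N history event" and "Processing order ..." lines in the output above suggest that the workflow function is re-run from the beginning each time an activity completes, with earlier results read back from recorded history instead of being executed again. The toy sketch below illustrates that replay idea in plain Python. It is a simplified model, not how the Dapr SDK is implemented, and every name in it (`ReplayContext`, `run_to_completion`, the activity names) is hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple


class _Suspend(Exception):
    """Raised when the orchestrator needs a result that is not yet in history."""

    def __init__(self, name: str, arg: Any):
        super().__init__(name)
        self.name, self.arg = name, arg


class ReplayContext:
    def __init__(self, history: List[Any]):
        self._history = history
        self._position = 0

    def call_activity(self, name: str, arg: Any) -> Any:
        if self._position < len(self._history):
            result = self._history[self._position]  # replayed, not re-executed
            self._position += 1
            return result
        raise _Suspend(name, arg)  # first time we reach this step


def orchestrator(ctx: ReplayContext, quantity: int) -> str:
    stock = ctx.call_activity("reserve", quantity)
    ctx.call_activity("pay", quantity * 10)
    remaining = ctx.call_activity("update", stock - quantity)
    return f"{remaining} left in stock"


def run_to_completion(activities: Dict[str, Callable[[Any], Any]],
                      quantity: int) -> Tuple[str, int]:
    history: List[Any] = []
    replays = 0
    while True:
        replays += 1
        try:
            return orchestrator(ReplayContext(history), quantity), replays
        except _Suspend as pending:
            # Execute the pending activity once and record its result.
            history.append(activities[pending.name](pending.arg))


calls: List[str] = []


def tracked(name: str, fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
    def activity(arg: Any) -> Any:
        calls.append(name)  # record real executions only
        return fn(arg)
    return activity


output, replays = run_to_completion({
    "reserve": tracked("reserve", lambda q: 100),
    "pay": tracked("pay", lambda amount: True),
    "update": tracked("update", lambda remaining: remaining),
}, 10)
```

In this model the orchestrator body runs four times, yet each activity executes only once. This is also why workflow code is generally expected to be deterministic: a replay has to reach the same steps in the same order.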

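(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

    docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - Figure 3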

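What happened?

When you ran dapr run -f .:

  1. A unique order ID for the workflow is generated (in the above example, 0c332155-1e02-453a-a333-28cfc7777642) and the workflow is scheduled.
  2. The notifyActivity workflow activity sends a notification saying an order for 10 units of item1 has been received.
  3. The reserveInventoryActivity workflow activity checks the inventory data, determines whether the ordered item can be supplied, and responds with the number of units in stock.
  4. Your workflow starts and notifies you of its status.
  5. The processPaymentActivity workflow activity begins processing the payment for order 0c332155-1e02-453a-a333-28cfc7777642 and confirms whether it succeeded.
  6. The updateInventoryActivity workflow activity updates the inventory with the units currently available once the order has been processed.
  7. The notifyActivity workflow activity sends a notification saying that order 0c332155-1e02-453a-a333-28cfc7777642 has completed.
  8. The workflow terminates as completed.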

order-processor/workflowApp.ts

In the application file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered

    import { DaprWorkflowClient, WorkflowRuntime, DaprClient } from "@dapr/dapr-dev";
    import { InventoryItem, OrderPayload } from "./model";
    import { notifyActivity, orderProcessingWorkflow, processPaymentActivity, requestApprovalActivity, reserveInventoryActivity, updateInventoryActivity } from "./orderProcessingWorkflow";

    async function start() {
      // Update the gRPC client and worker to use a local address and port
      const workflowClient = new DaprWorkflowClient();
      const workflowWorker = new WorkflowRuntime();

      const daprClient = new DaprClient();
      const storeName = "statestore";

      // Seed the state store with the initial inventory
      const inventory = new InventoryItem("item1", 100, 100);
      const key = inventory.itemName;

      await daprClient.state.save(storeName, [
        {
          key: key,
          value: inventory,
        }
      ]);

      const order = new OrderPayload("item1", 100, 10);

      workflowWorker
        .registerWorkflow(orderProcessingWorkflow)
        .registerActivity(notifyActivity)
        .registerActivity(reserveInventoryActivity)
        .registerActivity(requestApprovalActivity)
        .registerActivity(processPaymentActivity)
        .registerActivity(updateInventoryActivity);

      // Wrap the worker startup in a try-catch block to handle any errors during startup
      try {
        await workflowWorker.start();
        console.log("Workflow runtime started successfully");
      } catch (error) {
        console.error("Error starting workflow runtime:", error);
      }

      // Schedule a new orchestration
      try {
        const id = await workflowClient.scheduleNewWorkflow(orderProcessingWorkflow, order);
        console.log(`Orchestration scheduled with ID: ${id}`);

        // Wait for orchestration completion
        const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);

        console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
      } catch (error) {
        console.error("Error scheduling or waiting for orchestration:", error);
        throw error;
      }

      await workflowWorker.stop();
      await workflowClient.stop();
    }

    start().catch((e) => {
      console.error(e);
      process.exit(1);
    });

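The order-processor console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of four workflow activities, or tasks:

  • NotifyActivity: Uses a logger to print messages throughout the workflow
  • ReserveInventoryActivity: Checks the state store to ensure that there is enough inventory for the purchase
  • ProcessPaymentActivity: Processes and authorizes the payment
  • UpdateInventoryActivity: Removes the requested items from the state store and updates the store with the new remaining inventory value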

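Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

    git clone https://github.com/dapr/quickstarts.git

In a new terminal window, navigate to the order-processor directory:

    cd workflows/csharp/sdk/order-processor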

Step 3: Run the order processor app

In the terminal, use Multi-App Run to start the order processor app alongside a Dapr sidecar:

    dapr run -f .

This starts the order-processor app with a unique workflow ID and runs the workflow activities.

Expected output:

    == APP == Starting workflow 6d2abcc9 purchasing 10 Cars
    == APP == info: Microsoft.DurableTask.Client.Grpc.GrpcDurableTaskClient[40]
    == APP == Scheduling new OrderProcessingWorkflow orchestration with instance ID '6d2abcc9' and 47 bytes of input data.
    == APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
    == APP == Received order 6d2abcc9 for 10 Cars at $15000
    == APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
    == APP == Reserving inventory for order 6d2abcc9 of 10 Cars
    == APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
    == APP == There are: 100, Cars available for purchase
    == APP == Your workflow has started. Here is the status of the workflow: Dapr.Workflow.WorkflowState
    == APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
    == APP == Processing payment: 6d2abcc9 for 10 Cars at $15000
    == APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
    == APP == Payment for request ID '6d2abcc9' processed successfully
    == APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
    == APP == Checking Inventory for: Order# 6d2abcc9 for 10 Cars
    == APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
    == APP == There are now: 90 Cars left in stock
    == APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
    == APP == Order 6d2abcc9 has completed!
    == APP == Workflow Status: Completed

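(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

    docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - Figure 4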

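What happened?

When you ran dapr run -f .:

  1. A unique order ID for the workflow is generated (in the above example, 6d2abcc9) and the workflow is scheduled.
  2. The NotifyActivity workflow activity sends a notification saying an order for 10 cars has been received.
  3. The ReserveInventoryActivity workflow activity checks the inventory data, determines whether the ordered item can be supplied, and responds with the number of cars in stock.
  4. Your workflow starts and notifies you of its status.
  5. The ProcessPaymentActivity workflow activity begins processing the payment for order 6d2abcc9 and confirms whether it succeeded.
  6. The UpdateInventoryActivity workflow activity updates the inventory with the cars currently available once the order has been processed.
  7. The NotifyActivity workflow activity sends a notification saying that order 6d2abcc9 has completed.
  8. The workflow terminates as completed.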

order-processor/Program.cs

In the application's program file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered

    using Dapr.Client;
    using Dapr.Workflow;
    //...

    {
        services.AddDaprWorkflow(options =>
        {
            // Note that it's also possible to register a lambda function as the workflow
            // or activity implementation instead of a class.
            options.RegisterWorkflow<OrderProcessingWorkflow>();

            // These are the activities that get invoked by the workflow(s).
            options.RegisterActivity<NotifyActivity>();
            options.RegisterActivity<ReserveInventoryActivity>();
            options.RegisterActivity<ProcessPaymentActivity>();
            options.RegisterActivity<UpdateInventoryActivity>();
        });
    };

    //...

    // Generate a unique ID for the workflow
    string orderId = Guid.NewGuid().ToString()[..8];
    string itemToPurchase = "Cars";
    int amountToPurchase = 10;

    // Construct the order
    OrderPayload orderInfo = new OrderPayload(itemToPurchase, 15000, amountToPurchase);

    // Start the workflow
    Console.WriteLine("Starting workflow {0} purchasing {1} {2}", orderId, amountToPurchase, itemToPurchase);

    await daprClient.StartWorkflowAsync(
        workflowComponent: DaprWorkflowComponent,
        workflowName: nameof(OrderProcessingWorkflow),
        input: orderInfo,
        instanceId: orderId);

    // Wait for the workflow to start and confirm the input
    GetWorkflowResponse state = await daprClient.WaitForWorkflowStartAsync(
        instanceId: orderId,
        workflowComponent: DaprWorkflowComponent);

    Console.WriteLine("Your workflow has started. Here is the status of the workflow: {0}", state.RuntimeStatus);

    // Wait for the workflow to complete
    state = await daprClient.WaitForWorkflowCompletionAsync(
        instanceId: orderId,
        workflowComponent: DaprWorkflowComponent);

    Console.WriteLine("Workflow Status: {0}", state.RuntimeStatus);
order-processor/Workflows/OrderProcessingWorkflow.cs

In OrderProcessingWorkflow.cs, the workflow is defined as a class that contains all of its associated tasks (determined by workflow activities).

    using Dapr.Workflow;
    //...

    class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
    {
        public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
        {
            string orderId = context.InstanceId;

            // Notify the user that an order has come through
            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Received order {orderId} for {order.Quantity} {order.Name} at ${order.TotalCost}"));

            string requestId = context.InstanceId;

            // Determine if there is enough of the item available for purchase by checking the inventory
            InventoryResult result = await context.CallActivityAsync<InventoryResult>(
                nameof(ReserveInventoryActivity),
                new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));

            // If there is insufficient inventory, fail and let the user know
            if (!result.Success)
            {
                // End the workflow here since we don't have sufficient inventory
                await context.CallActivityAsync(
                    nameof(NotifyActivity),
                    new Notification($"Insufficient inventory for {order.Name}"));
                return new OrderResult(Processed: false);
            }

            // There is enough inventory available so the user can purchase the item(s). Process their payment
            await context.CallActivityAsync(
                nameof(ProcessPaymentActivity),
                new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));

            try
            {
                // The payment has been processed. Update the inventory
                await context.CallActivityAsync(
                    nameof(UpdateInventoryActivity),
                    new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
            }
            catch (TaskFailedException)
            {
                // The inventory update failed. Let them know the order failed
                await context.CallActivityAsync(
                    nameof(NotifyActivity),
                    new Notification($"Order {orderId} Failed! You are now getting a refund"));
                return new OrderResult(Processed: false);
            }

            // Let them know their order has completed
            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Order {orderId} has completed!"));

            // End the workflow with a success result
            return new OrderResult(Processed: true);
        }
    }
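
The try/catch around UpdateInventoryActivity is a small failure-handling step: the payment has already gone through, so a failed inventory update results in a failure notification and an unprocessed order. The sketch below restates that control flow in Python with plain callables. It is illustrative only: `process_order` and its parameters are hypothetical, and `RuntimeError` stands in for the SDK's task failure exception.

```python
from typing import Callable, List


def process_order(pay: Callable[[], None],
                  update_inventory: Callable[[], None],
                  notify: Callable[[str], None],
                  order_id: str) -> bool:
    """Pay first, then update inventory; report failure if the update step raises."""
    pay()
    try:
        update_inventory()
    except RuntimeError:
        notify(f"Order {order_id} Failed! You are now getting a refund")
        return False
    notify(f"Order {order_id} has completed!")
    return True


def failing_update() -> None:
    raise RuntimeError("state store unavailable")  # simulated activity failure


sent: List[str] = []
ok = process_order(lambda: None, lambda: None, sent.append, "6d2abcc9")
failed = process_order(lambda: None, failing_update, sent.append, "6d2abcc9")
```

As in the sample, the failure path here only sends a notification. Actually issuing the refund would need an additional activity, which the excerpt above does not show.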

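order-processor/Activities directory

The Activities directory holds the four workflow activities used by the workflow, defined in the following files:

  • NotifyActivity.cs
  • ReserveInventoryActivity.cs
  • ProcessPaymentActivity.cs
  • UpdateInventoryActivity.cs

Watch the demo

Watch this video to walk through the Dapr Workflow .NET demo: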

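The order-processor console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of five workflow activities, or tasks:

  • NotifyActivity: Uses a logger to print messages throughout the workflow
  • RequestApprovalActivity: Requests approval for processing the payment
  • ReserveInventoryActivity: Checks the state store to ensure that there is enough inventory for the purchase
  • ProcessPaymentActivity: Processes and authorizes the payment
  • UpdateInventoryActivity: Removes the requested items from the state store and updates the store with the new remaining inventory value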

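Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

    git clone https://github.com/dapr/quickstarts.git

Navigate to the order-processor directory:

    cd workflows/java/sdk/order-processor

Install the dependencies:

    mvn clean install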

Step 3: Run the order processor app

In the terminal, use Multi-App Run to start the order processor app alongside a Dapr sidecar:

    dapr run -f .

This starts the order-processor app with a unique workflow ID and runs the workflow activities.

Expected output:

    == APP == *** Welcome to the Dapr Workflow console app sample!
    == APP == *** Using this app, you can place orders that start workflows.
    == APP == Start workflow runtime
    == APP == Sep 20, 2023 3:23:05 PM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock
    == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
    == APP == ==========Begin the purchase of item:==========
    == APP == Starting order workflow, purchasing 10 of cars
    == APP == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: edceba90-9c45-4be8-ad40-60d16e060797
    == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.workflows.OrderProcessingWorkflow
    == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): edceba90-9c45-4be8-ad40-60d16e060797
    == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-09-20T19:23:09.755Z
    == APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
    == APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 started
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserving inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - There are 100 cars available for purchase
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserved inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Processing payment: edceba90-9c45-4be8-ad40-60d16e060797 for 10 cars at $150000
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Payment for request ID 'edceba90-9c45-4be8-ad40-60d16e060797' processed successfully
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updating inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updated inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797': there are now 90 cars left in stock
    == APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Order completed! : edceba90-9c45-4be8-ad40-60d16e060797
    == APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 completed, out is: {"processed":true}

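(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

    docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - Figure 5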

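What happened?

When you ran dapr run -f .:

  1. A unique order ID for the workflow is generated (in the example above, edceba90-9c45-4be8-ad40-60d16e060797) and the workflow is scheduled.
  2. The NotifyActivity workflow activity sends a notification saying that an order for 10 cars has been received.
  3. The ReserveInventoryActivity workflow activity checks the inventory data, determines whether the ordered item can be supplied, and responds with the number of cars in stock.
  4. The RequestApprovalActivity workflow activity requests approval for the order; once it is approved, the workflow continues.
  5. The ProcessPaymentActivity workflow activity begins processing payment for order edceba90-9c45-4be8-ad40-60d16e060797 and confirms whether it succeeded.
  6. The UpdateInventoryActivity workflow activity updates the inventory with the currently available cars after the order has been processed.
  7. The NotifyActivity workflow activity sends a notification saying that order edceba90-9c45-4be8-ad40-60d16e060797 has completed.
  8. The workflow terminates as completed.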

order-processor/WorkflowConsoleApp.java

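In the application's program file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered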

    package io.dapr.quickstarts.workflows;

    import java.time.Duration;
    import java.util.concurrent.TimeoutException;

    import io.dapr.client.DaprClient;
    import io.dapr.client.DaprClientBuilder;
    import io.dapr.workflows.client.DaprWorkflowClient;
    import io.dapr.workflows.client.WorkflowInstanceStatus;
    import io.dapr.workflows.runtime.WorkflowRuntime;
    import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
    // ... (imports for the quickstart's own activity and model classes)

    public class WorkflowConsoleApp {

      private static final String STATE_STORE_NAME = "statestore-actors";

      // ...
      public static void main(String[] args) throws Exception {
        System.out.println("*** Welcome to the Dapr Workflow console app sample!");
        System.out.println("*** Using this app, you can place orders that start workflows.");
        // Wait for the sidecar to become available
        Thread.sleep(5 * 1000);

        // Register the OrderProcessingWorkflow and its activities with the builder.
        WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
        builder.registerActivity(NotifyActivity.class);
        builder.registerActivity(ProcessPaymentActivity.class);
        builder.registerActivity(RequestApprovalActivity.class);
        builder.registerActivity(ReserveInventoryActivity.class);
        builder.registerActivity(UpdateInventoryActivity.class);

        // Build and start the workflow runtime, pulling and executing tasks
        try (WorkflowRuntime runtime = builder.build()) {
          System.out.println("Start workflow runtime");
          runtime.start(false);
        }

        InventoryItem inventory = prepareInventoryAndOrder();

        DaprWorkflowClient workflowClient = new DaprWorkflowClient();
        try (workflowClient) {
          executeWorkflow(workflowClient, inventory);
        }
      }

      // Schedule the order workflow, then wait for it to start and to complete
      private static void executeWorkflow(DaprWorkflowClient workflowClient, InventoryItem inventory) {
        System.out.println("==========Begin the purchase of item:==========");
        String itemName = inventory.getName();
        int orderQuantity = inventory.getQuantity();
        int totalcost = orderQuantity * inventory.getPerItemCost();
        OrderPayload order = new OrderPayload();
        order.setItemName(itemName);
        order.setQuantity(orderQuantity);
        order.setTotalCost(totalcost);
        System.out.println("Starting order workflow, purchasing " + orderQuantity + " of " + itemName);

        String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order);
        System.out.printf("scheduled new workflow instance of OrderProcessingWorkflow with instance ID: %s%n",
            instanceId);

        // Check workflow instance start status
        try {
          workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);
          System.out.printf("workflow instance %s started%n", instanceId);
        } catch (TimeoutException e) {
          System.out.printf("workflow instance %s did not start within 10 seconds%n", instanceId);
          return;
        }

        // Check workflow instance complete status
        try {
          WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId,
              Duration.ofSeconds(30),
              true);
          if (workflowStatus != null) {
            System.out.printf("workflow instance %s completed, out is: %s %n", instanceId,
                workflowStatus.getSerializedOutput());
          } else {
            System.out.printf("workflow instance %s not found%n", instanceId);
          }
        } catch (TimeoutException e) {
          System.out.printf("workflow instance %s did not complete within 30 seconds%n", instanceId);
        }
      }

      private static InventoryItem prepareInventoryAndOrder() {
        // prepare 100 cars in inventory
        InventoryItem inventory = new InventoryItem();
        inventory.setName("cars");
        inventory.setPerItemCost(15000);
        inventory.setQuantity(100);
        DaprClient daprClient = new DaprClientBuilder().build();
        restockInventory(daprClient, inventory);

        // prepare order for 10 cars
        InventoryItem order = new InventoryItem();
        order.setName("cars");
        order.setPerItemCost(15000);
        order.setQuantity(10);
        return order;
      }

      // Save the inventory item to the state store, keyed by item name
      private static void restockInventory(DaprClient daprClient, InventoryItem inventory) {
        String key = inventory.getName();
        daprClient.saveState(STATE_STORE_NAME, key, inventory).block();
      }
    }

OrderProcessingWorkflow.java

In OrderProcessingWorkflow.java, the workflow is defined as a class that contains all of its associated tasks (implemented as workflow activities).

    package io.dapr.quickstarts.workflows;

    import org.slf4j.Logger;

    import io.dapr.workflows.Workflow;
    import io.dapr.workflows.WorkflowStub;
    // ... (imports for the quickstart's own activity and model classes)

    public class OrderProcessingWorkflow extends Workflow {

      @Override
      public WorkflowStub create() {
        return ctx -> {
          Logger logger = ctx.getLogger();
          String orderId = ctx.getInstanceId();
          logger.info("Starting Workflow: " + ctx.getName());
          logger.info("Instance ID(order ID): " + orderId);
          logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());

          OrderPayload order = ctx.getInput(OrderPayload.class);
          logger.info("Received Order: " + order.toString());
          OrderResult orderResult = new OrderResult();
          orderResult.setProcessed(false);

          // Notify the user that an order has come through
          Notification notification = new Notification();
          notification.setMessage("Received Order: " + order.toString());
          ctx.callActivity(NotifyActivity.class.getName(), notification).await();

          // Determine if there is enough of the item available for purchase by checking
          // the inventory
          InventoryRequest inventoryRequest = new InventoryRequest();
          inventoryRequest.setRequestId(orderId);
          inventoryRequest.setItemName(order.getItemName());
          inventoryRequest.setQuantity(order.getQuantity());
          InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(),
              inventoryRequest, InventoryResult.class).await();

          // If there is insufficient inventory, fail and let the user know
          if (!inventoryResult.isSuccess()) {
            notification.setMessage("Insufficient inventory for order : " + order.getItemName());
            ctx.callActivity(NotifyActivity.class.getName(), notification).await();
            ctx.complete(orderResult);
            return;
          }

          // Require orders over a certain threshold to be approved
          if (order.getTotalCost() > 5000) {
            ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(),
                order, ApprovalResult.class).await();
            if (approvalResult != ApprovalResult.Approved) {
              notification.setMessage("Order " + order.getItemName() + " was not approved.");
              ctx.callActivity(NotifyActivity.class.getName(), notification).await();
              ctx.complete(orderResult);
              return;
            }
          }

          // There is enough inventory available so the user can purchase the item(s).
          // Process their payment
          PaymentRequest paymentRequest = new PaymentRequest();
          paymentRequest.setRequestId(orderId);
          paymentRequest.setItemBeingPurchased(order.getItemName());
          paymentRequest.setQuantity(order.getQuantity());
          paymentRequest.setAmount(order.getTotalCost());
          boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(),
              paymentRequest, boolean.class).await();
          if (!isOK) {
            notification.setMessage("Payment failed for order : " + orderId);
            ctx.callActivity(NotifyActivity.class.getName(), notification).await();
            ctx.complete(orderResult);
            return;
          }

          inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(),
              inventoryRequest, InventoryResult.class).await();
          if (!inventoryResult.isSuccess()) {
            // If there is an error updating the inventory, refund the user
            // paymentRequest.setAmount(-1 * paymentRequest.getAmount());
            // ctx.callActivity(ProcessPaymentActivity.class.getName(),
            // paymentRequest).await();

            // Let the user know the inventory update failed
            notification.setMessage("Order failed to update inventory! : " + orderId);
            ctx.callActivity(NotifyActivity.class.getName(), notification).await();
            ctx.complete(orderResult);
            return;
          }

          // Let user know their order was processed
          notification.setMessage("Order completed! : " + orderId);
          ctx.callActivity(NotifyActivity.class.getName(), notification).await();

          // Complete the workflow, marking the order result as processed
          orderResult.setProcessed(true);
          ctx.complete(orderResult);
        };
      }
    }

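The Activities directory

The Activities directory holds the five workflow activities used by the workflow, defined in the io.dapr.quickstarts.workflows.activities package: NotifyActivity, ProcessPaymentActivity, RequestApprovalActivity, ReserveInventoryActivity, and UpdateInventoryActivity.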

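The order-processor console app starts and manages the OrderProcessingWorkflow workflow, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks:

  • NotifyActivity: Uses a logger to print messages throughout the workflow. These messages notify you when:
    • You have insufficient inventory
    • Your payment couldn't be processed, and so on.
  • ProcessPaymentActivity: Processes and authorizes the payment.
  • VerifyInventoryActivity: Checks the state store to ensure there is enough inventory available for purchase.
  • UpdateInventoryActivity: Removes the requested items from the state store and updates the store with the new remaining inventory value.
  • RequestApprovalActivity: Requests approval from a manager if the payment is greater than 50,000 USD.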
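
The approval rule in the last bullet comes down to a single comparison against the order total. The following is a minimal sketch rather than the quickstart's own code: approvalThreshold and needsApproval are hypothetical names, and the price of 15000 USD per car is taken from the sample order used in this quickstart.

```go
package main

import "fmt"

// approvalThreshold is the payment amount (in USD) above which a manager
// must approve the order. The name is hypothetical; the value comes from
// the RequestApprovalActivity description.
const approvalThreshold = 50000

// needsApproval reports whether an order total requires manager approval.
func needsApproval(totalCost int) bool {
	return totalCost > approvalThreshold
}

func main() {
	perItemCost, quantity := 15000, 10 // 10 cars at 15000 USD each
	totalCost := perItemCost * quantity
	fmt.Printf("total=%d needsApproval=%t\n", totalCost, needsApproval(totalCost))
}
```

With the sample order the total is 150000 USD, so the approval step runs; an order of exactly 50,000 USD would not trigger it because the comparison is strict.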

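Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

    git clone https://github.com/dapr/quickstarts.git

In a new terminal window, navigate to the order-processor directory:

    cd workflows/go/sdk/order-processor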

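Step 3: Run the order processor app

In the terminal, use Multi-App Run to start the order processor app alongside a Dapr sidecar:

    dapr run -f .

This starts the order-processor app with a unique workflow ID and runs the workflow activities.

Expected output: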

    == APP - order-processor == *** Welcome to the Dapr Workflow console app sample!
    == APP - order-processor == *** Using this app, you can place orders that start workflows.
    == APP - order-processor == dapr client initializing for: 127.0.0.1:50056
    == APP - order-processor == adding base stock item: paperclip
    == APP - order-processor == 2024/02/01 12:59:52 work item listener started
    == APP - order-processor == INFO: 2024/02/01 12:59:52 starting background processor
    == APP - order-processor == adding base stock item: cars
    == APP - order-processor == adding base stock item: computers
    == APP - order-processor == ==========Begin the purchase of item:==========
    == APP - order-processor == NotifyActivity: Received order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 cars - $150000
    == APP - order-processor == VerifyInventoryActivity: Verifying inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 of 10 cars
    == APP - order-processor == VerifyInventoryActivity: There are 100 cars available for purchase
    == APP - order-processor == RequestApprovalActivity: Requesting approval for payment of 150000USD for 10 cars
    == APP - order-processor == NotifyActivity: Payment for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has been approved!
    == APP - order-processor == ProcessPaymentActivity: 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 - cars (150000USD)
    == APP - order-processor == UpdateInventoryActivity: Checking Inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 * cars
    == APP - order-processor == UpdateInventoryActivity: There are now 90 cars left in stock
    == APP - order-processor == NotifyActivity: Order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has completed!
    == APP - order-processor == Workflow completed - result: COMPLETED
    == APP - order-processor == Purchase of item is complete

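Stop the Dapr workflow with CTRL+C or with the following command:

    dapr stop -f .

(Optional) Step 4: View in Zipkin

Running dapr init launches the openzipkin/zipkin Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:

    docker run -d -p 9411:9411 openzipkin/zipkin

View the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

Workflow - Figure 6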

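What happened?

When you ran dapr run:

  1. A unique order ID for the workflow is generated (in the example above, 48ee83b7-5d80-48d5-97f9-6b372f5480a5) and the workflow is scheduled.
  2. The NotifyActivity workflow activity sends a notification saying that an order for 10 cars has been received.
  3. The VerifyInventoryActivity workflow activity checks the inventory data, determines whether the ordered item can be supplied, and responds with the number of cars in stock.
  4. Because the 150000 USD total exceeds the 50,000 USD threshold, the RequestApprovalActivity workflow activity requests approval; once the manager_approval event is raised, NotifyActivity reports that the payment has been approved.
  5. The ProcessPaymentActivity workflow activity begins processing payment for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 and confirms whether it succeeded.
  6. The UpdateInventoryActivity workflow activity updates the inventory with the currently available cars after the order has been processed.
  7. The NotifyActivity workflow activity sends a notification saying that order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has completed.
  8. The workflow terminates as completed.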
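
The sequence above can be sketched without Dapr at all, which may help when reading the real workflow code. The following is an illustrative simulation only: the order and store types, the function names, and the approve callback are hypothetical stand-ins for the quickstart's payload types, the Dapr state store, and the manager's decision, and each activity is reduced to a plain function call.

```go
package main

import (
	"errors"
	"fmt"
)

// order mirrors the fields mentioned in the walkthrough; it is a stand-in,
// not the quickstart's OrderPayload type.
type order struct {
	ID        string
	ItemName  string
	Quantity  int
	TotalCost int
}

// store is a hypothetical in-memory replacement for the Dapr state store.
type store map[string]int

var errInsufficient = errors.New("insufficient inventory")

// verifyInventory checks that enough stock exists without changing it.
func verifyInventory(s store, o order) error {
	if s[o.ItemName] < o.Quantity {
		return errInsufficient
	}
	return nil
}

// updateInventory removes the purchased quantity and returns what is left.
func updateInventory(s store, o order) int {
	s[o.ItemName] -= o.Quantity
	return s[o.ItemName]
}

// processOrder runs the steps in the order listed above. It returns the names
// of the steps that ran and whether the order was processed.
func processOrder(s store, o order, approve func(order) bool) ([]string, bool) {
	steps := []string{"notify: received", "verify inventory"}
	if err := verifyInventory(s, o); err != nil {
		return append(steps, "notify: "+err.Error()), false
	}
	if o.TotalCost > 50000 { // approval threshold from the activity list
		steps = append(steps, "request approval")
		if !approve(o) {
			return append(steps, "notify: not approved"), false
		}
		steps = append(steps, "notify: approved")
	}
	steps = append(steps, "process payment")
	left := updateInventory(s, o)
	steps = append(steps, fmt.Sprintf("update inventory: %d left", left))
	return append(steps, "notify: completed"), true
}

func main() {
	s := store{"cars": 100}
	o := order{ID: "demo-order", ItemName: "cars", Quantity: 10, TotalCost: 150000}
	steps, ok := processOrder(s, o, func(order) bool { return true })
	for _, step := range steps {
		fmt.Println(step)
	}
	fmt.Println("processed:", ok)
}
```

In the real workflow each step is a durable activity call whose result is recorded by the workflow engine; the sketch keeps only the control flow, including the early exits when inventory is short or approval is refused.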

order-processor/main.go

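In the application's program file:

  • The unique workflow order ID is generated
  • The workflow is scheduled
  • The workflow status is retrieved
  • The workflow and the workflow activities it invokes are registered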

    func main() {
        fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
        fmt.Println("*** Using this app, you can place orders that start workflows.")

        // ...

        // Register workflow and activities
        if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil {
            log.Fatal(err)
        }
        if err := w.RegisterActivity(NotifyActivity); err != nil {
            log.Fatal(err)
        }
        if err := w.RegisterActivity(RequestApprovalActivity); err != nil {
            log.Fatal(err)
        }
        if err := w.RegisterActivity(VerifyInventoryActivity); err != nil {
            log.Fatal(err)
        }
        if err := w.RegisterActivity(ProcessPaymentActivity); err != nil {
            log.Fatal(err)
        }
        if err := w.RegisterActivity(UpdateInventoryActivity); err != nil {
            log.Fatal(err)
        }

        // Build and start workflow runtime, pulling and executing tasks
        if err := w.Start(); err != nil {
            log.Fatal(err)
        }

        daprClient, err := client.NewClient()
        if err != nil {
            log.Fatalf("failed to initialise dapr client: %v", err)
        }
        wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient))
        if err != nil {
            log.Fatalf("failed to initialise workflow client: %v", err)
        }

        // Seed the state store with the base inventory
        inventory := []InventoryItem{
            {ItemName: "paperclip", PerItemCost: 5, Quantity: 100},
            {ItemName: "cars", PerItemCost: 15000, Quantity: 100},
            {ItemName: "computers", PerItemCost: 500, Quantity: 100},
        }
        if err := restockInventory(daprClient, inventory); err != nil {
            log.Fatalf("failed to restock: %v", err)
        }

        fmt.Println("==========Begin the purchase of item:==========")

        itemName := defaultItemName
        orderQuantity := 10

        // inventory[1] is the cars entry
        totalCost := inventory[1].PerItemCost * orderQuantity

        orderPayload := OrderPayload{
            ItemName:  itemName,
            Quantity:  orderQuantity,
            TotalCost: totalCost,
        }

        // Schedule the workflow, which receives the order, verifies inventory, and processes payment
        id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload))
        if err != nil {
            log.Fatalf("failed to start workflow: %v", err)
        }

        // ...

        // Poll until the workflow has completed, failed, or been terminated
        for {
            timeDelta := time.Since(startTime)
            metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
            if err != nil {
                log.Fatalf("failed to fetch workflow: %v", err)
            }
            if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) {
                fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String())
                break
            }
            if timeDelta.Seconds() >= 10 {
                metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
                if err != nil {
                    log.Fatalf("failed to fetch workflow: %v", err)
                }
                // Seek approval once, and only while the workflow is still running
                if totalCost > 50000 && !approvalSought && (metadata.RuntimeStatus != workflow.StatusCompleted) && (metadata.RuntimeStatus != workflow.StatusFailed) && (metadata.RuntimeStatus != workflow.StatusTerminated) {
                    approvalSought = true
                    promptForApproval(id)
                }
            }
            // Sleep to not DoS the dapr dev instance
            time.Sleep(time.Second)
        }

        fmt.Println("Purchase of item is complete")
    }

    // promptForApproval raises the manager_approval event on the workflow instance
    func promptForApproval(id string) {
        wfClient, err := workflow.NewClient()
        if err != nil {
            log.Fatalf("failed to initialise wfClient: %v", err)
        }
        if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil {
            log.Fatal(err)
        }
    }

    // restockInventory saves each base stock item to the state store, keyed by item name
    func restockInventory(daprClient client.Client, inventory []InventoryItem) error {
        for _, item := range inventory {
            itemSerialized, err := json.Marshal(item)
            if err != nil {
                return err
            }
            fmt.Printf("adding base stock item: %s\n", item.ItemName)
            if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil {
                return err
            }
        }
        return nil
    }

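Meanwhile, the OrderProcessingWorkflow and its activities are defined as functions in workflow.go.

Tell us what you think!

We're continuously working to improve our Quickstart examples and value your feedback. Did you find this quickstart helpful? Do you have suggestions for improvement?

Join the discussion in our discord channel.

Next steps

Explore Dapr tutorials >>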