指南:如何编写和管理工作流
注意
Dapr工作流目前处于beta阶段。 查看已知限制 1.13.0.
让我们创建一个 Dapr 工作流,并使用控制台调用它。 通过提供的工作流示例,您将:
- 使用JavaScript工作流工作者执行工作流实例
- 利用JavaScript工作流客户端和API调用来启动和终止工作流实例
这个示例使用dapr init中的默认配置,在自托管模式下。
前期准备
- Dapr CLI 和初始化环境.
- Node.js 和 npm,
验证您是否正在使用最新的proto绑定
设置环境
克隆JavaScript SDK存储库并进入该存储库。
git clone https://github.com/dapr/js-sdk
cd js-sdk
从 JavaScript SDK 根目录中,导航到 Dapr Workflow 示例。
cd examples/workflow/authoring
运行以下命令使用 Dapr JavaScript SDK 安装运行此工作流示例所需的依赖项。
npm install
运行activity-sequence.ts
activity-sequence
文件在 Dapr 工作流运行时中注册了一个工作流和一个活动。 工作流是按顺序执行的一系列活动。 我们使用DaprWorkflowClient来调度一个新的工作流实例并等待其完成。
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
daprHost,
daprPort,
});
const hello = async (_: WorkflowActivityContext, name: string) => {
return `Hello ${name}!`;
};
const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
const cities: string[] = [];
const result1 = yield ctx.callActivity(hello, "Tokyo");
cities.push(result1);
const result2 = yield ctx.callActivity(hello, "Seattle");
cities.push(result2);
const result3 = yield ctx.callActivity(hello, "London");
cities.push(result3);
return cities;
};
workflowRuntime.registerWorkflow(sequence).registerActivity(hello);
// Wrap the worker startup in a try-catch block to handle any errors during startup
try {
await workflowRuntime.start();
console.log("Workflow runtime started successfully");
} catch (error) {
console.error("Error starting workflow runtime:", error);
}
// Schedule a new orchestration
try {
const id = await workflowClient.scheduleNewWorkflow(sequence);
console.log(`Orchestration scheduled with ID: ${id}`);
// Wait for orchestration completion
const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
} catch (error) {
console.error("Error scheduling or waiting for orchestration:", error);
}
在上面的代码中:
workflowRuntime.registerWorkflow(sequence)
在 Dapr Workflow 运行时中将sequence
注册为一个工作流。await workflowRuntime.start();
构建并启动 Dapr Workflow 运行时内的引擎。await workflowClient.scheduleNewWorkflow(sequence)
使用 Dapr Workflow 运行时调度一个新的工作流实例。await workflowClient.waitForWorkflowCompletion(id, undefined, 30)
等待工作流实例完成。
在终端中执行以下命令来启动 activity-sequence.ts
:
npm run start:dapr:activity-sequence
预期输出
You're up and running! Both Dapr and your app logs will appear here.
...
== APP == Orchestration scheduled with ID: dc040bea-6436-4051-9166-c9294f9d2201
== APP == Waiting 30 seconds for instance dc040bea-6436-4051-9166-c9294f9d2201 to complete...
== APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 0 history event...
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, EXECUTIONSTARTED=1]
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Waiting for 1 task(s) and 0 event(s) to complete...
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
== APP == Received "Activity Request" work item
== APP == Activity hello completed with output "Hello Tokyo!" (14 chars)
== APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 3 history event...
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Waiting for 1 task(s) and 0 event(s) to complete...
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
== APP == Received "Activity Request" work item
== APP == Activity hello completed with output "Hello Seattle!" (16 chars)
== APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 6 history event...
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Waiting for 1 task(s) and 0 event(s) to complete...
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
== APP == Received "Activity Request" work item
== APP == Activity hello completed with output "Hello London!" (15 chars)
== APP == Received "Orchestrator Request" work item with instance id 'dc040bea-6436-4051-9166-c9294f9d2201'
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Rebuilding local state with 9 history event...
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Processing 2 new history event(s): [ORCHESTRATORSTARTED=1, TASKCOMPLETED=1]
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Orchestration completed with status COMPLETED
== APP == dc040bea-6436-4051-9166-c9294f9d2201: Returning 1 action(s)
INFO[0006] dc040bea-6436-4051-9166-c9294f9d2201: 'sequence' completed with a COMPLETED status. app_id=activity-sequence-workflow instance=kaibocai-devbox scope=wfengine.backend type=log ver=1.12.3
== APP == Instance dc040bea-6436-4051-9166-c9294f9d2201 completed
== APP == Orchestration completed! Result: ["Hello Tokyo!","Hello Seattle!","Hello London!"]