EventMesh工作流

业务场景

图中你正在构建一个简单的电商订单管理系统,系统能够接收和调配新的订单,调配流程需要处理所有的订单创建,付款处理以及发货处理。

为了实现高可用和高性能,你可以使用事件驱动架构(EDA)构建微服务应用去处理商店前端,订单管理,支付处理和发货管理。你可以在云上部署整个系统。要处理高并发,你可以利用消息系统缓冲,并扩展多个微服务实例。架构类似于:

Workflow Use Case

当每个微服务都在自己的事件通道上运行时,EventMesh在执行事件编排方面发挥着至关重要的作用。

我们使用 CNCF Serverless工作流 来描述此事件工作流编排。

CNCF Serverless工作流

CNCF Serverless工作流定义了一个厂商中立、开源和完全社区驱动的生态系统,用于定义和运行针对Serverless技术领域的基于DSL的工作流。

Serverless工作流定义了一种领域特定语言(DSL)来描述有状态和无状态的基于工作流的serverless函数和微服务编排。

详见官方github

EventMesh工作流

我们利用Serverless工作流DSL来描述EventMesh工作流。根据其规范,工作流由一系列用于描述控制流逻辑的工作流状态组成。目前,我们仅支持与事件相关的工作流状态。请参见工作流DSL设计中支持的状态。

工作流状态可以包含通用的操作,或在工作流执行期间应调用的服务/函数。这些操作可以引用可复用的函数定义(应如何调用这些函数/服务),还可以引用触发基于事件的服务调用的事件,以及要等待的事件,这些事件表示这种基于事件的服务调用完成。

在EDA解决方案中,我们通常使用AsyncAPI定义事件驱动的微服务。Serverless工作流“函数”定义支持使用AsyncAPI定义调用语义。有关详细信息,请参见Using Funtions for AsyncAPI Service

AsyncAPI

AsyncAPI是一项开源计划,旨在改善事件驱动体系结构(EDA)的当前状态。我们的长期目标是让使用EDA和使用REST API一样容易。包括从文档到代码生成、发现到事件管理。现在应用于REST API的大多数流程也适用于事件驱动/异步API。

详见AsyncAPI官网

工作流示例

在本示例中,我们构建了上面订单管理系统的事件驱动工作流。

首先,我们需要为我们的微服务应用定义AsyncAPI。

  • 在线商店应用程序
  1. asyncapi: 2.2.0
  2. info:
  3. title: Online Store application
  4. version: '0.1.0'
  5. channels:
  6. store/order:
  7. subscribe:
  8. operationId: newStoreOrder
  9. message:
  10. $ref : '#/components/NewOrder'
  • 订单服务
  1. asyncapi: 2.2.0
  2. info:
  3. title: Order Service
  4. version: '0.1.0'
  5. channels:
  6. order/inbound:
  7. publish:
  8. operationId: sendOrder
  9. message:
  10. $ref : '#/components/Order'
  11. order/outbound:
  12. subscribe:
  13. operationId: processedOrder
  14. message:
  15. $ref : '#/components/Order'
  • 支付服务
  1. asyncapi: 2.2.0
  2. info:
  3. title: Payment Service
  4. version: '0.1.0'
  5. channels:
  6. payment/inbound:
  7. publish:
  8. operationId: sendPayment
  9. message:
  10. $ref : '#/components/OrderPayment'
  11. payment/outbound:
  12. subscribe:
  13. operationId: paymentReceipt
  14. message:
  15. $ref : '#/components/OrderPayment'
  • 物流服务
  1. asyncapi: 2.2.0
  2. info:
  3. title: Shipment Service
  4. version: '0.1.0'
  5. channels:
  6. shipment/inbound:
  7. publish:
  8. operationId: sendShipment
  9. message:
  10. $ref : '#/components/OrderShipment'

接下来,定义描述订单管理业务逻辑的订单工作流。

  1. id: storeorderworkflow
  2. version: '1.0'
  3. specVersion: '0.8'
  4. name: Store Order Management Workflow
  5. states:
  6. - name: Receive New Order Event
  7. type: event
  8. onEvents:
  9. - eventRefs:
  10. - NewOrderEvent
  11. actions:
  12. - eventRef:
  13. triggerEventRef: OrderServiceSendEvent
  14. resultEventRef: OrderServiceResultEvent
  15. - eventRef:
  16. triggerEventRef: PaymentServiceSendEvent
  17. resultEventRef: PaymentServiceResultEvent
  18. transition: Check Payment Status
  19. - name: Check Payment Status
  20. type: switch
  21. dataConditions:
  22. - name: Payment Successfull
  23. condition: "${ .payment.status == 'success' }"
  24. transition: Send Order Shipment
  25. - name: Payment Denied
  26. condition: "${ .payment.status == 'denied' }"
  27. end: true
  28. defaultCondition:
  29. end: true
  30. - name: Send Order Shipment
  31. type: operation
  32. actions:
  33. - eventRef:
  34. triggerEventRef: ShipmentServiceSendEvent
  35. end: true
  36. events:
  37. - name: NewOrderEvent
  38. source: file://onlineStoreApp.yaml#newStoreOrder
  39. type: asyncapi
  40. kind: consumed
  41. - name: OrderServiceSendEvent
  42. source: file://orderService.yaml#sendOrder
  43. type: asyncapi
  44. kind: produced
  45. - name: OrderServiceResultEvent
  46. source: file://orderService.yaml#processedOrder
  47. type: asyncapi
  48. kind: consumed
  49. - name: PaymentServiceSendEvent
  50. source: file://paymentService.yaml#sendPayment
  51. type: asyncapi
  52. kind: produced
  53. - name: PaymentServiceResultEvent
  54. source: file://paymentService.yaml#paymentReceipt
  55. type: asyncapi
  56. kind: consumed
  57. - name: ShipmentServiceSendEvent
  58. source: file://shipmentService.yaml#sendShipment
  59. type: asyncapi
  60. kind: produced

对应的工作流图如下:

Workflow Diagram

EventMesh工作流引擎

在下面的体系结构图中, EventMesh目录, EventMesh工作流引擎 和 EventMesh Runtime在三个不同的处理器中运行。

Workflow Architecture

运行工作流的步骤如下:

  1. 在环境中部署发布者和订阅者应用程序。 使用AsyncAPI描述应用程序API,生成asyncAPI yaml。 使用AsyncAPI在EventMesh目录中注册发布者和订阅者应用程序。

  2. 在EventMesh工作流引擎中注册Serverless工作流DSL。

  3. 工作流引擎从EventMesh目录查询发布服务器和订阅服务器的需要的工作流DSL函数

  4. 事件驱动App将事件发布到EventMesh Runtime触发工作流。EventMesh工作流引擎发布和订阅事件、编排事件。

EventMesh Catalog 设计

EventMesh目录存储发布者、订阅者和通道元数据。由以下模块组成:

  • AsyncAPI解析器

    使用AsyncAPI社区提供的SDK (tool list), 解析并验证AsyncAPI yaml输入,并生成AsyncAPI定义。

  • 发布者, 通道, 订阅者模块

    从AsyncAPI定义存储发布者、订阅者和通道信息。

EventMesh工作流引擎设计

工作流引擎由以下模块组成:

  • 工作流解析器

    使用Serverless Workflow社区提供的SDK(SDKs), 解析和验证工作流DSL输入,并生成工作流定义。

  • 工作流模块

    管理工作流实例的生命周期,从创建、启动、停止到销毁。

  • 状态模块

    管理工作流状态生命周期。支持与事件相关的状态,and the supported state list below is Work-in-Progress.

    工作流状态描述
    Operation执行Actions中定义的AsyncAPI函数
    Event检查定义的事件是否匹配,如果匹配,执行定义的AsyncAPI函数
    Switch检查事件是否与事件条件匹配,并执行定义的AsyncAPI函数
    Parallel并行执行定义的AsyncAPI函数
    ForEach迭代输入集合并执行定义的AsyncAPI函数
  • 行为模块

    管理函数中的行为。

  • 函数模块

    通过在EventMesh Runtime中创建发布者和/或订阅者来管理AsyncAPI函数,并管理发布者/订阅者生命周期。

    AsyncAPI 操作EventMesh Runtime
    PublishPublisher
    SubscribeSubscriber
  • 事件模块

    使用工作流DSL中定义的规则管理CloudEvent数据模型,包括事件过滤器、关联和转换。

  • 重试模块

    管理事件发布到EventMesh Runtime的重试逻辑。