操作方法:使用输入绑定来触发应用程序

使用 Dapr 输入绑定来触发由事件驱动的程序

通过输入绑定,您可以在外部资源发生事件时触发应用程序。 一个外部资源可以是队列、消息管道、云服务、文件系统等。 可选择随请求发送有效载荷和元数据。

输入绑定对于事件驱动的处理,数据管道或通常对事件作出反应并执行进一步处理非常理想。 Dapr 输入绑定允许您:

  • 接收不包含特定 SDK 或库的事件
  • 在不更改代码的情况下替换绑定
  • 关注业务逻辑而不是事件资源实现

Diagram showing bindings of example service

本指南以 Kafka 绑定为例。 您可以从绑定组件列表} 中找到自己喜欢的绑定规范。 在本指南中

  1. 该示例调用了 /binding 端点,其中 checkout,即要调用的绑定名称。
  2. 有效载荷位于必需的 data 字段中,并且可以是任何 JSON 可序列化的值。
  3. operation字段告诉绑定需要采取什么操作。 例如,Kafka绑定支持create操作

注意

如果你还没有,请尝试使用绑定快速入门快速了解如何使用绑定 API。

创建绑定

创建一个 binding.yaml 文件,并将其保存到应用程序目录中的 components 子文件夹中。

创建一个名称为 checkout 的新绑定组件。 在metadata部分,配置以下与Kafka相关的属性:

  • 您要发布信息的主题
  • Broker

在创建绑定组件时,请指定支持的绑定direction(方向)

使用 --resources-path 标志与 dapr run 命令一起使用,指向您的自定义资源目录。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: checkout
  5. spec:
  6. type: bindings.kafka
  7. version: v1
  8. metadata:
  9. # Kafka broker connection setting
  10. - name: brokers
  11. value: localhost:9092
  12. # consumer configuration: topic and consumer group
  13. - name: topics
  14. value: sample
  15. - name: consumerGroup
  16. value: group1
  17. # publisher configuration: topic
  18. - name: publishTopic
  19. value: sample
  20. - name: authRequired
  21. value: false
  22. - name: direction
  23. value: input

要将部署到Kubernetes集群中,请运行 kubectl apply -f binding.yaml

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: checkout
  5. spec:
  6. type: bindings.kafka
  7. version: v1
  8. metadata:
  9. # Kafka broker connection setting
  10. - name: brokers
  11. value: localhost:9092
  12. # consumer configuration: topic and consumer group
  13. - name: topics
  14. value: sample
  15. - name: consumerGroup
  16. value: group1
  17. # publisher configuration: topic
  18. - name: publishTopic
  19. value: sample
  20. - name: authRequired
  21. value: false
  22. - name: direction
  23. value: input

监听传入事件 (输入绑定)

现在配置您的应用程序来接收传入事件。 如果您正在使用HTTP,您需要:

  • binding.yaml 文件中,按照 metadata.name 指定的绑定名称,监听一个 POST 终结点。
  • 验证您的应用程序允许 Dapr 为此端点进行 OPTIONS 请求。

下面是利用 Dapr SDK 展示输出绑定的代码示例。

  1. //dependencies
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System;
  5. using Microsoft.AspNetCore.Mvc;
  6. //code
  7. namespace CheckoutService.controller
  8. {
  9. [ApiController]
  10. public class CheckoutServiceController : Controller
  11. {
  12. [HttpPost("/checkout")]
  13. public ActionResult<string> getCheckout([FromBody] int orderId)
  14. {
  15. Console.WriteLine("Received Message: " + orderId);
  16. return "CID" + orderId;
  17. }
  18. }
  19. }
  1. //dependencies
  2. import org.springframework.web.bind.annotation.*;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import reactor.core.publisher.Mono;
  6. //code
  7. @RestController
  8. @RequestMapping("/")
  9. public class CheckoutServiceController {
  10. private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
  11. @PostMapping(path = "/checkout")
  12. public Mono<String> getCheckout(@RequestBody(required = false) byte[] body) {
  13. return Mono.fromRunnable(() ->
  14. log.info("Received Message: " + new String(body)));
  15. }
  16. }
  1. #dependencies
  2. import logging
  3. from dapr.ext.grpc import App, BindingRequest
  4. #code
  5. app = App()
  6. @app.binding('checkout')
  7. def getCheckout(request: BindingRequest):
  8. logging.basicConfig(level = logging.INFO)
  9. logging.info('Received Message : ' + request.text())
  10. app.run(6002)
  1. //dependencies
  2. import (
  3. "encoding/json"
  4. "log"
  5. "net/http"
  6. "github.com/gorilla/mux"
  7. )
  8. //code
  9. func getCheckout(w http.ResponseWriter, r *http.Request) {
  10. w.Header().Set("Content-Type", "application/json")
  11. var orderId int
  12. err := json.NewDecoder(r.Body).Decode(&orderId)
  13. log.Println("Received Message: ", orderId)
  14. if err != nil {
  15. log.Printf("error parsing checkout input binding payload: %s", err)
  16. w.WriteHeader(http.StatusOK)
  17. return
  18. }
  19. }
  20. func main() {
  21. r := mux.NewRouter()
  22. r.HandleFunc("/checkout", getCheckout).Methods("POST", "OPTIONS")
  23. http.ListenAndServe(":6002", r)
  24. }
  1. //dependencies
  2. import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';
  3. //code
  4. const daprHost = "127.0.0.1";
  5. const serverHost = "127.0.0.1";
  6. const serverPort = "6002";
  7. const daprPort = "3602";
  8. start().catch((e) => {
  9. console.error(e);
  10. process.exit(1);
  11. });
  12. async function start() {
  13. const server = new DaprServer({
  14. serverHost,
  15. serverPort,
  16. communicationProtocol: CommunicationProtocolEnum.HTTP,
  17. clientOptions: {
  18. daprHost,
  19. daprPort,
  20. }
  21. });
  22. await server.binding.receive('checkout', async (orderId) => console.log(`Received Message: ${JSON.stringify(orderId)}`));
  23. await server.start();
  24. }

ACK一个事件

通过从您的HTTP处理程序返回一个200 OK响应,告诉Dapr您已成功处理了应用程序中的事件。

拒绝事件

告诉 Dapr 在您的应用程序中事件未被正确处理,并通过返回除 200 OK 以外的任何响应来安排重新传递。 例如,一个 500 Error

指定自定义路由

默认情况下,传入事件将发送到与输入绑定的名称对应的 HTTP 端点。 您可以通过在 binding.yaml 中设置以下元数据属性来覆盖此属性:

  1. name: mybinding
  2. spec:
  3. type: binding.rabbitmq
  4. metadata:
  5. - name: route
  6. value: /onevent

事件传递保证

事件传递保证由绑定实现控制。 根据绑定实现,事件传递可以正好一次或至少一次。

参考资料