操作方法:使用输入绑定来触发应用程序
使用 Dapr 输入绑定来触发由事件驱动的程序
通过输入绑定,您可以在外部资源发生事件时触发应用程序。 一个外部资源可以是队列、消息管道、云服务、文件系统等。 可选择随请求发送有效载荷和元数据。
输入绑定对于事件驱动的处理,数据管道或通常对事件作出反应并执行进一步处理非常理想。 Dapr 输入绑定允许您:
- 接收不包含特定 SDK 或库的事件
- 在不更改代码的情况下替换绑定
- 关注业务逻辑而不是事件资源实现
本指南以 Kafka 绑定为例。 您可以从绑定组件列表} 中找到自己喜欢的绑定规范。 在本指南中
- 该示例调用了
/binding
端点,其中checkout
,即要调用的绑定名称。 - 有效载荷位于必需的
data
字段中,并且可以是任何 JSON 可序列化的值。 operation
字段告诉绑定需要采取什么操作。 例如,Kafka绑定支持create操作。- 您可以查看每个输出绑定支持的操作(针对每个组件)。
注意
如果你还没有,请尝试使用绑定快速入门快速了解如何使用绑定 API。
创建绑定
创建一个 binding.yaml
文件,并将其保存到应用程序目录中的 components
子文件夹中。
创建一个名称为 checkout
的新绑定组件。 在metadata
部分,配置以下与Kafka相关的属性:
- 您要发布信息的主题
- Broker
在创建绑定组件时,请指定支持的绑定direction(方向)。
使用 --resources-path
标志与 dapr run
命令一起使用,指向您的自定义资源目录。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: checkout
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: false
- name: direction
value: input
要将部署到Kubernetes集群中,请运行 kubectl apply -f binding.yaml
。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: checkout
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: false
- name: direction
value: input
监听传入事件 (输入绑定)
现在配置您的应用程序来接收传入事件。 如果您正在使用HTTP,您需要:
- 在
binding.yaml
文件中,按照metadata.name
指定的绑定名称,监听一个POST
终结点。 - 验证您的应用程序允许 Dapr 为此端点进行
OPTIONS
请求。
下面是利用 Dapr SDK 展示输出绑定的代码示例。
//dependencies
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
//code
namespace CheckoutService.controller
{
[ApiController]
public class CheckoutServiceController : Controller
{
[HttpPost("/checkout")]
public ActionResult<string> getCheckout([FromBody] int orderId)
{
Console.WriteLine("Received Message: " + orderId);
return "CID" + orderId;
}
}
}
//dependencies
import org.springframework.web.bind.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
//code
@RestController
@RequestMapping("/")
public class CheckoutServiceController {
private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
@PostMapping(path = "/checkout")
public Mono<String> getCheckout(@RequestBody(required = false) byte[] body) {
return Mono.fromRunnable(() ->
log.info("Received Message: " + new String(body)));
}
}
#dependencies
import logging
from dapr.ext.grpc import App, BindingRequest
#code
app = App()
@app.binding('checkout')
def getCheckout(request: BindingRequest):
logging.basicConfig(level = logging.INFO)
logging.info('Received Message : ' + request.text())
app.run(6002)
//dependencies
import (
"encoding/json"
"log"
"net/http"
"github.com/gorilla/mux"
)
//code
func getCheckout(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var orderId int
err := json.NewDecoder(r.Body).Decode(&orderId)
log.Println("Received Message: ", orderId)
if err != nil {
log.Printf("error parsing checkout input binding payload: %s", err)
w.WriteHeader(http.StatusOK)
return
}
}
func main() {
r := mux.NewRouter()
r.HandleFunc("/checkout", getCheckout).Methods("POST", "OPTIONS")
http.ListenAndServe(":6002", r)
}
//dependencies
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';
//code
const daprHost = "127.0.0.1";
const serverHost = "127.0.0.1";
const serverPort = "6002";
const daprPort = "3602";
start().catch((e) => {
console.error(e);
process.exit(1);
});
async function start() {
const server = new DaprServer({
serverHost,
serverPort,
communicationProtocol: CommunicationProtocolEnum.HTTP,
clientOptions: {
daprHost,
daprPort,
}
});
await server.binding.receive('checkout', async (orderId) => console.log(`Received Message: ${JSON.stringify(orderId)}`));
await server.start();
}
ACK一个事件
通过从您的HTTP处理程序返回一个200 OK
响应,告诉Dapr您已成功处理了应用程序中的事件。
拒绝事件
告诉 Dapr 在您的应用程序中事件未被正确处理,并通过返回除 200 OK
以外的任何响应来安排重新传递。 例如,一个 500 Error
。
指定自定义路由
默认情况下,传入事件将发送到与输入绑定的名称对应的 HTTP 端点。 您可以通过在 binding.yaml
中设置以下元数据属性来覆盖此属性:
name: mybinding
spec:
type: binding.rabbitmq
metadata:
- name: route
value: /onevent
事件传递保证
事件传递保证由绑定实现控制。 根据绑定实现,事件传递可以正好一次或至少一次。