发布和订阅使用Cloudevents的消息
了解 Dapr 为何使用 CloudEvents,它们如何在 Dapr 发布/订阅中工作,以及如何创建 CloudEvents。
为了启用消息路由并为每个消息提供附加上下文,Dapr使用CloudEvents 1.0规范作为其消息格式。 任何通过 Dapr 发送到 topic 的应用程序消息都会自动被包装在 CloudEvents 信封中,使用 Content-Type 头部的值 作为 datacontenttype
属性。
Dapr 使用 CloudEvents 为事件负载提供额外的上下文,从而启用以下功能:
- 追踪
- 用于正确反序列化事件数据的 Content-type
- 发送方应用程序的验证
您可以选择三种方法之一通过发布/订阅来发布 CloudEvent:
- 发送一个发布/订阅事件,然后由 Dapr 封装在 CloudEvent 包裹中。
- 通过覆盖标准 CloudEvent 属性,替换 Dapr 提供的特定 CloudEvents 属性。
- 作为发布/订阅事件的一部分,编写自己的CloudEvent信封。
Dapr生成的CloudEvents示例
自动将发布操作发送到 Dapr 时,会自动将其包装在一个包含以下字段的 CloudEvent 包裹中:
id
source
specversion
类型
traceparent
traceid
tracestate
topic
pubsubname
time
datacontenttype
(可选)
以下示例演示了由Dapr生成的CloudEvent,用于发布操作到包含 orders
的主题:
- 一种唯一标识消息的 W3C
traceid
data
以及云事件的字段,其中数据内容被序列化为JSON
{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data": {
"orderId": 1
},
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"datacontenttype": "application/json; charset=utf-8",
"source": "checkout",
"type": "com.dapr.event.sent",
"time": "2020-09-23T06:23:21Z",
"traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
作为v1.0 CloudEvent的另一个示例,以下展示了将数据作为XML内容以JSON序列化的CloudEvent消息:
{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data" : "<note><to></to><from>user2</from><message>Order</message></note>",
"id" : "id-1234-5678-9101",
"specversion" : "1.0",
"datacontenttype" : "text/xml",
"subject" : "Test XML Message",
"source" : "https://example.com/message",
"type" : "xml.message",
"time" : "2020-09-23T06:23:21Z"
}
替换 Dapr 生成的 CloudEvents 值
Dapr 会自动生成多个 CloudEvent 属性。 您可以通过提供以下可选的元数据键/值来替换这些生成的 CloudEvent 属性:
cloudevent.id
: 重写id
cloudevent.source
: 重写source
cloudevent.type
: 重写type
cloudevent.traceid
: 重写traceid
cloudevent.tracestate
: 覆盖tracestate
cloudevent.traceparent
: 覆盖traceparent
使用这些元数据属性替换CloudEvents属性的能力适用于所有发布/订阅组件。
如何使用Dapr扩展来开发和运行Dapr应用程序
例如,要在代码中替换上面的CloudEvent示例中的source
和id
值:
with DaprClient() as client:
order = {'orderId': i}
# Publish an event/message using Dapr PubSub
result = client.publish_event(
pubsub_name='order_pub_sub',
topic_name='orders',
publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
)
var order = new Order(i);
using var client = new DaprClientBuilder().Build();
// Override cloudevent metadata
var metadata = new Dictionary<string,string>() {
{ "cloudevent.source", "payment" },
{ "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
}
// Publish an event/message using Dapr PubSub
await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
Console.WriteLine("Published data: " + order);
await Task.Delay(TimeSpan.FromSeconds(1));
然后,JSON有效负载会反映新的source
和id
值:
{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data": {
"orderId": 1
},
"id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
"specversion": "1.0",
"datacontenttype": "application/json; charset=utf-8",
"source": "payment",
"type": "com.dapr.event.sent",
"time": "2020-09-23T06:23:21Z",
"traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
重要
虽然你可以替换 traceid
/traceparent
和 tracestate
,否则可能会干扰跟踪事件,并在跟踪工具中报告不一致的结果。 推荐使用Open Telemetry进行分布式跟踪。 了解更多关于分布式追踪。
发布您自己的 CloudEvent
如果您想使用自己的CloudEvent,请确保将datacontenttype指定为application/cloudevents+json
。
如果由应用程序创建的CloudEvent不包含CloudEvent规范中的 个最低要求字段 ,则该消息将被拒绝。 如果缺少以下字段,Dapr将添加到CloudEvent中:
time
traceid
traceparent
tracestate
topic
pubsubname
source
类型
specversion
您可以向自定义 CloudEvent 添加不属于官方 CloudEvent 规范的附加字段。 Dapr 将按原样传递这些字段。
如何使用Dapr扩展来开发和运行Dapr应用程序
将一个 CloudEvent 发布到 orders
主题:
dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'
将一个 CloudEvent 发布到 orders
主题:
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'
将一个 CloudEvent 发布到 orders
主题:
Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
事件去重
当使用由Dapr创建的Cloud Events时,信封中包含一个id
字段,应用程序可以使用该字段执行消息去重操作。 Dapr 不会自动处理去重。 Dapr 支持使用原生支持消息去重的消息代理。
下一步
- 了解为什么您可能不需要使用CloudEvents
- 尝试Pub/Sub快速入门
- 发布/订阅组件列表
- Read the API reference