发布和订阅没有CloudEvents的消息

学习何时不使用CloudEvents以及如何禁用它们。

当添加 Dapr 到你的应用时,由于兼容性原因或某些应用程序不使用 Dapr,一些服务可能仍需要通过未封装在 CloudEvents 中的发布/订阅消息进行通信。 这些被称为“原始”发布/订阅消息。 Dapr使应用程序能够发布和订阅原始事件,这些事件未包装在CloudEvent中以实现兼容性。

发布原始消息

Dapr 应用能够在没有 CloudEvent 封装的情况下将原始事件发布到 pub/sub,以便与非 Dapr 应用兼容。

Diagram showing how to publish with Dapr when subscriber does not use Dapr or CloudEvent

警告

不使用 CloudEvents 将禁用对追踪、每个 messageId 的事件重复数据删除、content-type 元数据以及使用 CloudEvent 架构构建的任何其他功能的支持。

要禁用 CloudEvent 包装,请将 rawPayload 元数据设置为 true ,作为发布的一部分。 这允许订阅者接收这些消息,而不必分析 CloudEvent 。

  1. curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"order-number": "345"}'
  1. from dapr.clients import DaprClient
  2. with DaprClient() as d:
  3. req_data = {
  4. 'order-number': '345'
  5. }
  6. # Create a typed message with content type and body
  7. resp = d.publish_event(
  8. pubsub_name='pubsub',
  9. topic_name='TOPIC_A',
  10. data=json.dumps(req_data),
  11. publish_metadata={'rawPayload': 'true'}
  12. )
  13. # Print the request
  14. print(req_data, flush=True)
  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create();
  4. $app->run(function(\DI\FactoryInterface $factory) {
  5. $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
  6. $publisher->topic('TOPIC_A')->publish('data', ['rawPayload' => 'true']);
  7. });

订阅原始消息

Dapr 应用程序还能够订阅来自不使用 CloudEvent 封装的现有 pub/sub 的原始事件。

Diagram showing how to subscribe with Dapr when publisher does not use Dapr or CloudEvent

以编程方式订阅原始事件

在使用编程式订阅时,添加额外的元数据条目rawPayload,这样Dapr sidecar就会自动将有效载荷包装到与当前Dapr SDK兼容的CloudEvent中。

  1. import flask
  2. from flask import request, jsonify
  3. from flask_cors import CORS
  4. import json
  5. import sys
  6. app = flask.Flask(__name__)
  7. CORS(app)
  8. @app.route('/dapr/subscribe', methods=['GET'])
  9. def subscribe():
  10. subscriptions = [{'pubsubname': 'pubsub',
  11. 'topic': 'deathStarStatus',
  12. 'route': 'dsstatus',
  13. 'metadata': {
  14. 'rawPayload': 'true',
  15. } }]
  16. return jsonify(subscriptions)
  17. @app.route('/dsstatus', methods=['POST'])
  18. def ds_subscriber():
  19. print(request.json, flush=True)
  20. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  21. app.run()
  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
  4. new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus', metadata: [ 'rawPayload' => 'true'] ),
  5. ]]));
  6. $app->post('/dsstatus', function(
  7. #[\Dapr\Attributes\FromBody]
  8. \Dapr\PubSub\CloudEvent $cloudEvent,
  9. \Psr\Log\LoggerInterface $logger
  10. ) {
  11. $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
  12. return ['status' => 'SUCCESS'];
  13. }
  14. );
  15. $app->start();

声明式订阅原始事件

同样,您可以通过将rawPayload元数据条目添加到您的订阅规范中,以声明方式订阅原始事件。

  1. apiVersion: dapr.io/v2alpha1
  2. kind: Subscription
  3. metadata:
  4. name: myevent-subscription
  5. spec:
  6. topic: deathStarStatus
  7. routes:
  8. default: /dsstatus
  9. pubsubname: pubsub
  10. metadata:
  11. rawPayload: "true"
  12. scopes:
  13. - app1
  14. - app2

下一步