指南:发布消息并订阅主题
了解如何使用一个服务向主题发送消息,并在另一个服务中订阅该主题
介绍
Pub/Sub 是一个分布式系统中的常见模式,它有许多服务用于解偶、异步消息传递。 使用Pub/Sub,您可以在事件消费者与事件生产者解偶的场景中启用。
Dapr 提供了一个可扩展的 Pub/Sub 系统(保证消息至少传递一次),允许开发者发布和订阅主题。 Dapr 为 Pub/Sub 提供组件,使操作者能够使用他们所喜欢的基础设施,例如 Redis Streams 和 Kafka 等。
步骤 1: 设置 Pub/Sub 组件
当发布消息时,必须指定所发送数据的内容类型。 除非指定, Dapr 将假定类型为 text/plain
。 当使用 Dapr 的 HTTP API时,内容类型可以设置在 Content-Type
头中。 gRPC 客户端和 SDK 有一个专用的内容类型参数。
步骤 1: 设置 Pub/Sub 组件
然后发布一条消息给 deathStarStatus
主题:
第一步是设置 Pub/Sub 组件:
运行 dapr init
时默认在本地机器上安装 Redis 流。
在 Linux/MacOS 上打开 ~/.dapr/components/pubsub.yam
或在 Windows 上打开%UserProfile%\.dapr\components\pubsub.yaml
组件文件以验证:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
You can override this file with another Redis instance or another pubsub component by creating a components
directory containing the file and using the flag --components-path
with the dapr run
CLI command.
To deploy this into a Kubernetes cluster, fill in the metadata
connection details of your desired pubsub component in the yaml below, save as pubsub.yaml
, and run kubectl apply -f pubsub.yaml
.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
步骤 2: 订阅主题
Dapr 允许两种方法订阅主题:
- 声明式,其中定义在外部文件中。
- 编程方式,订阅在用户代码中定义
Note
声明和编程方式都支持相同的功能。 声明的方式从用户代码中移除对 Dapr 的依赖性,并允许使用现有应用程序订阅主题。 编程方法在用户代码中实现订阅。
声明式订阅
您可以使用以下自定义资源定义 (CRD) 订阅主题。 创建名为 subscription.yaml
的文件并粘贴以下内容:
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
route: /dsstatus
pubsubname: pubsub
scopes:
- app1
- app2
上面的示例显示了 deathStarStatus
主题的事件订阅,对于pubsub 组件 pubsub
。
route
告诉 Dapr 将所有主题消息发送到应用程序中的/dsstatus
端点。scopes
为app1
和app2
启用订阅。
设置组件:
将 CRD 放在 ./components
目录中。 当 Dapr 启动时,它将加载组件和订阅。
注意:默认情况下,在 MacOS/Linux 上从 $HOME/.dapr/components
加载组件,以及 %USERPROFILE%\.dapr\components
在Windows上。
还可以通过将 Dapr CLI 指向组件路径来覆盖默认目录:
dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py
注意:如果你将订阅置于自定义组件路径中,请确保Pub/Sub 组件也存在。
在 Kubernetes 中,将 CRD 保存到文件中并将其应用于群集:
kubectl apply -f subscription.yaml
Example
创建名为 app1.py
的文件,并粘贴以下内容:
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
创建名为” app1.py
的文件,并粘贴如下内容:
pip install flask
pip install flask_cors
创建 app1.py
后,确保 flask 和 flask_cors 已经安装了:
dapr --app-id app1 --app-port 5000 run python app1.py
设置上述订阅后,将此 javascript(Node > 4.16)下载到 app2.js
文件中:
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.post('/dsstatus', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
设置上述订阅后,将此 javascript(Node > 4.16)下载到 app2.js
文件中:
dapr --app-id app2 --app-port 3000 run node app2.js
创建名为 app1.py
的文件,并粘贴以下内容:
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->post('/dsstatus', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
在创建 app1.php
并安装 SDK后, 继续启动应用程序:
dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php
编程方式订阅
若要订阅主题,请使用您选择的编程语言启动 Web 服务器,并监听以下 GET
终结点: /dapr/subscribe
。 Dapr 实例将在启动时调用到您的应用,并期望对的订阅主题响应 JOSN:
pubsubname
: Dapr 用到的 pub/sub 组件topic
: 订阅的主题route
:当消息来到该主题时,Dapr 需要调用哪个终结点
Example
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'deathStarStatus',
'route': 'dsstatus'}]
return jsonify(subscriptions)
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
创建 app1.py
后,确保 flask 和 flask_cors 已经安装了:
pip install flask
pip install flask_cors
然后运行:
dapr --app-id app1 --app-port 5000 run python app1.py
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.get('/dapr/subscribe', (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "deathStarStatus",
route: "dsstatus"
}
]);
})
app.post('/dsstatus', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
运行此应用:
dapr --app-id app2 --app-port 3000 run node app2.js
更新 app1.php
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus'),
]]));
$app->post('/dsstatus', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
运行此应用:
dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php
/dsstatus
终结点与订阅中定义的 route
相匹配,这是 Dapr 将所有主题消息发送至的位置。
步骤 3: 发布主题
要发布主题,您需要运行一个 Dapr sidecar 的实例才能使用 Pub/Sub Redis 组件。 您可以使用安装在您本地环境中的默认的Redis组件。
用名为 testpubsub
的 app-id 启动一个 Dapr 实例:
dapr run --app-id testpubsub --dapr-http-port 3500
然后发布一条消息给 deathStarStatus
主题:
dapr publish --publish-app-id testpubsub --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'
然后发布一条消息给 deathStarStatus
主题:
curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'
然后发布一条消息给 deathStarStatus
主题:
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"status": "completed"}' -Uri 'http://localhost:3500/v1.0/publish/pubsub/deathStarStatus'
Dapr 将在符合 Cloud Events v1.0 的信封中自动包装用户有效负载,对 datacontenttype
属性使用 Content-Type
头值。
步骤 4: ACK-ing 消息
为了告诉Dapr 消息处理成功,返回一个 200 OK
响应。 如果 Dapr 收到超过 200
的返回状态代码,或者你的应用崩溃,Dapr 将根据 At-Least-Once 语义尝试重新传递消息。
Example
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.post('/dsstatus', (req, res) => {
res.sendStatus(200);
});
(可选) 步骤5:发布带有代码的主题
如果您喜欢使用代码发布一个主题,下面就是一个例子。
const express = require('express');
const path = require('path');
const request = require('request');
const bodyParser = require('body-parser');
const app = express();
app.use(bodyParser.json());
const daprPort = process.env.DAPR_HTTP_PORT || 3500;
const daprUrl = `http://localhost:${daprPort}/v1.0`;
const port = 8080;
const pubsubName = 'pubsub';
app.post('/publish', (req, res) => {
console.log("Publishing: ", req.body);
const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
request( { uri: publishUrl, method: 'POST', json: req.body } );
res.sendStatus(200);
});
app.listen(process.env.PORT || port, () => console.log(`Listening on port ${port}!`));
如果您喜欢使用代码发布一个主题,下面就是一个例子。
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory, \Psr\Log\LoggerInterface $logger) {
$publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
$publisher->topic('deathStarStatus')->publish('operational');
$logger->alert('published!');
});
您可以将此保存到 app2.php
当 app1
正在另一个终端中运行时,执行:
dapr --app-id app2 run -- php app2.php
发送自定义 CloudEvent
Dapr 自动接收发布请求上发送的数据,并将其包装在CloudEvent 1.0 信封中。 如果您想使用自己自定义的 CloudEvent,请确保指定内容类型为 application/ cloudevents+json
。
Read about content types here, and about the Cloud Events message format.
下一步
- 试试 Pub/Sub 快速启动示例
- Learn about topic scoping
- Learn about message time-to-live
- Learn how to configure Pub/Sub components with multiple namespaces
- List of pub/sub components
- Read the API reference