How-To: Publish a message and subscribe to a topic

Learn how to send messages to a topic with one service and subscribe to that topic in another service

Introduction

Pub/Sub is a common pattern in a distributed system with many services that want to utilize decoupled, asynchronous messaging. Using Pub/Sub, you can enable scenarios where event consumers are decoupled from event producers.

Dapr provides an extensible Pub/Sub system with At-Least-Once guarantees, allowing developers to publish and subscribe to topics. Dapr provides components for pub/sub, that enable operators to use their preferred infrastructure, for example Redis Streams, Kafka, etc.

Content Types

When publishing a message, it’s important to specify the content type of the data being sent. Unless specified, Dapr will assume text/plain. When using Dapr’s HTTP API, the content type can be set in a Content-Type header. gRPC clients and SDKs have a dedicated content type parameter.

Step 1: Setup the Pub/Sub component

The following example creates applications to publish and subscribe to a topic called deathStarStatus.

How-To: Publish & subscribe - 图1

The first step is to setup the Pub/Sub component:

Redis Streams is installed by default on a local machine when running dapr init.

Verify by opening your components file under %UserProfile%\.dapr\components\pubsub.yaml on Windows or ~/.dapr/components/pubsub.yaml on Linux/MacOS:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pubsub
  5. spec:
  6. type: pubsub.redis
  7. version: v1
  8. metadata:
  9. - name: redisHost
  10. value: localhost:6379
  11. - name: redisPassword
  12. value: ""

You can override this file with another Redis instance or another pubsub component by creating a components directory containing the file and using the flag --components-path with the dapr run CLI command.

To deploy this into a Kubernetes cluster, fill in the metadata connection details of your desired pubsub component in the yaml below, save as pubsub.yaml, and run kubectl apply -f pubsub.yaml.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.redis
  8. version: v1
  9. metadata:
  10. - name: redisHost
  11. value: localhost:6379
  12. - name: redisPassword
  13. value: ""

Step 2: Subscribe to topics

Dapr allows two methods by which you can subscribe to topics:

  • Declaratively, where subscriptions are defined in an external file.
  • Programmatically, where subscriptions are defined in user code.

Note

Both declarative and programmatic approaches support the same features. The declarative approach removes the Dapr dependency from your code and allows, for example, existing applications to subscribe to topics, without having to change code. The programmatic approach implements the subscription in your code.

Declarative subscriptions

You can subscribe to a topic using the following Custom Resources Definition (CRD). Create a file named subscription.yaml and paste the following:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Subscription
  3. metadata:
  4. name: myevent-subscription
  5. spec:
  6. topic: deathStarStatus
  7. route: /dsstatus
  8. pubsubname: pubsub
  9. scopes:
  10. - app1
  11. - app2

The example above shows an event subscription to topic deathStarStatus, for the pubsub component pubsub.

  • The route field tells Dapr to send all topic messages to the /dsstatus endpoint in the app.
  • The scopes field enables this subscription for apps with IDs app1 and app2.

Set the component with:

Place the CRD in your ./components directory. When Dapr starts up, it loads subscriptions along with components.

Note: By default, Dapr loads components from $HOME/.dapr/components on MacOS/Linux and %USERPROFILE%\.dapr\components on Windows.

You can also override the default directory by pointing the Dapr CLI to a components path:

  1. dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py

Note: If you place the subscription in a custom components path, make sure the Pub/Sub component is present also.

In Kubernetes, save the CRD to a file and apply it to the cluster:

  1. kubectl apply -f subscription.yaml

Example

Create a file named app1.py and paste in the following:

  1. import flask
  2. from flask import request, jsonify
  3. from flask_cors import CORS
  4. import json
  5. import sys
  6. app = flask.Flask(__name__)
  7. CORS(app)
  8. @app.route('/dsstatus', methods=['POST'])
  9. def ds_subscriber():
  10. print(request.json, flush=True)
  11. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  12. app.run()

After creating app1.py ensure flask and flask_cors are installed:

  1. pip install flask
  2. pip install flask_cors

Then run:

  1. dapr --app-id app1 --app-port 5000 run python app1.py

After setting up the subscription above, download this javascript (Node > 4.16) into a app2.js file:

  1. const express = require('express')
  2. const bodyParser = require('body-parser')
  3. const app = express()
  4. app.use(bodyParser.json({ type: 'application/*+json' }));
  5. const port = 3000
  6. app.post('/dsstatus', (req, res) => {
  7. console.log(req.body);
  8. res.sendStatus(200);
  9. });
  10. app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

Run this app with:

  1. dapr --app-id app2 --app-port 3000 run node app2.js

Create a file named app1.php and paste in the following:

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create();
  4. $app->post('/dsstatus', function(
  5. #[\Dapr\Attributes\FromBody]
  6. \Dapr\PubSub\CloudEvent $cloudEvent,
  7. \Psr\Log\LoggerInterface $logger
  8. ) {
  9. $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
  10. return ['status' => 'SUCCESS'];
  11. }
  12. );
  13. $app->start();

After creating app1.php, and with the SDK installed, go ahead and start the app:

  1. dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php

Programmatic subscriptions

To subscribe to topics, start a web server in the programming language of your choice and listen on the following GET endpoint: /dapr/subscribe. The Dapr instance calls into your app at startup and expect a JSON response for the topic subscriptions with:

  • pubsubname: Which pub/sub component Dapr should use.
  • topic: Which topic to subscribe to.
  • route: Which endpoint for Dapr to call on when a message comes to that topic.

Example

  1. import flask
  2. from flask import request, jsonify
  3. from flask_cors import CORS
  4. import json
  5. import sys
  6. app = flask.Flask(__name__)
  7. CORS(app)
  8. @app.route('/dapr/subscribe', methods=['GET'])
  9. def subscribe():
  10. subscriptions = [{'pubsubname': 'pubsub',
  11. 'topic': 'deathStarStatus',
  12. 'route': 'dsstatus'}]
  13. return jsonify(subscriptions)
  14. @app.route('/dsstatus', methods=['POST'])
  15. def ds_subscriber():
  16. print(request.json, flush=True)
  17. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  18. app.run()

After creating app1.py ensure flask and flask_cors are installed:

  1. pip install flask
  2. pip install flask_cors

Then run:

  1. dapr --app-id app1 --app-port 5000 run python app1.py
  1. const express = require('express')
  2. const bodyParser = require('body-parser')
  3. const app = express()
  4. app.use(bodyParser.json({ type: 'application/*+json' }));
  5. const port = 3000
  6. app.get('/dapr/subscribe', (req, res) => {
  7. res.json([
  8. {
  9. pubsubname: "pubsub",
  10. topic: "deathStarStatus",
  11. route: "dsstatus"
  12. }
  13. ]);
  14. })
  15. app.post('/dsstatus', (req, res) => {
  16. console.log(req.body);
  17. res.sendStatus(200);
  18. });
  19. app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

Run this app with:

  1. dapr --app-id app2 --app-port 3000 run node app2.js

Update app1.php with the following:

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
  4. new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus'),
  5. ]]));
  6. $app->post('/dsstatus', function(
  7. #[\Dapr\Attributes\FromBody]
  8. \Dapr\PubSub\CloudEvent $cloudEvent,
  9. \Psr\Log\LoggerInterface $logger
  10. ) {
  11. $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
  12. return ['status' => 'SUCCESS'];
  13. }
  14. );
  15. $app->start();

Run this app with:

  1. dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php

The /dsstatus endpoint matches the route defined in the subscriptions and this is where Dapr will send all topic messages to.

Step 3: Publish a topic

To publish a topic you need to run an instance of a Dapr sidecar to use the pubsub Redis component. You can use the default Redis component installed into your local environment.

Start an instance of Dapr with an app-id called testpubsub:

  1. dapr run --app-id testpubsub --dapr-http-port 3500

Then publish a message to the deathStarStatus topic:

  1. dapr publish --publish-app-id testpubsub --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'

Then publish a message to the deathStarStatus topic:

  1. curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'

Then publish a message to the deathStarStatus topic:

  1. Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"status": "completed"}' -Uri 'http://localhost:3500/v1.0/publish/pubsub/deathStarStatus'

Dapr automatically wraps the user payload in a Cloud Events v1.0 compliant envelope, using Content-Type header value for datacontenttype attribute.

Step 4: ACK-ing a message

In order to tell Dapr that a message was processed successfully, return a 200 OK response. If Dapr receives any other return status code than 200, or if your app crashes, Dapr will attempt to redeliver the message following At-Least-Once semantics.

Example

  1. @app.route('/dsstatus', methods=['POST'])
  2. def ds_subscriber():
  3. print(request.json, flush=True)
  4. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  1. app.post('/dsstatus', (req, res) => {
  2. res.sendStatus(200);
  3. });

(Optional) Step 5: Publishing a topic with code

If you prefer publishing a topic using code, here is an example.

  1. const express = require('express');
  2. const path = require('path');
  3. const request = require('request');
  4. const bodyParser = require('body-parser');
  5. const app = express();
  6. app.use(bodyParser.json());
  7. const daprPort = process.env.DAPR_HTTP_PORT || 3500;
  8. const daprUrl = `http://localhost:${daprPort}/v1.0`;
  9. const port = 8080;
  10. const pubsubName = 'pubsub';
  11. app.post('/publish', (req, res) => {
  12. console.log("Publishing: ", req.body);
  13. const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
  14. request( { uri: publishUrl, method: 'POST', json: req.body } );
  15. res.sendStatus(200);
  16. });
  17. app.listen(process.env.PORT || port, () => console.log(`Listening on port ${port}!`));

If you prefer publishing a topic using code, here is an example.

  1. <?php
  2. require_once __DIR__.'/vendor/autoload.php';
  3. $app = \Dapr\App::create();
  4. $app->run(function(\DI\FactoryInterface $factory, \Psr\Log\LoggerInterface $logger) {
  5. $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
  6. $publisher->topic('deathStarStatus')->publish('operational');
  7. $logger->alert('published!');
  8. });

You can save this to app2.php and while app1 is running in another terminal, execute:

  1. dapr --app-id app2 run -- php app2.php

Sending a custom CloudEvent

Dapr automatically takes the data sent on the publish request and wraps it in a CloudEvent 1.0 envelope. If you want to use your own custom CloudEvent, make sure to specify the content type as application/cloudevents+json.

Read about content types here, and about the Cloud Events message format.

Next steps

Last modified August 2, 2021 : Fix Java SDK link (#1695) (2c67fd1)