How-To: Publish a message and subscribe to a topic

Learn how to send messages to a topic with one service and subscribe to that topic in another service

Introduction

Pub/Sub is a common pattern in a distributed system with many services that want to utilize decoupled, asynchronous messaging. Using Pub/Sub, you can enable scenarios where event consumers are decoupled from event producers.

Dapr provides an extensible Pub/Sub system with at-least-once delivery guarantees, allowing developers to publish and subscribe to topics. Dapr provides pub/sub components that let operators use their preferred infrastructure, for example Redis Streams or Kafka.

Content Types

When publishing a message, it’s important to specify the content type of the data being sent. Unless specified, Dapr will assume text/plain. When using Dapr’s HTTP API, the content type can be set in a Content-Type header. gRPC clients and SDKs have a dedicated content type parameter.
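As a rough illustration of setting the content type over the HTTP API, the sketch below builds (but does not send) a publish request using only the Python standard library. The helper name build_publish_request is hypothetical, and the port 3601, component name order_pub_sub, and topic orders are borrowed from the example later in this guide.

```python
import json
import urllib.request


def build_publish_request(pubsub_name, topic, payload,
                          content_type="text/plain", dapr_http_port=3601):
    """Build (but do not send) a request for the Dapr HTTP publish endpoint.

    Hypothetical helper for illustration; the URL shape follows the
    /v1.0/publish/<pubsub>/<topic> form used in this guide.
    """
    url = f"http://localhost:{dapr_http_port}/v1.0/publish/{pubsub_name}/{topic}"
    body = payload if isinstance(payload, bytes) else payload.encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        # The Content-Type header tells Dapr how to interpret the payload.
        headers={"Content-Type": content_type},
    )


request = build_publish_request(
    "order_pub_sub", "orders",
    json.dumps({"orderId": "100"}),
    content_type="application/json",
)
print(request.full_url)
print(request.get_header("Content-type"))
```

Sending the request requires a running sidecar, for example with urllib.request.urlopen(request).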

Example:

The below code example loosely describes an application that processes orders. In the example, there are two services - an order processing service and a checkout service. Both services have Dapr sidecars. The order processing service uses Dapr to publish a message to RabbitMQ and the checkout service subscribes to the topic in the message queue.

Diagram showing state management of example service

Step 1: Set up the Pub/Sub component

The following example creates applications to publish and subscribe to a topic called orders.

The first step is to set up the Pub/Sub component:

The pubsub.yaml file is created by default on your local machine when running dapr init. Verify by opening your components file under %UserProfile%\.dapr\components\pubsub.yaml on Windows or ~/.dapr/components/pubsub.yaml on Linux/macOS.
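If you prefer to check for the file from a script, the following minimal sketch resolves the default location on any of these operating systems. The function name default_components_dir is hypothetical; it assumes the components directory sits under the current user's home directory, as described above.

```python
from pathlib import Path


def default_components_dir(home=None):
    """Return the default Dapr components directory under the user's home.

    Hypothetical helper: Path.home() resolves to %UserProfile% on Windows
    and to ~ on Linux/macOS.
    """
    base = Path(home) if home is not None else Path.home()
    return base / ".dapr" / "components"


pubsub_file = default_components_dir() / "pubsub.yaml"
print(f"{pubsub_file} exists: {pubsub_file.exists()}")
```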

In this example, RabbitMQ is used for publish and subscribe. Replace pubsub.yaml file contents with the below contents.

  apiVersion: dapr.io/v1alpha1
  kind: Component
  metadata:
    name: order_pub_sub
  spec:
    type: pubsub.rabbitmq
    version: v1
    metadata:
    - name: host
      value: "amqp://localhost:5672"
    - name: durable
      value: "false"
    - name: deletedWhenUnused
      value: "false"
    - name: autoAck
      value: "false"
    - name: reconnectWait
      value: "0"
    - name: concurrency
      value: parallel
  scopes:
    - orderprocessing
    - checkout

You can override this file with another Redis instance or another pubsub component by creating a components directory containing the file and using the flag --components-path with the dapr run CLI command.

To deploy this into a Kubernetes cluster, fill in the metadata connection details of your desired pubsub component in the yaml below, save as pubsub.yaml, and run kubectl apply -f pubsub.yaml.

  apiVersion: dapr.io/v1alpha1
  kind: Component
  metadata:
    name: order_pub_sub
    namespace: default
  spec:
    type: pubsub.rabbitmq
    version: v1
    metadata:
    - name: host
      value: "amqp://localhost:5672"
    - name: durable
      value: "false"
    - name: deletedWhenUnused
      value: "false"
    - name: autoAck
      value: "false"
    - name: reconnectWait
      value: "0"
    - name: concurrency
      value: parallel
  scopes:
    - orderprocessing
    - checkout
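One caveat when deploying to Kubernetes: object names there are generally expected to be lowercase RFC 1123 DNS subdomain names (lowercase alphanumerics, '-' and '.'), so a name with underscores such as order_pub_sub may be rejected by kubectl apply. The sketch below is a hypothetical pre-flight check, not part of Dapr or kubectl; if the name is rejected, pick a compliant one (for instance orderpubsub) and use it consistently wherever the component is referenced.

```python
import re

# RFC 1123 DNS subdomain: dot-separated labels of lowercase alphanumerics
# and '-', each starting and ending with an alphanumeric, 253 chars at most.
_LABEL = r"[a-z0-9]([-a-z0-9]*[a-z0-9])?"
_DNS_SUBDOMAIN = re.compile(rf"{_LABEL}(\.{_LABEL})*")


def is_valid_k8s_name(name):
    """Hypothetical check that a name looks like a valid Kubernetes object name."""
    return len(name) <= 253 and _DNS_SUBDOMAIN.fullmatch(name) is not None


for candidate in ("order_pub_sub", "order-pub-sub", "orderpubsub"):
    print(candidate, is_valid_k8s_name(candidate))
```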

Step 2: Subscribe to topics

Dapr allows two methods by which you can subscribe to topics:

  • Declaratively, where subscriptions are defined in an external file.
  • Programmatically, where subscriptions are defined in user code.

Note

Both declarative and programmatic approaches support the same features. The declarative approach removes the Dapr dependency from your code and allows, for example, existing applications to subscribe to topics, without having to change code. The programmatic approach implements the subscription in your code.

Declarative subscriptions

You can subscribe to a topic using the following Custom Resource Definition (CRD). Create a file named subscription.yaml and paste the following:

  apiVersion: dapr.io/v1alpha1
  kind: Subscription
  metadata:
    name: order_pub_sub
  spec:
    topic: orders
    route: /checkout
    pubsubname: order_pub_sub
  scopes:
  - orderprocessing
  - checkout

The example above shows an event subscription to topic orders, for the pubsub component order_pub_sub.

  • The route field tells Dapr to send all topic messages to the /checkout endpoint in the app.
  • The scopes field enables this subscription for apps with IDs orderprocessing and checkout.
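To make the effect of the scopes field concrete, here is a small sketch of the matching rule. The function subscription_applies is hypothetical and only models the behavior; it assumes that a subscription without scopes applies to every app, which is my understanding of how Dapr treats an empty scopes list.

```python
def subscription_applies(subscription, app_id):
    """Return True if the subscription is enabled for the given app ID.

    Hypothetical model of scoping; assumes that missing or empty scopes
    mean the subscription applies to all apps.
    """
    scopes = subscription.get("scopes") or []
    return not scopes or app_id in scopes


subscription = {
    "topic": "orders",
    "route": "/checkout",
    "pubsubname": "order_pub_sub",
    "scopes": ["orderprocessing", "checkout"],
}

for app_id in ("checkout", "orderprocessing", "inventory"):
    print(app_id, subscription_applies(subscription, app_id))
```

Here inventory is a made-up app ID used to show an app that falls outside the scopes.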

Set the component with:

Place the CRD in your ./components directory. When Dapr starts up, it loads subscriptions along with components.

Note: By default, Dapr loads components from $HOME/.dapr/components on macOS/Linux and %USERPROFILE%\.dapr\components on Windows.

You can also override the default directory by pointing the Dapr CLI to a components path. Use the command that matches your application's language:

  dapr run --app-id myapp --components-path ./myComponents -- dotnet run
  dapr run --app-id myapp --components-path ./myComponents -- mvn spring-boot:run
  dapr run --app-id myapp --components-path ./myComponents -- python3 app.py
  dapr run --app-id myapp --components-path ./myComponents -- go run app.go
  dapr run --app-id myapp --components-path ./myComponents -- npm start

In Kubernetes, save the CRD to a file and apply it to the cluster:

  kubectl apply -f subscription.yaml

Below are code examples that leverage Dapr SDKs to subscribe to a topic.

  //dependencies
  using System.Collections.Generic;
  using System.Threading.Tasks;
  using System;
  using Microsoft.AspNetCore.Mvc;
  using Dapr;
  using Dapr.Client;

  //code
  namespace CheckoutService.controller
  {
      [ApiController]
      public class CheckoutServiceController : Controller
      {
          //Subscribe to a topic
          [Topic("order_pub_sub", "orders")]
          [HttpPost("checkout")]
          public void getCheckout([FromBody] int orderId)
          {
              Console.WriteLine("Subscriber received : " + orderId);
          }
      }
  }

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-ssl dotnet run

  //dependencies
  import io.dapr.Topic;
  import io.dapr.client.domain.CloudEvent;
  import org.springframework.web.bind.annotation.*;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import reactor.core.publisher.Mono;

  //code
  @RestController
  public class CheckoutServiceController {

      private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);

      //Subscribe to a topic
      @Topic(name = "orders", pubsubName = "order_pub_sub")
      @PostMapping(path = "/checkout")
      public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
          return Mono.fromRunnable(() -> {
              try {
                  log.info("Subscriber received: " + cloudEvent.getData());
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
          });
      }
  }

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run

  #dependencies
  from cloudevents.sdk.event import v1
  from dapr.ext.grpc import App
  import logging
  import json

  #code
  app = App()
  logging.basicConfig(level = logging.INFO)

  #Subscribe to a topic
  @app.subscribe(pubsub_name='order_pub_sub', topic='orders')
  def mytopic(event: v1.Event) -> None:
      data = json.loads(event.Data())
      logging.info('Subscriber received: ' + str(data))

  app.run(6002)

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py

  package main

  //dependencies
  import (
      "context"
      "log"
      "net/http"

      "github.com/dapr/go-sdk/service/common"
      daprd "github.com/dapr/go-sdk/service/http"
  )

  //code
  var sub = &common.Subscription{
      PubsubName: "order_pub_sub",
      Topic:      "orders",
      Route:      "/checkout",
  }

  func main() {
      s := daprd.NewService(":6002")
      //Subscribe to a topic
      if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
          log.Fatalf("error adding topic subscription: %v", err)
      }
      if err := s.Start(); err != nil && err != http.ErrServerClosed {
          log.Fatalf("error listening: %v", err)
      }
  }

  // eventHandler logs each event; returning retry=false with a nil error acknowledges it
  func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
      log.Printf("Subscriber received: %s", e.Data)
      return false, nil
  }

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go

  //dependencies
  import { DaprServer, CommunicationProtocolEnum } from 'dapr-client';

  //code
  const daprHost = "127.0.0.1";
  const serverHost = "127.0.0.1";
  const serverPort = "6002";

  start().catch((e) => {
      console.error(e);
      process.exit(1);
  });

  async function start(orderId) {
      const server = new DaprServer(
          serverHost,
          serverPort,
          daprHost,
          process.env.DAPR_HTTP_PORT,
          CommunicationProtocolEnum.HTTP
      );
      //Subscribe to a topic
      await server.pubsub.subscribe("order_pub_sub", "orders", async (orderId) => {
          console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
      });
      await server.startServer();
  }

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start

The /checkout endpoint matches the route defined in the subscriptions; this is where Dapr sends all topic messages.
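For readers curious about what a programmatic subscription looks like without an SDK, the following is a minimal standard-library sketch of the HTTP contract as I understand it: the sidecar asks the app for its subscriptions with GET /dapr/subscribe, then POSTs each event to the declared route. The names SubscriberHandler, handle_checkout, and serve are hypothetical, and this is a sketch rather than how any Dapr SDK is actually implemented.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# Subscriptions this app reports to the sidecar.
SUBSCRIPTIONS = [
    {"pubsubname": "order_pub_sub", "topic": "orders", "route": "/checkout"}
]


def handle_checkout(cloud_event):
    """Process one delivered event and return its data payload."""
    data = cloud_event.get("data")
    print(f"Subscriber received: {data}")
    return data


class SubscriberHandler(BaseHTTPRequestHandler):
    def _reply(self, status, body=b""):
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        # Report the subscription list when asked for it.
        if self.path == "/dapr/subscribe":
            self._reply(200, json.dumps(SUBSCRIPTIONS).encode("utf-8"))
        else:
            self._reply(404)

    def do_POST(self):
        # Events for the orders topic arrive on the declared route.
        if self.path != "/checkout":
            self._reply(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        handle_checkout(json.loads(self.rfile.read(length) or b"{}"))
        self._reply(200)  # 200 acknowledges the message


def serve(port=6002):
    """Start the subscriber on the app port used in this guide (blocks)."""
    HTTPServer(("127.0.0.1", port), SubscriberHandler).serve_forever()
```

To try it, call serve() at the end of the file and start the app with dapr run using --app-id checkout and --app-port 6002, as in the commands above.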

Step 3: Publish a topic

Start an instance of Dapr with an app-id called orderprocessing:

  dapr run --app-id orderprocessing --dapr-http-port 3601

Then publish a message to the orders topic, using one of the following options.

Using the Dapr CLI:

  dapr publish --publish-app-id orderprocessing --pubsub order_pub_sub --topic orders --data '{"orderId": "100"}'

Using the HTTP API with curl:

  curl -X POST http://localhost:3601/v1.0/publish/order_pub_sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'

Using the HTTP API with PowerShell:

  Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order_pub_sub/orders'

Dapr automatically wraps the user payload in a Cloud Events v1.0 compliant envelope, using the Content-Type header value for the datacontenttype attribute.
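To give a feel for what such an envelope contains, the sketch below wraps a payload in a CloudEvents 1.0 style dictionary. It is an approximation for illustration only: wrap_in_cloudevent is a hypothetical function, and the exact set of attributes and values Dapr emits (for example the id, source, and type) may differ from what is shown here.

```python
import json
import uuid


def wrap_in_cloudevent(data, content_type, app_id, pubsub_name, topic):
    """Approximate the envelope placed around a published payload.

    Hypothetical illustration; attribute values are assumptions except
    specversion and datacontenttype, which the text above describes.
    """
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),         # assumed: a unique ID per event
        "source": app_id,                # assumed: the publishing app
        "type": "com.dapr.event.sent",   # assumed default event type
        "datacontenttype": content_type, # taken from the Content-Type header
        "pubsubname": pubsub_name,
        "topic": topic,
        "data": data,
    }


envelope = wrap_in_cloudevent(
    {"orderId": "100"}, "application/json",
    "orderprocessing", "order_pub_sub", "orders",
)
print(json.dumps(envelope, indent=2))
```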

Below are code examples that leverage Dapr SDKs to publish a topic.

  //dependencies
  using System;
  using System.Collections.Generic;
  using System.Net.Http;
  using System.Net.Http.Headers;
  using System.Threading.Tasks;
  using Dapr.Client;
  using Microsoft.AspNetCore.Mvc;
  using System.Threading;

  //code
  namespace EventService
  {
      class Program
      {
          static async Task Main(string[] args)
          {
              string PUBSUB_NAME = "order_pub_sub";
              string TOPIC_NAME = "orders";
              while(true) {
                  System.Threading.Thread.Sleep(5000);
                  Random random = new Random();
                  int orderId = random.Next(1,1000);
                  CancellationTokenSource source = new CancellationTokenSource();
                  CancellationToken cancellationToken = source.Token;
                  using var client = new DaprClientBuilder().Build();
                  //Using Dapr SDK to publish a topic
                  await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
                  Console.WriteLine("Published data: " + orderId);
              }
          }
      }
  }

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-ssl dotnet run

  //dependencies
  import io.dapr.client.DaprClient;
  import io.dapr.client.DaprClientBuilder;
  import io.dapr.client.domain.Metadata;
  import static java.util.Collections.singletonMap;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import java.util.Random;
  import java.util.concurrent.TimeUnit;

  //code
  @SpringBootApplication
  public class OrderProcessingServiceApplication {

      private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

      public static void main(String[] args) throws InterruptedException{
          String MESSAGE_TTL_IN_SECONDS = "1000";
          String TOPIC_NAME = "orders";
          String PUBSUB_NAME = "order_pub_sub";

          while(true) {
              TimeUnit.MILLISECONDS.sleep(5000);
              Random random = new Random();
              int orderId = random.nextInt(1000-1) + 1;
              DaprClient client = new DaprClientBuilder().build();
              //Using Dapr SDK to publish a topic
              client.publishEvent(
                      PUBSUB_NAME,
                      TOPIC_NAME,
                      orderId,
                      singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
              log.info("Published data:" + orderId);
          }
      }
  }

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run

  #dependencies
  import random
  from time import sleep
  import requests
  import logging
  import json
  from dapr.clients import DaprClient

  #code
  logging.basicConfig(level = logging.INFO)
  while True:
      sleep(random.randrange(50, 5000) / 1000)
      orderId = random.randint(1, 1000)
      PUBSUB_NAME = 'order_pub_sub'
      TOPIC_NAME = 'orders'
      with DaprClient() as client:
          #Using Dapr SDK to publish a topic
          result = client.publish_event(
              pubsub_name=PUBSUB_NAME,
              topic_name=TOPIC_NAME,
              data=json.dumps(orderId),
              data_content_type='application/json',
          )
      logging.info('Published data: ' + str(orderId))

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py

  package main

  //dependencies
  import (
      "context"
      "log"
      "math/rand"
      "strconv"
      "time"

      dapr "github.com/dapr/go-sdk/client"
  )

  //code
  var (
      PUBSUB_NAME = "order_pub_sub"
      TOPIC_NAME  = "orders"
  )

  func main() {
      // Create the client once and close it when main returns
      client, err := dapr.NewClient()
      if err != nil {
          panic(err)
      }
      defer client.Close()
      ctx := context.Background()

      for i := 0; i < 10; i++ {
          // Wait 5 seconds between messages; time.Sleep takes a time.Duration (nanoseconds)
          time.Sleep(5 * time.Second)
          orderId := rand.Intn(1000-1) + 1
          //Using Dapr SDK to publish a topic
          if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); err != nil {
              panic(err)
          }
          log.Println("Published data: " + strconv.Itoa(orderId))
      }
  }

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go

  //dependencies
  import { DaprClient, CommunicationProtocolEnum } from 'dapr-client';

  //code
  const daprHost = "127.0.0.1";

  async function main() {
      for (let i = 0; i < 10; i++) {
          // Await the sleep so that messages are published 5 seconds apart
          await sleep(5000);
          const orderId = Math.floor(Math.random() * (1000 - 1) + 1);
          await start(orderId);
      }
  }

  async function start(orderId) {
      const PUBSUB_NAME = "order_pub_sub"
      const TOPIC_NAME = "orders"
      const client = new DaprClient(daprHost, process.env.DAPR_HTTP_PORT, CommunicationProtocolEnum.HTTP);
      //Using Dapr SDK to publish a topic
      await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId);
      // Log only after the publish call has completed
      console.log("Published data:" + orderId)
  }

  function sleep(ms) {
      return new Promise(resolve => setTimeout(resolve, ms));
  }

  main().catch((e) => {
      console.error(e);
      process.exit(1);
  });

Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

Step 4: ACK-ing a message

To tell Dapr that a message was processed successfully, return a 200 OK response. If Dapr receives any status code other than 200, or if your app crashes, Dapr attempts to redeliver the message following at-least-once semantics.
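Because at-least-once delivery means the same message can arrive more than once, a subscriber often pairs the status code with some form of de-duplication. The sketch below models that decision in plain Python: the function deliver and the in-memory seen_ids set are hypothetical, and tracking the CloudEvent id is one possible approach that I am assuming here, not something Dapr does for you.

```python
def deliver(event, process, seen_ids):
    """Return the HTTP status code a subscriber would send back to Dapr.

    Hypothetical sketch: 200 acknowledges the message, any other status
    (500 here) asks for redelivery. seen_ids is an in-memory stand-in for
    whatever de-duplication store an application might use.
    """
    event_id = event.get("id")
    if event_id is not None and event_id in seen_ids:
        return 200  # duplicate redelivery: acknowledge without reprocessing
    try:
        process(event.get("data"))
    except Exception:
        return 500  # processing failed: a non-200 status triggers redelivery
    if event_id is not None:
        seen_ids.add(event_id)
    return 200


attempts = []


def flaky_process(data):
    """Fail on the first attempt only, to simulate a transient error."""
    attempts.append(data)
    if len(attempts) == 1:
        raise RuntimeError("transient failure")


seen = set()
event = {"id": "someCloudEventId", "data": {"orderId": "100"}}
statuses = [deliver(event, flaky_process, seen) for _ in range(3)]
print(statuses)  # [500, 200, 200]
```

The first delivery fails and is retried, the second succeeds, and the third (a duplicate) is acknowledged without being processed again.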

Sending a custom CloudEvent

Dapr automatically takes the data sent on the publish request and wraps it in a CloudEvent 1.0 envelope. If you want to use your own custom CloudEvent, make sure to specify the content type as application/cloudevents+json.

Read about content types in the Content Types section above, and about the Cloud Events message format.

Example

Publish a custom CloudEvent to the orders topic, using one of the following options.

Using the Dapr CLI:

  dapr publish --publish-app-id orderprocessing --pubsub order_pub_sub --topic orders --data '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'

Using the HTTP API with curl:

  curl -X POST http://localhost:3601/v1.0/publish/order_pub_sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'

Using the HTTP API with PowerShell:

  Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order_pub_sub/orders'
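The same custom CloudEvent can also be assembled programmatically. The sketch below builds the request with the Python standard library without sending it; build_custom_cloudevent_request is a hypothetical helper, and the event fields simply mirror the commands above.

```python
import json
import urllib.request


def build_custom_cloudevent_request(order_id, dapr_http_port=3601):
    """Build (but do not send) a publish request carrying a custom CloudEvent.

    Hypothetical helper; field values are copied from the example commands.
    """
    event = {
        "specversion": "1.0",
        "type": "com.dapr.cloudevent.sent",
        "source": "testcloudeventspubsub",
        "subject": "Cloud Events Test",
        "id": "someCloudEventId",
        "time": "2021-08-02T09:00:00Z",
        "datacontenttype": "application/cloudevents+json",
        "data": {"orderId": order_id},
    }
    return urllib.request.Request(
        f"http://localhost:{dapr_http_port}/v1.0/publish/order_pub_sub/orders",
        data=json.dumps(event).encode("utf-8"),
        method="POST",
        # This content type signals that the body is already a CloudEvent.
        headers={"Content-Type": "application/cloudevents+json"},
    )


request = build_custom_cloudevent_request("100")
print(request.get_header("Content-type"))
print(json.loads(request.data)["data"])
```

With a sidecar running, urllib.request.urlopen(request) would send it.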

Last modified February 18, 2022: Update setup-jetstream.md (#2200) (428d8c2)