How to: Publish a message and subscribe to a topic

Learn how to send messages to a topic with one service and subscribe to that topic in another service

Now that you've learned what the Dapr pub/sub building block provides, learn how it can work in your service. The code example below loosely describes an application that processes orders with two services, each with its own Dapr sidecar:

  • A checkout service using Dapr to subscribe to the topic in the message queue.
  • An order processing service using Dapr to publish a message to RabbitMQ.

Diagram showing pub/sub messaging between the example services

Dapr automatically wraps the user payload in a CloudEvents v1.0 compliant envelope, using the Content-Type header value for the datacontenttype attribute. Learn more about messages with CloudEvents.
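To make the envelope idea concrete, here is a minimal sketch of what wrapping a payload in a CloudEvents v1.0-style envelope could look like. The function name wrap_in_cloudevent is hypothetical, and the source, type, topic, and pubsubname values are assumptions chosen for illustration; the exact fields and values Dapr emits may differ.

```python
import json
import uuid


def wrap_in_cloudevent(payload, content_type="application/json",
                       source="orderprocessing", topic="orders",
                       pubsub_name="order-pub-sub"):
    """Build a CloudEvents v1.0-style envelope around a user payload.

    Illustrative only: this is not Dapr's implementation.
    """
    return {
        "specversion": "1.0",             # required by CloudEvents v1.0
        "id": str(uuid.uuid4()),          # required: unique per event
        "source": source,                 # required: assumed to be the publishing app ID
        "type": "com.dapr.event.sent",    # required: assumed event type
        "datacontenttype": content_type,  # mirrors the Content-Type header
        "data": payload,                  # the user payload itself
        "topic": topic,                   # assumed extension attribute
        "pubsubname": pubsub_name,        # assumed extension attribute
    }


envelope = wrap_in_cloudevent({"orderId": "100"})
print(json.dumps(envelope, indent=2))
```

The subscriber reads the original payload from the data field, which is why the subscriber samples on this page access the event's data rather than the raw request body.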

The following example demonstrates how your applications publish and subscribe to a topic called orders.

Note

If you haven’t already, try out the pub/sub quickstart for a quick walk-through on how to use pub/sub.

Set up the Pub/Sub component

The first step is to set up the pub/sub component:

When you run dapr init, Dapr creates a default Redis pubsub.yaml component file and runs a Redis container on your local machine. The file is located:

  • On Windows, under %UserProfile%\.dapr\components\pubsub.yaml
  • On Linux/MacOS, under ~/.dapr/components/pubsub.yaml

With the pubsub.yaml component, you can easily swap out underlying components without application code changes. In this example, RabbitMQ is used.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: order-pub-sub
    spec:
      type: pubsub.rabbitmq
      version: v1
      metadata:
      - name: host
        value: "amqp://localhost:5672"
      - name: durable
        value: "false"
      - name: deletedWhenUnused
        value: "false"
      - name: autoAck
        value: "false"
      - name: reconnectWait
        value: "0"
      - name: concurrency
        value: parallel
    scopes:
      - orderprocessing
      - checkout

You can override this file with another pubsub component by creating a components directory (in this example, myComponents) containing the file and using the flag --resources-path with the dapr run CLI command.

    # .NET
    dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
    # Java
    dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
    # Python
    dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
    # Go
    dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
    # JavaScript
    dapr run --app-id myapp --resources-path ./myComponents -- npm start

To deploy this into a Kubernetes cluster, fill in the metadata connection details of the pub/sub component in the YAML below, save as pubsub.yaml, and run kubectl apply -f pubsub.yaml.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: order-pub-sub
    spec:
      type: pubsub.rabbitmq
      version: v1
      metadata:
      - name: connectionString
        value: "amqp://localhost:5672"
      - name: protocol
        value: amqp
      - name: hostname
        value: localhost
      - name: username
        value: username
      - name: password
        value: password
      - name: durable
        value: "false"
      - name: deletedWhenUnused
        value: "false"
      - name: autoAck
        value: "false"
      - name: reconnectWait
        value: "0"
      - name: concurrency
        value: parallel
    scopes:
      - orderprocessing
      - checkout

Subscribe to topics

Dapr provides two methods by which you can subscribe to topics:

  • Declaratively, where subscriptions are defined in an external file.
  • Programmatically, where subscriptions are defined in user code.

Learn more in the declarative and programmatic subscriptions doc. This example demonstrates a declarative subscription.

Create a file named subscription.yaml and paste the following:

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: order-pub-sub
    spec:
      topic: orders
      routes:
        default: /checkout
      pubsubname: order-pub-sub
    scopes:
    - orderprocessing
    - checkout

The example above shows an event subscription to topic orders, for the pubsub component order-pub-sub.

  • The routes.default field tells Dapr to send all topic messages to the /checkout endpoint in the app.
  • The scopes field enables this subscription for apps with IDs orderprocessing and checkout.

Place subscription.yaml in the same directory as your pubsub.yaml component. When Dapr starts up, it loads subscriptions along with the components.
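For comparison with the declarative file, the sketch below shows roughly what the programmatic alternative looks like at the HTTP level: the app itself returns its subscriptions as JSON from a /dapr/subscribe endpoint that the sidecar queries. It uses only the Python standard library rather than a Dapr SDK; the handler class, the helper names, and the port are illustrative assumptions, not code from this page's samples.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def dapr_subscriptions():
    """Subscriptions equivalent to the declarative subscription.yaml."""
    return [
        {
            "pubsubname": "order-pub-sub",
            "topic": "orders",
            "route": "/checkout",
        }
    ]


class SubscriptionHandler(BaseHTTPRequestHandler):
    """Hypothetical handler that serves the subscription list."""

    def do_GET(self):
        if self.path != "/dapr/subscribe":
            self.send_error(404)
            return
        body = json.dumps(dapr_subscriptions()).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port=6002):
    """Start the app; the port is an assumption matching --app-port 6002."""
    HTTPServer(("127.0.0.1", port), SubscriptionHandler).serve_forever()


print(json.dumps(dapr_subscriptions()))
```

Calling serve() starts the listener. With the declarative approach used in this example, no such endpoint is needed because Dapr reads the subscription from subscription.yaml.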

Below are code examples that leverage Dapr SDKs to subscribe to the topic you defined in subscription.yaml.

.NET

    //dependencies
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System;
    using Microsoft.AspNetCore.Mvc;
    using Dapr;
    using Dapr.Client;

    //code
    namespace CheckoutService.controller
    {
        [ApiController]
        public class CheckoutServiceController : Controller
        {
            //Subscribe to a topic
            [Topic("order-pub-sub", "orders")]
            [HttpPost("checkout")]
            public void getCheckout([FromBody] int orderId)
            {
                Console.WriteLine("Subscriber received : " + orderId);
            }
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-protocol https -- dotnet run

Java

    //dependencies
    import io.dapr.Topic;
    import io.dapr.client.domain.CloudEvent;
    import org.springframework.web.bind.annotation.*;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Mono;

    //code
    @RestController
    public class CheckoutServiceController {

        private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);

        //Subscribe to a topic
        @Topic(name = "orders", pubsubName = "order-pub-sub")
        @PostMapping(path = "/checkout")
        public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
            return Mono.fromRunnable(() -> {
                try {
                    log.info("Subscriber received: " + cloudEvent.getData());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 -- mvn spring-boot:run

Python

    #dependencies
    from cloudevents.sdk.event import v1
    from dapr.ext.grpc import App
    import logging
    import json

    #code
    app = App()
    logging.basicConfig(level = logging.INFO)

    #Subscribe to a topic
    @app.subscribe(pubsub_name='order-pub-sub', topic='orders')
    def mytopic(event: v1.Event) -> None:
        data = json.loads(event.Data())
        logging.info('Subscriber received: ' + str(data))

    app.run(6002)

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py

Go

    package main

    //dependencies
    import (
        "context"
        "log"
        "net/http"

        "github.com/dapr/go-sdk/service/common"
        daprd "github.com/dapr/go-sdk/service/http"
    )

    //code
    var sub = &common.Subscription{
        PubsubName: "order-pub-sub",
        Topic:      "orders",
        Route:      "/checkout",
    }

    func main() {
        s := daprd.NewService(":6002")
        //Subscribe to a topic
        if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
            log.Fatalf("error adding topic subscription: %v", err)
        }
        if err := s.Start(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("error listening: %v", err)
        }
    }

    func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
        log.Printf("Subscriber received: %s", e.Data)
        return false, nil
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 -- go run CheckoutService.go

JavaScript

    //dependencies
    import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';

    //code
    const daprHost = "127.0.0.1";
    const serverHost = "127.0.0.1";
    const serverPort = "6002";

    start().catch((e) => {
      console.error(e);
      process.exit(1);
    });

    async function start() {
      const server = new DaprServer({
        serverHost,
        serverPort,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
        clientOptions: {
          daprHost,
          daprPort: process.env.DAPR_HTTP_PORT,
        },
      });
      //Subscribe to a topic
      await server.pubsub.subscribe("order-pub-sub", "orders", async (orderId) => {
        console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
      });
      await server.start();
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

    dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 -- npm start

Publish a message

Start an instance of Dapr with an app-id called orderprocessing:

    dapr run --app-id orderprocessing --dapr-http-port 3601

Then publish a message to the orders topic:

    # Dapr CLI
    dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{"orderId": "100"}'
    # HTTP API (Bash)
    curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'
    # HTTP API (PowerShell)
    Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
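The same HTTP call can be made from application code without an SDK. The following is a minimal sketch using only the Python standard library; the helper names build_publish_request and publish are hypothetical, and the port, component name, and topic are taken from the commands above.

```python
import json
import urllib.request


def build_publish_request(payload, pubsub_name="order-pub-sub", topic="orders",
                          dapr_http_port=3601):
    """Build a POST request for the sidecar's publish endpoint."""
    url = f"http://localhost:{dapr_http_port}/v1.0/publish/{pubsub_name}/{topic}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def publish(payload, **kwargs):
    """Send the request and return the HTTP status; needs a running sidecar."""
    with urllib.request.urlopen(build_publish_request(payload, **kwargs)) as resp:
        return resp.status


request = build_publish_request({"orderId": "100"})
print(request.get_method(), request.full_url)
```

Calling publish({"orderId": "100"}) sends the message once the orderprocessing sidecar started above is listening on port 3601.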

Below are code examples that leverage Dapr SDKs to publish a message to a topic.

.NET

    //dependencies
    using System;
    using System.Collections.Generic;
    using System.Net.Http;
    using System.Net.Http.Headers;
    using System.Threading.Tasks;
    using Dapr.Client;
    using Microsoft.AspNetCore.Mvc;
    using System.Threading;

    //code
    namespace EventService
    {
        class Program
        {
            static async Task Main(string[] args)
            {
                string PUBSUB_NAME = "order-pub-sub";
                string TOPIC_NAME = "orders";
                while(true) {
                    System.Threading.Thread.Sleep(5000);
                    Random random = new Random();
                    int orderId = random.Next(1,1000);
                    CancellationTokenSource source = new CancellationTokenSource();
                    CancellationToken cancellationToken = source.Token;
                    using var client = new DaprClientBuilder().Build();
                    //Using Dapr SDK to publish a topic
                    await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
                    Console.WriteLine("Published data: " + orderId);
                }
            }
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-protocol https -- dotnet run

Java

    //dependencies
    import io.dapr.client.DaprClient;
    import io.dapr.client.DaprClientBuilder;
    import io.dapr.client.domain.Metadata;
    import static java.util.Collections.singletonMap;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;

    //code
    @SpringBootApplication
    public class OrderProcessingServiceApplication {

        private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

        public static void main(String[] args) throws InterruptedException{
            String MESSAGE_TTL_IN_SECONDS = "1000";
            String TOPIC_NAME = "orders";
            String PUBSUB_NAME = "order-pub-sub";

            while(true) {
                TimeUnit.MILLISECONDS.sleep(5000);
                Random random = new Random();
                int orderId = random.nextInt(1000-1) + 1;
                DaprClient client = new DaprClientBuilder().build();
                //Using Dapr SDK to publish a topic
                client.publishEvent(
                        PUBSUB_NAME,
                        TOPIC_NAME,
                        orderId,
                        singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
                log.info("Published data:" + orderId);
            }
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- mvn spring-boot:run

Python

    #dependencies
    import random
    from time import sleep
    import requests
    import logging
    import json
    from dapr.clients import DaprClient

    #code
    logging.basicConfig(level = logging.INFO)
    while True:
        sleep(random.randrange(50, 5000) / 1000)
        orderId = random.randint(1, 1000)
        PUBSUB_NAME = 'order-pub-sub'
        TOPIC_NAME = 'orders'
        with DaprClient() as client:
            #Using Dapr SDK to publish a topic
            result = client.publish_event(
                pubsub_name=PUBSUB_NAME,
                topic_name=TOPIC_NAME,
                data=json.dumps(orderId),
                data_content_type='application/json',
            )
        logging.info('Published data: ' + str(orderId))

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc -- python3 OrderProcessingService.py

Go

    package main

    //dependencies
    import (
        "context"
        "log"
        "math/rand"
        "strconv"
        "time"

        dapr "github.com/dapr/go-sdk/client"
    )

    //code
    var (
        PUBSUB_NAME = "order-pub-sub"
        TOPIC_NAME  = "orders"
    )

    func main() {
        // Create the client once and close it when main returns
        client, err := dapr.NewClient()
        if err != nil {
            panic(err)
        }
        defer client.Close()
        ctx := context.Background()
        for i := 0; i < 10; i++ {
            // time.Sleep takes a time.Duration, so state the unit explicitly
            time.Sleep(5 * time.Second)
            orderId := rand.Intn(1000-1) + 1
            //Using Dapr SDK to publish a topic
            if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); err != nil {
                panic(err)
            }
            log.Println("Published data: " + strconv.Itoa(orderId))
        }
    }

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- go run OrderProcessingService.go

JavaScript

    //dependencies
    import { DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';

    //code
    const daprHost = "127.0.0.1";

    async function main() {
      for (let i = 0; i < 10; i++) {
        // Await the sleep so that messages are published five seconds apart
        await sleep(5000);
        const orderId = Math.floor(Math.random() * (1000 - 1) + 1);
        await start(orderId);
      }
    }

    async function start(orderId) {
      const PUBSUB_NAME = "order-pub-sub"
      const TOPIC_NAME = "orders"
      const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT,
        communicationProtocol: CommunicationProtocolEnum.HTTP
      });
      //Using Dapr SDK to publish a topic
      await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId);
      console.log("Published data:" + orderId)
    }

    function sleep(ms) {
      return new Promise(resolve => setTimeout(resolve, ms));
    }

    main().catch((e) => {
      console.error(e);
      process.exit(1);
    });

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

    dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- npm start

Message acknowledgement and retries

To tell Dapr that a message was processed successfully, return a 200 OK response. If Dapr receives any status code other than 200, or if your app crashes, Dapr attempts to redeliver the message following at-least-once semantics.
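As a rough illustration of this contract, the sketch below maps the outcome of processing to an HTTP status: 200 when the handler succeeds, 500 when it raises, so that Dapr would retry the delivery. It uses only the Python standard library; process_order, status_for_delivery, CheckoutHandler, and serve are hypothetical names, and the choice of 500 for failures is an assumption (the text above only requires a status other than 200).

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def process_order(event):
    """Hypothetical business logic; raises KeyError if the event has no data."""
    print(f"Subscriber received: {event['data']}")


def status_for_delivery(raw_body, handler=process_order):
    """Return 200 if the message was processed, 500 to request redelivery."""
    try:
        handler(json.loads(raw_body))
        return 200
    except Exception:
        # Any failure is reported as a non-200 status so the message is retried
        return 500


class CheckoutHandler(BaseHTTPRequestHandler):
    """Receives deliveries on the /checkout route used in this example."""

    def do_POST(self):
        if self.path != "/checkout":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        status = status_for_delivery(self.rfile.read(length))
        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()


def serve(port=6002):
    """Start the subscriber app; the port matches --app-port 6002 above."""
    HTTPServer(("127.0.0.1", port), CheckoutHandler).serve_forever()
```

Because delivery is at-least-once, the same message can arrive more than once after a failure, so the processing logic should tolerate duplicates.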

Demo video

Watch this demo video to learn more about pub/sub messaging with Dapr.

Last modified March 21, 2024: Merge pull request #4082 from newbe36524/v1.13 (f4b0938)