How-To: Route messages to different event handlers

Learn how to route messages from a topic to different event handlers based on CloudEvent fields

Preview feature

Pub/Sub message routing is currently in preview.

Introduction

Content-based routing is a messaging pattern that uses a DSL instead of imperative application code. Pub/Sub routing implements this pattern: developers write expressions that route CloudEvents, based on their contents, to different URIs/paths and event handlers in the application. If no route matches, an optional default route is used. This becomes useful as your application expands to support multiple event versions or special cases. Routing can be implemented in code; however, keeping routing rules external to the application can improve portability.
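The pattern can be sketched in a few lines of Python. This is only an illustration and not Dapr's implementation: the `Rule` class, the `route` function, the sample event types, and the use of plain Python predicates in place of DSL expressions are all hypothetical. It also assumes that rules are tried in order and the first match wins.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Rule:
    """Hypothetical rule: a predicate over the event plus a target path."""
    match: Callable[[dict], bool]
    path: str


def route(event: dict, rules: List[Rule], default: Optional[str] = None) -> Optional[str]:
    """Return the path of the first matching rule, else the optional default."""
    for rule in rules:
        if rule.match(event):
            return rule.path
    return default


# Illustrative rules; the event types and paths are made up for this sketch.
rules = [
    Rule(lambda e: e.get("type") == "widget", "/widgets"),
    Rule(lambda e: e.get("type") == "gadget", "/gadgets"),
]

print(route({"type": "widget"}, rules, default="/products"))    # /widgets
print(route({"type": "sprocket"}, rules, default="/products"))  # /products
print(route({"type": "sprocket"}, rules))                       # None (no default)
```

Keeping the rules as data rather than as branches in a handler is what allows them to live outside the application.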

This feature is available to both the declarative and programmatic subscription approaches.

Enable message routing

This is a preview feature. To enable it, add the PubSub.Routing feature entry to your application configuration like so:

    apiVersion: dapr.io/v1alpha1
    kind: Configuration
    metadata:
      name: pubsubroutingconfig
    spec:
      features:
        - name: PubSub.Routing
          enabled: true

Learn more about enabling preview features.

Declarative subscription

For declarative subscriptions, you must use dapr.io/v2alpha1 as the apiVersion. Here is an example of subscriptions.yaml using routing.

    apiVersion: dapr.io/v2alpha1
    kind: Subscription
    metadata:
      name: myevent-subscription
    spec:
      pubsubname: pubsub
      topic: inventory
      routes:
        rules:
          - match: event.type == "widget"
            path: /widgets
          - match: event.type == "gadget"
            path: /gadgets
        default: /products
    scopes:
      - app1
      - app2
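With a subscription like this, the application only has to serve the three paths named in it. The following is a minimal sketch using only the Python standard library. The handler names, the `HANDLERS` table, the `serve` helper, and port 5000 are assumptions made for illustration, not part of Dapr.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_widget(event: dict) -> str:
    return f"widget: {event.get('id')}"


def handle_gadget(event: dict) -> str:
    return f"gadget: {event.get('id')}"


def handle_product(event: dict) -> str:
    # Receives events that matched neither rule (the default route).
    return f"product: {event.get('id')}"


# One handler per path named in the subscription.
HANDLERS = {
    "/widgets": handle_widget,
    "/gadgets": handle_gadget,
    "/products": handle_product,
}


class RoutedHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        handler = HANDLERS.get(self.path)
        if handler is None:
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        event = json.loads(self.rfile.read(length) or b"{}")
        print(handler(event), flush=True)
        self.send_response(200)  # HTTP 200 reports the event as processed
        self.end_headers()


def serve(port: int = 5000) -> None:
    """Block forever serving the handlers; the port is an arbitrary choice."""
    HTTPServer(("", port), RoutedHandler).serve_forever()
```

Calling `serve()` starts the listener. The routing decision itself never appears in this code, because it is made from the rules in the subscription before the request reaches the application.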

Programmatic subscription

Alternatively, with the programmatic approach, the subscription returned from /dapr/subscribe carries a routes structure instead of a single route. The JSON structure matches the declarative YAML.

Python

    import json

    import flask
    from flask import request, jsonify
    from flask_cors import CORS

    app = flask.Flask(__name__)
    CORS(app)

    # Dapr calls this endpoint to discover the application's subscriptions.
    @app.route('/dapr/subscribe', methods=['GET'])
    def subscribe():
        subscriptions = [
            {
                'pubsubname': 'pubsub',
                'topic': 'inventory',
                'routes': {
                    'rules': [
                        {
                            'match': 'event.type == "widget"',
                            'path': '/widgets'
                        },
                        {
                            'match': 'event.type == "gadget"',
                            'path': '/gadgets'
                        },
                    ],
                    'default': '/products'
                }
            }]
        return jsonify(subscriptions)

    # Handler for the default route.
    @app.route('/products', methods=['POST'])
    def ds_subscriber():
        print(request.json, flush=True)
        return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}

    app.run()

JavaScript

    const express = require('express')
    const bodyParser = require('body-parser')
    const app = express()
    app.use(bodyParser.json({ type: 'application/*+json' }));

    const port = 3000

    app.get('/dapr/subscribe', (req, res) => {
      res.json([
        {
          pubsubname: "pubsub",
          topic: "inventory",
          routes: {
            rules: [
              {
                match: 'event.type == "widget"',
                path: '/widgets'
              },
              {
                match: 'event.type == "gadget"',
                path: '/gadgets'
              },
            ],
            default: '/products'
          }
        }
      ]);
    })

    app.post('/products', (req, res) => {
      console.log(req.body);
      res.sendStatus(200);
    });

    app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

C#

    [Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
    [HttpPost("widgets")]
    public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
    {
        // Logic
        return stock;
    }

    [Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
    [HttpPost("gadgets")]
    public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
    {
        // Logic
        return stock;
    }

    [Topic("pubsub", "inventory")]
    [HttpPost("products")]
    public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
    {
        // Logic
        return stock;
    }

Go

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
        "net/http"

        "github.com/gorilla/mux"
    )

    const appPort = 3000

    type subscription struct {
        PubsubName string            `json:"pubsubname"`
        Topic      string            `json:"topic"`
        Metadata   map[string]string `json:"metadata,omitempty"`
        Routes     routes            `json:"routes"`
    }

    type routes struct {
        Rules   []rule `json:"rules,omitempty"`
        Default string `json:"default,omitempty"`
    }

    type rule struct {
        Match string `json:"match"`
        Path  string `json:"path"`
    }

    // This handles /dapr/subscribe
    func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
        t := []subscription{
            {
                PubsubName: "pubsub",
                Topic:      "inventory",
                Routes: routes{
                    Rules: []rule{
                        {
                            Match: `event.type == "widget"`,
                            Path:  "/widgets",
                        },
                        {
                            Match: `event.type == "gadget"`,
                            Path:  "/gadgets",
                        },
                    },
                    Default: "/products",
                },
            },
        }

        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(t)
    }

    func main() {
        router := mux.NewRouter().StrictSlash(true)
        router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
        log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
    }

PHP

    <?php

    require_once __DIR__.'/vendor/autoload.php';

    // Assumption: the routes argument takes the same structure as the JSON above, written as a nested array.
    $app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
        new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'inventory', routes: [
            'rules' => [
                ['match' => 'event.type == "widget"', 'path' => '/widgets'],
                ['match' => 'event.type == "gadget"', 'path' => '/gadgets'],
            ],
            'default' => '/products',
        ]),
    ]]));

    // Handler for the default route.
    $app->post('/products', function(
        #[\Dapr\Attributes\FromBody]
        \Dapr\PubSub\CloudEvent $cloudEvent,
        \Psr\Log\LoggerInterface $logger
        ) {
            $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
            return ['status' => 'SUCCESS'];
        }
    );

    $app->start();

Common Expression Language (CEL)

In these examples, depending on the type of the event (event.type), the application is called on /widgets, /gadgets, or /products. The expressions are written in Common Expression Language (CEL), where event represents the cloud event. Any attribute from the CloudEvents core specification can be referenced in the expression.

Example expressions

Match “important” messages

    has(event.data.important) && event.data.important == true

Match deposits greater than $10000

    event.type == "deposit" && event.data.amount > 10000

Match multiple versions of a message

    event.type == "mymessage.v1"

    event.type == "mymessage.v2"
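To make the meaning of these expressions concrete, the sketch below restates them as plain Python predicates over an event dictionary. It is not a CEL evaluator. The function names and sample events are hypothetical, and a missing `amount` is treated as 0 for simplicity, which may differ from how CEL handles a missing field.

```python
def is_important(event: dict) -> bool:
    # Mirrors: has(event.data.important) && event.data.important == true
    data = event.get("data")
    return isinstance(data, dict) and data.get("important") is True


def is_large_deposit(event: dict) -> bool:
    # Mirrors: event.type == "deposit" && event.data.amount > 10000
    data = event.get("data")
    amount = data.get("amount", 0) if isinstance(data, dict) else 0
    return event.get("type") == "deposit" and amount > 10000


def matches_v1(event: dict) -> bool:
    # Mirrors: event.type == "mymessage.v1"
    return event.get("type") == "mymessage.v1"


def matches_v2(event: dict) -> bool:
    # Mirrors: event.type == "mymessage.v2"
    return event.get("type") == "mymessage.v2"


print(is_important({"type": "note", "data": {"important": True}}))     # True
print(is_large_deposit({"type": "deposit", "data": {"amount": 500}}))  # False
print(matches_v2({"type": "mymessage.v2"}))                            # True
```

Each version predicate would back its own rule, so that v1 and v2 messages on the same topic reach different handlers.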

CloudEvent attributes

For reference, the following attributes are from the CloudEvents specification.

Event Data

data

As defined by the term Data, CloudEvents MAY include domain-specific information about the occurrence. When present, this information will be encapsulated within data.

  • Description: The event payload. This specification does not place any restriction on the type of this information. It is encoded into a media format which is specified by the datacontenttype attribute (e.g. application/json), and adheres to the dataschema format when those respective attributes are present.
  • Constraints:
    • OPTIONAL

Limitation

Currently, attributes inside data can be accessed only when data is a nested JSON value, not when it is JSON escaped in a string.
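The difference between the two shapes is easiest to see side by side. In this sketch the event type and payload are made up for illustration. The first event carries data as a nested JSON object, whose fields an expression can address. The second carries the same payload serialized into a string, which is the case the limitation describes.

```python
import json

payload = {"important": True}

# data is a nested JSON value: its fields are addressable.
nested = {"type": "alert", "data": payload}

# data is JSON escaped in a string: the whole payload is one opaque string.
escaped = {"type": "alert", "data": json.dumps(payload)}

print(json.dumps(nested))   # {"type": "alert", "data": {"important": true}}
print(json.dumps(escaped))  # {"type": "alert", "data": "{\"important\": true}"}
```

A publisher that wants data fields to be usable in routing rules would therefore send the first form.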

REQUIRED Attributes

The following attributes are REQUIRED to be present in all CloudEvents:

id

  • Type: String
  • Description: Identifies the event. Producers MUST ensure that source + id is unique for each distinct event. If a duplicate event is re-sent (e.g. due to a network error) it MAY have the same id. Consumers MAY assume that Events with identical source and id are duplicates.
  • Constraints:
    • REQUIRED
    • MUST be a non-empty string
    • MUST be unique within the scope of the producer
  • Examples:
    • An event counter maintained by the producer
    • A UUID
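Because consumers may treat events with identical source and id as duplicates, a handler can use that pair as a deduplication key. The following is a minimal in-memory sketch. The `Deduplicator` class is hypothetical and its set grows without bound, so a real consumer would likely need expiry or persistent storage.

```python
class Deduplicator:
    """Hypothetical in-memory filter keyed on (source, id)."""

    def __init__(self) -> None:
        self._seen = set()

    def is_duplicate(self, event: dict) -> bool:
        key = (event["source"], event["id"])
        if key in self._seen:
            return True
        self._seen.add(key)
        return False


dedup = Deduplicator()
first = {"source": "/sensors/tn-1234567/alerts", "id": "42"}
print(dedup.is_duplicate(first))        # False: first delivery
print(dedup.is_duplicate(dict(first)))  # True: same source + id, a re-sent event
print(dedup.is_duplicate({"source": "/cloudevents/spec/pull/123", "id": "42"}))  # False: different source
```

The id alone is not used as the key, since uniqueness is only guaranteed for the combination of source and id.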

source

  • Type: URI-reference

  • Description: Identifies the context in which an event happened. Often this will include information such as the type of the event source, the organization publishing the event or the process that produced the event. The exact syntax and semantics behind the data encoded in the URI is defined by the event producer.

    Producers MUST ensure that source + id is unique for each distinct event.

    An application MAY assign a unique source to each distinct producer, which makes it easy to produce unique IDs since no other producer will have the same source. The application MAY use UUIDs, URNs, DNS authorities or an application-specific scheme to create unique source identifiers.

    A source MAY include more than one producer. In that case the producers MUST collaborate to ensure that source + id is unique for each distinct event.

  • Constraints:

    • REQUIRED
    • MUST be a non-empty URI-reference
    • An absolute URI is RECOMMENDED
  • Examples:

    • Internet-wide unique URI with a DNS authority.
    • Universally-unique URN with a UUID:
      • urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66
    • Application-specific identifiers
      • /cloudevents/spec/pull/123
      • /sensors/tn-1234567/alerts
      • 1-555-123-4567

specversion

  • Type: String

  • Description: The version of the CloudEvents specification which the event uses. This enables the interpretation of the context. Compliant event producers MUST use a value of 1.0 when referring to this version of the specification.

    Currently, this attribute will only have the ‘major’ and ‘minor’ version numbers included in it. This allows for ‘patch’ changes to the specification to be made without changing this property’s value in the serialization. Note: for ‘release candidate’ releases a suffix might be used for testing purposes.

  • Constraints:

    • REQUIRED
    • MUST be a non-empty string

type

  • Type: String
  • Description: This attribute contains a value describing the type of event related to the originating occurrence. Often this attribute is used for routing, observability, policy enforcement, etc. The format of this is producer defined and might include information such as the version of the type - see Versioning of CloudEvents in the Primer for more information.
  • Constraints:
    • REQUIRED
    • MUST be a non-empty string
    • SHOULD be prefixed with a reverse-DNS name. The prefixed domain dictates the organization which defines the semantics of this event type.
  • Examples:
    • com.github.pull_request.opened
    • com.example.object.deleted.v2

OPTIONAL Attributes

The following attributes are OPTIONAL to appear in CloudEvents. See the Notational Conventions section for more information on the definition of OPTIONAL.

datacontenttype

  • Type: String per RFC 2046

  • Description: Content type of data value. This attribute enables data to carry any type of content, whereby format and encoding might differ from that of the chosen event format. For example, an event rendered using the JSON envelope format might carry an XML payload in data, and the consumer is informed by this attribute being set to “application/xml”. The rules for how data content is rendered for different datacontenttype values are defined in the event format specifications; for example, the JSON event format defines the relationship in section 3.1.

    For some binary mode protocol bindings, this field is directly mapped to the respective protocol’s content-type metadata property. Normative rules for the binary mode and the content-type metadata mapping can be found in the respective protocol.

    In some event formats the datacontenttype attribute MAY be omitted. For example, if a JSON format event has no datacontenttype attribute, then it is implied that the data is a JSON value conforming to the “application/json” media type. In other words: a JSON-format event with no datacontenttype is exactly equivalent to one with datacontenttype="application/json".

    When translating an event message with no datacontenttype attribute to a different format or protocol binding, the target datacontenttype SHOULD be set explicitly to the implied datacontenttype of the source.

  • Constraints:

    • OPTIONAL
    • If present, MUST adhere to the format specified in RFC 2046
  • For Media Type examples see IANA Media Types
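The implied default for JSON-format events can be expressed as a one-line lookup. The helper name `effective_datacontenttype` and the sample events are hypothetical, and the sketch covers only the JSON event format described above.

```python
def effective_datacontenttype(event: dict) -> str:
    """Content type of data for a JSON-format event.

    A JSON-format event without datacontenttype is treated as application/json.
    """
    return event.get("datacontenttype", "application/json")


print(effective_datacontenttype({"data": {"sku": "w-1"}}))  # application/json
print(effective_datacontenttype(
    {"datacontenttype": "application/xml", "data": "<sku>w-1</sku>"}
))  # application/xml
```

When translating such an event to another format or binding, this is the value to set explicitly on the target.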

dataschema

  • Type: URI
  • Description: Identifies the schema that data adheres to. Incompatible changes to the schema SHOULD be reflected by a different URI. See Versioning of CloudEvents in the Primer for more information.
  • Constraints:
    • OPTIONAL
    • If present, MUST be a non-empty URI

subject

  • Type: String

  • Description: This describes the subject of the event in the context of the event producer (identified by source). In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, but the source identifier alone might not be sufficient as a qualifier for any specific event if the source context has internal sub-structure.

    Identifying the subject of the event in context metadata (as opposed to only in the data payload) is particularly helpful in generic subscription filtering scenarios where middleware is unable to interpret the data content. In the example below, the subscriber might only be interested in blobs with names ending with ‘.jpg’ or ‘.jpeg’, and the subject attribute allows for constructing a simple and efficient string-suffix filter for that subset of events.

  • Constraints:

    • OPTIONAL
    • If present, MUST be a non-empty string
  • Example:

    • A subscriber might register interest for when new blobs are created inside a blob-storage container. In this case, the event source identifies the subscription scope (storage container), the type identifies the “blob created” event, and the id uniquely identifies the event instance to distinguish separate occurrences of a same-named blob having been created; the name of the newly created blob is carried in subject.
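The string-suffix filter described for the blob scenario can be sketched as follows. The function name and the sample source, type, and subject values are illustrative assumptions. The filter reads only the subject attribute and never touches data.

```python
def wants_image_blob(event: dict) -> bool:
    """Suffix filter on subject: keep only .jpg and .jpeg blobs."""
    subject = event.get("subject", "")
    return subject.endswith((".jpg", ".jpeg"))


# Hypothetical "blob created" events from a storage container.
created = [
    {"source": "/storage/container-a", "type": "blob.created", "id": "1", "subject": "photo.jpg"},
    {"source": "/storage/container-a", "type": "blob.created", "id": "2", "subject": "notes.txt"},
    {"source": "/storage/container-a", "type": "blob.created", "id": "3", "subject": "scan.jpeg"},
]

print([e["subject"] for e in created if wants_image_blob(e)])  # ['photo.jpg', 'scan.jpeg']
```

CEL's standard string functions include `endsWith`, so a comparable routing rule might be written against `event.subject`; treat that as an untested suggestion rather than a documented example.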

time

  • Type: Timestamp
  • Description: Timestamp of when the occurrence happened. If the time of the occurrence cannot be determined then this attribute MAY be set to some other time (such as the current time) by the CloudEvents producer, however all producers for the same source MUST be consistent in this respect. In other words, either they all use the actual time of the occurrence or they all use the same algorithm to determine the value used.
  • Constraints:
    • OPTIONAL
    • If present, MUST adhere to the format specified in RFC 3339

Limitation

Currently, comparisons to time (e.g. before or after “now”) are not supported.
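One way to work within this limitation is to do the time comparison in the event handler instead of in a routing expression. The sketch below assumes the event carries an RFC 3339 time with an explicit offset or a trailing `Z`. The helper names and the staleness threshold are hypothetical. On Python versions before 3.11, `datetime.fromisoformat` accepts only a subset of RFC 3339 (for example, fractional seconds of exactly 3 or 6 digits).

```python
from datetime import datetime, timezone
from typing import Optional


def parse_event_time(value: str) -> datetime:
    """Parse an RFC 3339 timestamp; 'Z' is rewritten for older Python versions."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def is_stale(event: dict, max_age_seconds: float, now: Optional[datetime] = None) -> bool:
    """Return True when the event's time is more than max_age_seconds before now."""
    now = now or datetime.now(timezone.utc)
    age = (now - parse_event_time(event["time"])).total_seconds()
    return age > max_age_seconds


reference = datetime(2022, 2, 18, 12, 0, 0, tzinfo=timezone.utc)
print(is_stale({"time": "2022-02-18T11:00:00Z"}, 60, now=reference))  # True: one hour old
print(is_stale({"time": "2022-02-18T11:59:30Z"}, 60, now=reference))  # False: 30 seconds old
```

The `now` parameter exists only to make the check deterministic in tests; in a handler it would normally be left unset.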

Last modified February 18, 2022: Update setup-jetstream.md (#2200) (428d8c2)