MQTT

Detailed documentation on the MQTT pubsub component

Component format

To setup MQTT pubsub create a component of type pubsub.mqtt. See this guide on how to create and apply a pubsub configuration

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: mqtt-pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.mqtt
  8. version: v1
  9. metadata:
  10. - name: url
  11. value: "tcp://[username][:password]@host.domain[:port]"
  12. - name: qos
  13. value: 1
  14. - name: retain
  15. value: "false"
  16. - name: cleanSession
  17. value: "false"

Spec metadata fields

FieldRequiredDetailsExample
urlYAddress of the MQTT brokerUse tcp:// scheme for non-TLS communication. Usetcps:// scheme for TLS communication.
“tcp://[username][:password]@host.domain[:port]”
qosNIndicates the Quality of Service Level (QoS) of the message. Default 01
retainNDefines whether the message is saved by the broker as the last known good value for a specified topic. Default “false”“true”, “false”
cleanSessionNwill set the “clean session” in the connect message when client connects to an MQTT broker. Default “true”“true”, “false”
caCertRequired for using TLSCertificate authority certificate. Can be secretKeyRef to use a secret reference0123456789-0123456789
clientCertRequired for using TLSClient certificate. Can be secretKeyRef to use a secret reference0123456789-0123456789
clientKeyRequired for using TLSClient key. Can be secretKeyRef to use a secret reference012345
backOffMaxRetriesNThe maximum number of retries to process the message before returning an error. Defaults to “0” which means the component will not retry processing the message. “-1” will retry indefinitely until the message is processed or the application is shutdown. And positive number is treated as the maximum retry count. The component will wait 5 seconds between retries.“3”

Communication using TLS

To configure communication using TLS, ensure mosquitto broker is configured to support certificates. Pre-requisite includes certficate authority certificate, ca issued client certificate, client private key. Here is an example.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: mqtt-pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.mqtt
  8. version: v1
  9. metadata:
  10. - name: url
  11. value: "tcps://host.domain[:port]"
  12. - name: qos
  13. value: 1
  14. - name: retain
  15. value: "false"
  16. - name: cleanSession
  17. value: "false"
  18. - name: caCert
  19. value: ''
  20. - name: clientCert
  21. value: ''
  22. - name: clientKey
  23. value: ''

Consuming a shared topic

When consuming a shared topic, each consumer must have a unique identifier. By default, the application Id is used to uniquely identify each consumer and publisher. In self-hosted mode, running each Dapr run with a different application Id is sufficient to have them consume from the same shared topic. However on Kubernetes, a pod with multiple application instances shares the same application Id, prohibiting all instances from consuming the same topic. To overcome this, configure the component’s ConsumerID metadata with a {uuid} tag, making each instance to have a randomly generated ConsumerID value on start up. For example:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. namespace: default
  6. spec:
  7. type: pubsub.mqtt
  8. version: v1
  9. metadata:
  10. - name: consumerID
  11. value: "{uuid}"
  12. - name: url
  13. value: "tcp://admin:public@localhost:1883"
  14. - name: qos
  15. value: 1
  16. - name: retain
  17. value: "false"
  18. - name: cleanSession
  19. value: "false"

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.

Create a MQTT broker

You can run a MQTT broker locally using Docker:

  1. docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6.9

You can then interact with the server using the client port: mqtt://localhost:1883

You can run a MQTT broker in kubernetes using following yaml:

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: mqtt-broker
  5. labels:
  6. app-name: mqtt-broker
  7. spec:
  8. replicas: 1
  9. selector:
  10. matchLabels:
  11. app-name: mqtt-broker
  12. template:
  13. metadata:
  14. labels:
  15. app-name: mqtt-broker
  16. spec:
  17. containers:
  18. - name: mqtt
  19. image: eclipse-mosquitto:1.6.9
  20. imagePullPolicy: IfNotPresent
  21. ports:
  22. - name: default
  23. containerPort: 1883
  24. protocol: TCP
  25. - name: websocket
  26. containerPort: 9001
  27. protocol: TCP
  28. ---
  29. apiVersion: v1
  30. kind: Service
  31. metadata:
  32. name: mqtt-broker
  33. labels:
  34. app-name: mqtt-broker
  35. spec:
  36. type: ClusterIP
  37. selector:
  38. app-name: mqtt-broker
  39. ports:
  40. - port: 1883
  41. targetPort: default
  42. name: default
  43. protocol: TCP
  44. - port: 9001
  45. targetPort: websocket
  46. name: websocket
  47. protocol: TCP

You can then interact with the server using the client port: tcp://mqtt-broker.default.svc.cluster.local:1883

Last modified August 2, 2021 : Fix Java SDK link (#1695) (2c67fd1)