MQTT

Detailed documentation on the MQTT pubsub component

Component format

To set up MQTT pub/sub, create a component of type pubsub.mqtt. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: mqtt-pubsub
    spec:
      type: pubsub.mqtt
      version: v1
      metadata:
        - name: url
          value: "tcp://[username][:password]@host.domain[:port]"
        - name: qos
          value: 1
        - name: retain
          value: "false"
        - name: cleanSession
          value: "false"
        - name: consumerID
          value: "channel1"

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
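As a minimal sketch of that recommendation, the broker URL (which may embed a username and password) could be read from a secret store through secretKeyRef instead of being written inline. The secret name mqtt-secrets and the key url are hypothetical placeholders, and <SECRET_STORE_NAME> stands for whichever secret store component is configured in your environment.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
    - name: url
      secretKeyRef:
        name: mqtt-secrets   # hypothetical secret name
        key: url             # hypothetical key holding the full broker URL
    - name: qos
      value: 1
auth:
  secretStore: <SECRET_STORE_NAME>
```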

Spec metadata fields

| Field | Required | Details | Example |
|-------|----------|---------|---------|
| url | Y | Address of the MQTT broker. Can be secretKeyRef to use a secret reference. Use the tcp:// URI scheme for non-TLS communication. Use the ssl:// URI scheme for TLS communication. | "tcp://[username][:password]@host.domain[:port]" |
| consumerID | N | The client ID used to connect to the MQTT broker for the consumer connection. Defaults to the Dapr app ID. Note: if producerID is not set, -consumer is appended to this value for the consumer connection. | Can be set to a string value (such as "channel1" in the example above) or a string format value (such as "{podName}"). See all of the template tags you can use in your component metadata. |
| producerID | N | The client ID used to connect to the MQTT broker for the producer connection. Defaults to {consumerID}-producer. | "myMqttProducerApp" |
| qos | N | Indicates the Quality of Service (QoS) level of the message. Defaults to 1. | 0, 1, 2 |
| retain | N | Defines whether the message is saved by the broker as the last known good value for a specified topic. Defaults to "false". | "true", "false" |
| cleanSession | N | Sets the clean_session flag in the connection message to the MQTT broker if "true". Defaults to "false". | "true", "false" |
| caCert | Required for using TLS | Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientCert | Required for using TLS | TLS client certificate in PEM format. Must be used with clientKey. | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
| clientKey | Required for using TLS | TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference. | "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |

Enabling message delivery retries

The MQTT pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once. If the service marks the message as not processed, the message won't be acknowledged back to the broker. The message is retried only if the broker resends it.

To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the MQTT pub/sub component.

There is a crucial difference between the two retry mechanisms:

  1. Re-delivery of unacknowledged messages depends entirely on the broker. Dapr does not guarantee it. Some brokers, such as emqx and vernemq, support it, but it is not part of the MQTT3 spec.

  2. With a retry resiliency policy, the same Dapr sidecar retries delivering the message. So it is the same Dapr sidecar and the same app receiving the same message.
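As a rough sketch of the second approach, a resiliency resource along the following lines could apply a retry policy to inbound delivery (from the sidecar to the app) for this component. The resource name mqtt-resiliency, the policy name mqttRetry, and the retry values are illustrative assumptions, not recommendations; mqtt-pubsub matches the component name used in the examples on this page.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: mqtt-resiliency        # hypothetical resource name
spec:
  policies:
    retries:
      mqttRetry:               # hypothetical policy name
        policy: constant       # fixed delay between attempts
        duration: 5s           # illustrative delay
        maxRetries: 3          # illustrative limit
  targets:
    components:
      mqtt-pubsub:             # must match the component's metadata.name
        inbound:               # delivery from the sidecar to the app
          retry: mqttRetry
```

Because these retries happen inside the sidecar, they do not depend on the broker re-sending the unacknowledged message.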

Communication using TLS

To configure communication using TLS, ensure that the MQTT broker (for example, mosquitto) is configured to support certificates, and provide the caCert, clientCert, and clientKey metadata in the component configuration. For example:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: mqtt-pubsub
    spec:
      type: pubsub.mqtt
      version: v1
      metadata:
        - name: url
          value: "ssl://host.domain[:port]"
        - name: qos
          value: 1
        - name: retain
          value: "false"
        - name: cleanSession
          value: "false"
        - name: caCert
          value: ${{ myLoadedCACert }}
        - name: clientCert
          value: ${{ myLoadedClientCert }}
        - name: clientKey
          secretKeyRef:
            name: myMqttClientKey
            key: myMqttClientKey
    auth:
      secretStore: <SECRET_STORE_NAME>

Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.
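A sketch of that variant, with all three TLS values pulled from the secret store, might look like the following. The secret names myMqttCaCert and myMqttClientCert are hypothetical and simply mirror the naming of myMqttClientKey.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
    - name: url
      value: "ssl://host.domain[:port]"
    - name: caCert
      secretKeyRef:
        name: myMqttCaCert       # hypothetical secret name
        key: myMqttCaCert
    - name: clientCert
      secretKeyRef:
        name: myMqttClientCert   # hypothetical secret name
        key: myMqttClientCert
    - name: clientKey
      secretKeyRef:
        name: myMqttClientKey
        key: myMqttClientKey
auth:
  secretStore: <SECRET_STORE_NAME>
```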

Consuming a shared topic

When consuming a shared topic, each consumer must have a unique identifier. By default, the application ID is used to uniquely identify each consumer and publisher. In self-hosted mode, invoking each dapr run with a different application ID is sufficient to have them consume from the same shared topic. However, on Kubernetes, multiple instances of an application pod share the same application ID, which prevents the instances from all consuming the same topic. To overcome this, configure the component's consumerID metadata with a {uuid} tag, which gives each instance a randomly generated consumerID value on startup. For example:

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: mqtt-pubsub
    spec:
      type: pubsub.mqtt
      version: v1
      metadata:
        - name: consumerID
          value: "{uuid}"
        - name: url
          value: "tcp://admin:public@localhost:1883"
        - name: qos
          value: 1
        - name: retain
          value: "false"
        - name: cleanSession
          value: "true"

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.

Note that in this case, the value of the consumer ID is random every time Dapr restarts, so cleanSession is set to true as well.

Create an MQTT broker

You can run an MQTT broker locally using Docker:

    docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6

You can then interact with the server using the client port: mqtt://localhost:1883

You can run an MQTT broker on Kubernetes using the following YAML:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: mqtt-broker
      labels:
        app-name: mqtt-broker
    spec:
      replicas: 1
      selector:
        matchLabels:
          app-name: mqtt-broker
      template:
        metadata:
          labels:
            app-name: mqtt-broker
        spec:
          containers:
            - name: mqtt
              image: eclipse-mosquitto:1.6
              imagePullPolicy: IfNotPresent
              ports:
                - name: default
                  containerPort: 1883
                  protocol: TCP
                - name: websocket
                  containerPort: 9001
                  protocol: TCP
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: mqtt-broker
      labels:
        app-name: mqtt-broker
    spec:
      type: ClusterIP
      selector:
        app-name: mqtt-broker
      ports:
        - port: 1883
          targetPort: default
          name: default
          protocol: TCP
        - port: 9001
          targetPort: websocket
          name: websocket
          protocol: TCP

You can then interact with the server using the client port: tcp://mqtt-broker.default.svc.cluster.local:1883

Last modified October 11, 2024: Fixed typo (#4389) (fe17926)