MQTT

关于MQTT pubsub组件的详细文档

配置

要安装MQTT pubsub,请创建一个类型为pubsub.mqtt的组件。 请参阅 本指南,了解如何创建和应用 pubsub 配置。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: mqtt-pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.mqtt
  8. version: v1
  9. metadata:
  10. - name: url
  11. value: "tcp://[username][:password]@host.domain[:port]"
  12. - name: qos
  13. value: 1
  14. - name: retain
  15. value: "false"
  16. - name: cleanSession
  17. value: "false"

元数据字段规范

字段必填详情示例
urlYMQTT broker地址非TLS通信: tcp://, TLS通信:tcps://。 TLS通信:tcps://
“tcp://[username][:password]@host.domain[:port]”
qosN表示消息的服务质量等级(QoS), 默认值 0 默认值 01
retainN定义消息是否被broker保存为指定主题的最后已知有效值 默认值为 “false” 默认值为 “false”“true”, “false”
cleanSessionN将在客户端连接到MQTT broker时,在连接消息中设置 “clean session” 默认: “true” 默认: “true”“true”, “false”
caCert使用TLS时需要授权, 可以用secretKeyRef来引用密钥。0123456789-0123456789
clientCert使用TLS时需要客户端证书, 可以用secretKeyRef来引用密钥。0123456789-0123456789
clientKey使用TLS时需要客户端键, 可以用secretKeyRef来引用密钥。012345
backOffMaxRetriesNThe maximum number of retries to process the message before returning an error. Defaults to “0” which means the component will not retry processing the message. “-1” will retry indefinitely until the message is processed or the application is shutdown. And positive number is treated as the maximum retry count. The component will wait 5 seconds between retries.“3”

使用 TLS 通信

要配置使用 TLS 通信,需配置并确保mosquitto broker支持凭证。 前提条件包括certficate authority certificateca issued client certificateclient private key。 参见下面的示例。

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: mqtt-pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.mqtt
  8. version: v1
  9. metadata:
  10. - name: url
  11. value: "tcps://host.domain[:port]"
  12. - name: qos
  13. value: 1
  14. - name: retain
  15. value: "false"
  16. - name: cleanSession
  17. value: "false"
  18. - name: caCert
  19. value: ''
  20. - name: clientCert
  21. value: ''
  22. - name: clientKey
  23. value: ''

消费共享主题

当消费一个共享主题时,每个消费者必须有一个唯一的标识符。 默认情况下,应用ID用于唯一标识每个消费者和发布者。 在自托管模式下,用不同的应用程序Id运行每个Dapr运行就足以让它们从同一个共享主题消费。 然而在Kubernetes上,一个有多个应用实例的pod共享同一个应用Id,这阻碍了所有实例消费同一个主题。 为了克服这个问题,请用{uuid}标签配置组件的ConsumerID元数据,使每个实例在启动时有一个随机生成的ConsumerID值。 例如:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: messagebus
  5. namespace: default
  6. spec:
  7. type: pubsub.mqtt
  8. version: v1
  9. metadata:
  10. - name: consumerID
  11. value: "{uuid}"
  12. - name: url
  13. value: "tcp://admin:public@localhost:1883"
  14. - name: qos
  15. value: 1
  16. - name: retain
  17. value: "false"
  18. - name: cleanSession
  19. value: "false"

Warning

以上示例将 Secret 明文存储。 更推荐的方式是使用 Secret 组件, 这里

创建一个 MQTT broker

你可以使用Docker本地运行MQTT broker:

  1. docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6.9

然后你可以通过mqtt://localhost:1883与服务器交互

你可以使用下面的yaml在kubernetes中运行一个MQTT broker:

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: mqtt-broker
  5. labels:
  6. app-name: mqtt-broker
  7. spec:
  8. replicas: 1
  9. selector:
  10. matchLabels:
  11. app-name: mqtt-broker
  12. template:
  13. metadata:
  14. labels:
  15. app-name: mqtt-broker
  16. spec:
  17. containers:
  18. - name: mqtt
  19. image: eclipse-mosquitto:1.6.9
  20. imagePullPolicy: IfNotPresent
  21. ports:
  22. - name: default
  23. containerPort: 1883
  24. protocol: TCP
  25. - name: websocket
  26. containerPort: 9001
  27. protocol: TCP
  28. ---
  29. apiVersion: v1
  30. kind: Service
  31. metadata:
  32. name: mqtt-broker
  33. labels:
  34. app-name: mqtt-broker
  35. spec:
  36. type: ClusterIP
  37. selector:
  38. app-name: mqtt-broker
  39. ports:
  40. - port: 1883
  41. targetPort: default
  42. name: default
  43. protocol: TCP
  44. - port: 9001
  45. targetPort: websocket
  46. name: websocket
  47. protocol: TCP

然后你可以通过tcp://mqtt-broker.default.svc.cluster.local:1883与服务器交互。

相关链接