数据桥接简介

数据桥接是用来对接 EMQX 和外部数据系统的通道,比如 MySQL、MongoDB 等数据库, 或 Kafka,RabbitMQ 等消息中间件,或 HTTP 服务器等。

通过数据桥接,用户可以实时地将消息从 EMQX 发送到外部数据系统,或者从外部数据系统拉取数据并发送到 EMQX 的某个主题。

提示

EMQX 开源版中仅支持 MQTT 桥接 和 HTTP Server,企业版支持的数据桥接请详见:企业数据集成数据集成 - 图1 (opens new window)

数据桥接指标

EMQX 提供以下数据集成的运行统计指标:

  • 命中(counter)
  • 发送成功(counter)
  • 发送失败(counter)
  • 已丢弃(counter)
  • 延迟回复(counter)
  • 进行中(gauge)
  • 排队中(gauge)

metrics

命中

命中 统计了无论桥接的状态如何,都被路由到桥接的请求/消息的数量。每条消息最终由其他指标计算,因此 命中 的计算公式为:命中 = 成功发送 + 发送失败 + 进行中 + 排队中 + 延迟回复 + 已丢弃

发送成功

发送成功 统计了成功被外部数据系统接收的消息数量。重新尝试发送成功发送成功 的子计数,用于跟踪至少重试一次的消息数量。因此,重新尝试发送成功 ≤ 发送成功

发送失败

发送失败 统计了未能被外部数据系统接收的消息数量。重新尝试发送失败发送失败 的子计数,用于跟踪至少重试一次的消息数量。因此,重新尝试发送失败 ≤ 发送失败

已丢弃

已丢弃 统计了未经任何发送尝试而被丢弃的消息数量。它包含了几个更具体的子类别,每个子类别都表示丢弃的不同原因。已丢弃 的计算公式为:已丢弃 = 过期 + 队列已满 + 资源已停止 + 未找到资源

  • 过期:在排队等待发送之前,消息的生存时间(TTL)已经到期。
  • 队列已满:达到了最大队列大小,为防止内存溢出而丢弃消息。
  • 资源已停止:在桥接已停止的情况下,仍然尝试发送消息。
  • 未找到资源:在桥接不再存在时尝试发送消息。这种情况非常罕见,通常是由于在移除桥接时出现竞争条件。

延迟回复

当尝试发送消息时,在消息的生存时间(TTL)过期后仍然收到底层驱动程序的响应时,延迟回复 会递增。

提示

请注意,延迟回复 不表示消息是否成功发送或发送失败,它是一种未知状态。它既可能成功插入外部数据系统,也可能插入失败,甚至在尝试建立与数据系统的连接时连接超时。

进行中

进行中 是度量当前在缓冲层中正在等待来自外部数据系统的响应的消息数。

排队中

排队中 是度量已经被缓冲层接收但尚未发送到外部数据系统的消息数。

数据桥接状态

数据桥接可以具有以下状态:

  • connecting:在进行任何健康检查之前的初始状态,桥接仍在尝试连接到外部数据系统。
  • connected:桥接成功连接并正常运行。如果健康检查失败,桥接可能会转换为 connectingdisconnected 状态,具体取决于故障的严重程度。
  • disconnected:桥接未通过健康检查,处于不健康状态。根据其配置,它可能会定期尝试自动重新连接。
  • stopped:桥接已被手动禁用。
  • inconsistent:集群节点之间对桥接状态存在分歧。

数据桥接特性

数据桥接借助以下特性以增强易用性、进一步提高数据集成的性能和可靠性,并非所有数据桥接都完全实现了这些特性,具体支持情况请参照各自的说明文档。

连接池

连接池是一组可重用的连接对象。通过连接池,用户无需在为每个请求重新创建连接,有助于降低资源消耗,提高连接效率和并发能力。

EMQX 会为每个需要创建数据桥的节点创建一个单独的连接池。例如,对一个包含 3 个 EMQX 节点的集群,如果将每个数据桥的连接池大小设置为 8,那么 EMQX 将创建 3 x 8 = 24 个连接。注意:请确保构建的连接池数量不要超过资源的连接限制。

异步请求模式

异步请求模式可以防止消息的发布服务由于 I/O 压力而受影响。但在开启异步请求模式后,由于历史消息会在数据桥接处排队 ,客户端新发送消息的时间序列将受到影响。

为了提高数据处理效率,EMQX 默认开启异步请求模式。如果您对消息的时间序列有严格要求,请禁用异步请求模式。

示例代码

  1. bridges.mysql.foo {
  2. server = "localhost"
  3. database = "emqx"
  4. enable = true
  5. ...
  6. resource_opts {
  7. query_mode = "sync"
  8. ...
  9. }
  10. }

提示

为保证消息的时序性,请同时将 max_inflight 设置为 1。

批量模式

批量模式可以将多条数据同时写入外部数据集成中,启用批量后 EMQX 将暂存每次请求的数据(单条),达到一定时间或累积一定数据条数(两者均可自行配置)后将暂存的整批数据写入到目标数据系统。

优点:

  • 提高写入效率:相对于单条消息的写入方式,批量模式下,数据库系统在正式处理消息前,一般会先对其进行缓存或预处理等优化操作,提高写入效率。

  • 减少网络延迟:批量写入可以减少网络传输次数,进而减少网络延迟。

问题:

  • 数据写入时延较长:数据在达到设置的时间或条数之后才会被写入,时延较长。注意:您可以通过下方参数对设置时间或条数进行调整。
  • 有一定延迟:在达到设置的时间或累积数据条数之前数据不会立即写入,可通过参数进行调整。

批量模式配置

  1. bridges.mysql.foo {
  2. server = "localhost"
  3. database = "emqx"
  4. enable = true
  5. ...
  6. resource_opts {
  7. batch_size = 100
  8. batch_time = "20ms"
  9. ...
  10. }
  11. }

缓存队列

缓存队列为数据桥接提供了一定的容错性,建议启用该选项以提高数据安全性。

每个资源连接(此处并非 MQTT 连接)缓存队列长度(按容量大小),超出长度按照 FIFO 的原则丢弃数据。

缓存文件位置

对于 Kafka Bridge,磁盘缓存文件位于 data/kafka 下,其他数据桥接磁盘缓存文件位于 data/resource_worker 下。

实际使用中可以根据情况将 data 目录挂载至高性能磁盘以提高吞吐能力。

缓存队列配置

  1. bridges.mysql.foo {
  2. server = "localhost"
  3. database = "emqx"
  4. enable = true
  5. ...
  6. resource_opts {
  7. max_queue_bytes = "100MB"
  8. query_mode = "async"
  9. ...
  10. }
  11. }

SQL 预处理

在诸如 MySQL、PostgreSQL 等 SQL 数据库中,SQL 模板会进行预处理执行,无需显式的指定字段变量。

直接执行 SQL 时,必须通过单引号显式设置 topic 与 payload 为字符类型,qos 为 int 类型:

  1. INSERT INTO msg(topic, qos, payload) VALUES('${topic}', ${qos}, '${payload}');

但在支持 SQL 预处理的数据桥接中,SQL 模板必须使用不带引号的预处理语句:

  1. INSERT INTO msg(topic, qos, payload) VALUES(${topic}, ${qos}, ${payload});

除了自动推导字段类型外,SQL 预处理技术还能避免 SQL 注入以提高安全性。