ClickHouse

ClickHouse 是一个高性能、列式存储的分布式数据库管理系统,专门用于处理大规模数据。它具有出色的查询性能、灵活的数据模型和可扩展的分布式架构,适用于多种数据分析场景。 EMQX 将 MQTT 消息和客户端事件存储到 ClickHouseClickHouse - 图1 (opens new window)

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用 (opens new window)ClickHouse - 图2 (opens new window)

前置准备

功能清单

快速开始

本节将带您创建一个 ClickHouse 服务器,然后在 EMQX 创建 ClickHouse 的数据桥接,之后再通过创建一条规则来将数据转发至 ClickHouse,以验证该数据桥接是否正常工作。

提示

本教程假定 EMQX 与 ClickHouse 均在本地运行,如您在远程运行 EMQX 及 ClickHouse,请根据实际情况调整相应配置。

启动 ClickHouse 服务器

本节将介绍如何通过 DockerClickHouse - 图3 (opens new window) 启动 ClickHouse 服务器。

  1. 创建一个 init.sql 文件并包含以下初始化 SQL 命令:

    1. cat >init.sql <<SQL_INIT
    2. CREATE DATABASE IF NOT EXISTS mqtt_data;
    3. CREATE TABLE IF NOT EXISTS mqtt_data.messages (
    4. data String,
    5. arrived UnixTimestamp
    6. ) ENGINE = MergeTree();
    7. SQL_INIT
  2. 运行以下命令通过 Docker 启动 ClickHouse 服务器,其中定义了数据库的名称、端口号、用户名和密码,方便后续通过 EMQX 创建数据桥接时配置连接信息。同时我们还将 init.sql 文件映射为容器内的一个文件。

    1. docker run \
    2. --rm \
    3. -e CLICKHOUSE_DB=mqtt_data \
    4. -e CLICKHOUSE_USER=emqx \
    5. -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \
    6. -e CLICKHOUSE_PASSWORD=public \
    7. -p 18123:8123 \
    8. -p 19000:9000 \
    9. --ulimit nofile=262144:262144 \
    10. -v ./init.sql:/docker-entrypoint-initdb.d/init.sql \
    11. clickhouse/clickhouse-server

有关如何通过 Docker 运行 ClickHouse 服务器的更多信息,可阅读 Docker - ClickHouse ServerClickHouse - 图4 (opens new window)

创建 ClickHouse 数据桥接

本节将通过 Dashboard 演示如何创建到 ClickHouse 的数据桥接。

  1. 登陆 EMQX Dashboard,点击左侧目录菜单中的数据集成 -> 数据桥接

  2. 点击页面右上角的创建

  3. 数据桥接类型中选择 ClickHouse,点击下一步

  4. 输入数据桥接名称,名称应为大/小写字母和数字的组合。

  5. 输入连接信息:

  6. 分隔符(可选):用于区分多个输入项,本教程中可保留默认的 , 。注意:您只需在启用批量模式、且使用其他 ClickHouse 数据格式ClickHouse - 图5 (opens new window)时才需更改设置。

  7. 在 SQL 模版中输入以下命令(您可通过规则引擎确保输入 SQL 语句中的字符串能被正确转义,以防 SQL 注入攻击):

    1. INSERT INTO messages(data, arrived) VALUES ('${data}', ${timestamp})

    其中,${data}${timestamp} 分别代表消息内容和时间戳,即稍后将在规则时进行配置的消息转发内容。EMQX 在进行消息转发前会按照设定将其替换为相应内容。

  8. 高级功能(可选):根据情况配置同步/异步模式、批量模式,具体可阅读数据桥接的简介部分

  9. 点击创建前,您可点击测试连接按钮确保能连接到 ClickHouse 服务器。

  10. 最后点击创建按钮完成数据桥接创建。

至此,您已经完成数据桥接的创建,在 Dashboard 的数据桥接页面,可以看到 ClickHouse 数据桥接的状态为已连接。接下来,我们将继续创建一条规则来指定需要转发至 ClickHouse 的数据。

创建数据转发规则

  1. 转到 Dashboard 数据集成 -> 规则页面。

  2. 点击页面右上角的创建

  3. 输入规则 ID,例如 my_rule

  4. 在 SQL 编辑器中输入规则,例如我们希望将 t/# 主题的 MQTT 消息转发至 ClickHouse,可通过如下规则 SQL 实现:

    1. SELECT
    2. payload as data,
    3. now_timestamp() as timestamp
    4. FROM
    5. "t/#"
  5. 点击添加动作按钮,在下拉框中选择使用数据桥接转发,选择之前创建好的 ClickHouse 数据桥接。

  6. 点击添加按钮确认添加动作。

  7. 点击最下方创建按钮完成规则创建。

至此我们已经完成数据桥接和转发规则的创建,您可前往 数据集成 -> Flows 页面查看拓扑图,可看到 t/# 主题的消息已被转发至 ClickHouse。

测试数据桥接与规则

您可通过 EMQX Dashboard 内置的 WebSocket 客户端进行规则和数据桥接的验证。在 Dashboard 页面,点击左侧导航目录中的 问题分析 -> WebSocket 客户端

按照以下步骤建立 WebSocket 客户端并通过该客户端向主题 t/test 发送一条消息:

  1. 填写当前 EMQX 的连接信息。由于本教程中的 EMQX 在本地运行,因此除您修改过配置外(如修改过访问规则的配置,需要输入用户名和密码),可直接使用默认配置。
  2. 点击连接,建立该 MQTT 客户端与 EMQX 的连接。
  3. 前往发布区域,并进行如下配置:
    • 主题:t/test
    • Payload:Hello World ClickHouse from EMQX
    • QoS:2
  4. 点击发布完成消息的发送。

此时,该消息应该已被转发至 ClickHouse 服务器的 mqtt_data 数据库中,并插入 messages 表中。你可在命令行中输入如下命令进行确认:

  1. curl -u emqx:public -X POST -d "SELECT * FROM mqtt_data.messages" http://localhost:18123

如消息被正确转发,将能看到类似的返回结果:

  1. Hello World Clickhouse from EMQX 1679932005