MySQL 到 StarRocks 示例

在下面的内容中,我们将通过一个完整的示例介绍如何使用 Apache InLong 创建 MySQL -> StarRocks 数据同步。

环境部署

安装 InLong

在开始之前,我们需要安装 InLong 的全部组件,这里提供两种方式:

添加 Connectors

下载 Flink 1.13 对应版本的 connectors,解压后将 sort-connector-starrocks-[version]-SNAPSHOT.jar 放在 /inlong-sort/connectors/ 目录下。

安装 StarRocks

请参考 Apache StarRocks 官网的安装教程

集群初始化

容器启动成功后,访问 InLong Dashboard 地址 http://localhost,并使用以下默认账号登录:

  1. User: admin
  2. Password: inlong

创建集群标签

页面点击 【集群管理】->【标签管理】->【新建】,指定集群标签名称和负责人: Create Cluster Tag

注:default_cluster 是各个组件默认上报集群标签,如果使用其它名称,确认对应标签配置已修改。

注册 Pulsar 集群

页面点击 【集群管理】 -> 【集群管理】 -> 【新建集群】,注册 Pulsar 集群: Create Pulsar Cluster

MySQL 到 StarRocks 示例 - 图3备注

集群标签选择刚创建的 default_cluster,配置 Docker 部署的 Pulsar 集群:

Service URL 为 pulsar://pulsar:6650, Admin URL 为 http://pulsar:8080.

注册 StarRocks 数据节点

页面点击 【数据节点】 -> 【创建】 ,新增 StarRocks 数据节点. Create StarRocks DataNode

MySQL 到 StarRocks 示例 - 图5备注

  • LOAD URL 请勿携带 http://, 填写 IP + 端口即可。

任务创建

新建数据流组

页面点击【数据同步】 → 【创建】,输入 Group ID、Steam ID 和 是否整库迁移: Create Group Stream

创建数据源

数据源中点击 【新建】 → 【MySQL】 配置数据源名称、地址、库表信息等。 Create Stream_Source

MySQL 到 StarRocks 示例 - 图8备注

  • 读取模式选择 全量+增量 时,表中的存量数据也会被采集,仅增量 模式则不会。
  • 表名白名单格式为 <dbName>.<tableName>,支持正则表达。

创建数据目标

数据目标中点击 【新建】 → 【StarRocks】,设置数据目标名称并选择创建好的 StarRocks 数据节点, 并填写库表名称。 Create data object

审批数据流

点击 【审批管理】 -> 【我的审批】 -> 【审批】 -> 【通过】. Approve

返回【数据集成】,等待任务配置成功: Success

测试数据

发送数据

  1. #!/bin/bash
  2. # MySQL info
  3. DB_HOST="mysql"
  4. DB_USER="root"
  5. DB_PASS="inlong"
  6. DB_NAME="test"
  7. DB_TABLE="source_table"
  8. # Insert data in a loop
  9. for ((i=1; i<=1000; i++))
  10. do
  11. # Generate data
  12. id=$i
  13. name="name_$i"
  14. # Build an insert SQL
  15. query="INSERT INTO $DB_TABLE (id, name) VALUES ($id, '$name');"
  16. # Execute insert SQL
  17. mysql -h $DB_HOST -u $DB_USER -p$DB_PASS $DB_NAME -e "$query"
  18. done

根据实际环境修改脚本中的变量,执行脚本向 source_table 表中累计添加 1000 条数据:

Result Source

验证数据

进入 StarRocks,查看 sink_table 表数据

Result Sink

也可以在页面查看审计数据:

Result Sink