安装部署

InLong Sort 是基于 Flink 的一个应用,需要准备好 Apache Flink 环境

当前 InLong Sort 依赖的是 Apache Flink 1.13.5 版本,因此在下载部署包时,请选择 flink-1.13.5-bin-scala_2.11.tgz

准备安装文件

  • InLong Sort 运行文件,下载 apache-inlong-[version]-bin.tar.gz
  • 数据节点 Connectors,下载 apache-inlong-[version]-sort-connectors.tar.gz

安装部署 - 图1警告

Connectors 下载后可以将需要的 jars 放到FLINK_HOME/lib/下。
如果使用mysql-cdc-inlong 连接器,请将 mysql-connector-java:8.0.21.jar 包放到 FLINK_HOME/lib/下。

启动 InLong Sort 任务

  1. ./bin/flink run -c org.apache.inlong.sort.Entrance apache-inlong-[version]-bin/inlong-sort/sort-dist-[version].jar \
  2. --sql.script.file [souce-to-sink].sql

安装部署 - 图2备注

--sql.script.file 需要指定一个 SQL 脚本文件,包含多个 Flink SQL 语句,可以用分号分隔。支持CREATE TABLECRETAE VIEWINSERT INTO 等。

MySQL to PostgreSQL

如果我们想从 MySQL 读取数据并写入 PostgreSQL,我们可以编写以下 SQL 脚本。

  • 准备 mysql-to-postgresql.sql
  1. CREATE TABLE `table_1`(
  2. `age` INT,
  3. `name` STRING)
  4. WITH (
  5. 'connector' = 'mysql-cdc-inlong',
  6. 'hostname' = 'localhost',
  7. 'username' = 'root',
  8. 'password' = 'inlong',
  9. 'database-name' = 'test',
  10. 'scan.incremental.snapshot.enabled' = 'false',
  11. 'server-time-zone' = 'GMT+8',
  12. 'table-name' = 'user'
  13. );
  14. CREATE TABLE `table_2`(
  15. PRIMARY KEY (`name`) NOT ENFORCED,
  16. `name` STRING,
  17. `age` INT)
  18. WITH (
  19. 'connector' = 'jdbc',
  20. 'url' = 'jdbc:postgresql://localhost:5432/postgres',
  21. 'username' = 'postgres',
  22. 'password' = 'inlong',
  23. 'table-name' = 'public.user',
  24. 'port' = '3306'
  25. );
  26. INSERT INTO `table_2`
  27. SELECT
  28. `name` AS `name`,
  29. `age` AS `age`
  30. FROM `table_1`;
  • 提交任务
  1. ./bin/flink run -c org.apache.inlong.sort.Entrance apache-inlong-[version]-bin/inlong-sort/sort-dist-[version].jar \
  2. --sql.script.file mysql-to-postgresql.sql

Other complete usage example, you can refer to Example