安装部署

当前 InLong Sort 是基于 Flink 的一个应用,因此运行 InLong Sort 应用前,需要准备好 Flink 环境

由于当前 InLong Sort 依赖的是 Flink1.13.5 版本,因此在下载部署包时,请选择flink-1.13.5-bin-scala_2.11.tgz

准备安装文件

  • InLong Sort 运行文件,下载 apache-inlong-[version]-bin.tar.gz
  • 数据节点 Connectors,下载 apache-inlong-[version]-sort-connectors.tar.gz

注意:Connectors 下载后可以将需要的 jars 放到FLINK_HOME/lib/下。
如果使用mysql-cdc-inlong 连接器,请将 mysql-connector-java:8.0.21.jar 包放到 FLINK_HOME/lib/下。

启动 InLong Sort

  1. ./bin/flink run -c org.apache.inlong.sort.Entrance apache-inlong-[version]-bin/inlong-sort/sort-dist-[version].jar \
  2. --sql.script.file mysql-to-postgresql.sql

配置

/YOUR_SQL_SCRIPT_DIR/mysql-to-postgresql.sql 是一个 sql 脚本文件,包含多个 Flink SQL 语句,可以用分号分隔。 语句可以支持CREATE TABLECRETAE VIEWINSERT INTO。 我们可以写sql来做数据集成。

如果我们想从 MySQL 读取数据并写入 PostgreSQL,我们可以编写以下 SQL 脚本。

  1. CREATE TABLE `table_1`(
  2. `age` INT,
  3. `name` STRING)
  4. WITH (
  5. 'connector' = 'mysql-cdc-inlong',
  6. 'hostname' = 'localhost',
  7. 'username' = 'root',
  8. 'password' = 'inlong',
  9. 'database-name' = 'test',
  10. 'scan.incremental.snapshot.enabled' = 'false',
  11. 'server-time-zone' = 'GMT+8',
  12. 'table-name' = 'user'
  13. );
  14. CREATE TABLE `table_2`(
  15. PRIMARY KEY (`name`) NOT ENFORCED,
  16. `name` STRING,
  17. `age` INT)
  18. WITH (
  19. 'connector' = 'jdbc',
  20. 'url' = 'jdbc:postgresql://localhost:5432/postgres',
  21. 'username' = 'postgres',
  22. 'password' = 'inlong',
  23. 'table-name' = 'public.user',
  24. 'port' = '3306'
  25. );
  26. INSERT INTO `table_2`
  27. SELECT
  28. `name` AS `name`,
  29. `age` AS `age`
  30. FROM `table_1`;