Example

示例

为了更容易创建 InLong Sort 作业,这里我们列出了一些数据流配置示例。下面将介绍 InLong Sort 的 SQL、Dashboard、Manager 客户端工具的使用。

环境要求

  • JDK 1.8.x
  • Flink 1.13.5
  • MySQL
  • Kafka
  • Hadoop
  • Hive 3.x

准备 InLong Sort 和 Connectors

你可以通过参考部署指引准备 InLong Sort 和数据节点 Connectors。

使用 SQL API 方式

示例构建了 MySQL —> Kafka —> Hive 的数据流,为了便于理解流程执行过程进行了拆解。

读 MySQL 写 Kafka

单表同步配置示例如下:

  1. ./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
  2. --sql.script.file /YOUR_SQL_SCRIPT_DIR/mysql-to-kafka.sql
  • mysql-to-kafka.sql
  1. CREATE TABLE `table_1`(
  2. PRIMARY KEY (`id`) NOT ENFORCED,
  3. `id` BIGINT,
  4. `name` STRING,
  5. `age` INT,
  6. `salary` FLOAT,
  7. `ts` TIMESTAMP(2),
  8. `event_type` STRING)
  9. WITH (
  10. 'append-mode' = 'true',
  11. 'connector' = 'mysql-cdc-inlong',
  12. 'hostname' = 'localhost',
  13. 'username' = 'root',
  14. 'password' = 'password',
  15. 'database-name' = 'dbName',
  16. 'table-name' = 'tableName'
  17. );
  18. CREATE TABLE `table_2`(
  19. `id` BIGINT,
  20. `name` STRING,
  21. `age` INT,
  22. `salary` FLOAT,
  23. `ts` TIMESTAMP(2))
  24. WITH (
  25. 'topic' = 'topicName',-- Your kafka topic
  26. 'properties.bootstrap.servers' = 'localhost:9092',
  27. 'connector' = 'kafka',
  28. 'json.timestamp-format.standard' = 'SQL',
  29. 'json.encode.decimal-as-plain-number' = 'true',
  30. 'json.map-null-key.literal' = 'null',
  31. 'json.ignore-parse-errors' = 'true',
  32. 'json.map-null-key.mode' = 'DROP',
  33. 'format' = 'json',
  34. 'json.fail-on-missing-field' = 'false'
  35. );
  36. INSERT INTO `table_2`
  37. SELECT
  38. `id` AS `id`,
  39. `name` AS `name`,
  40. `age` AS `age`,
  41. CAST(NULL as FLOAT) AS `salary`,
  42. `ts` AS `ts`
  43. FROM `table_1`;

读 Kafka 写 Hive

注意: 首先需要在 hive 中创建 user 表。

  1. ./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
  2. --sql.script.file /YOUR_SQL_SCRIPT_DIR/kafka-to-hive.sql
  • kafka-to-hive.sql
  1. CREATE TABLE `table_1`(
  2. `id` BIGINT,
  3. `name` STRING,
  4. `age` INT,
  5. `salary` FLOAT,
  6. `ts` TIMESTAMP(2)
  7. WITH (
  8. 'topic' = 'topicName',-- Your kafka topic
  9. 'properties.bootstrap.servers' = 'localhost:9092',
  10. 'connector' = 'kafka',
  11. 'scan.startup.mode' = 'earliest-offset',
  12. 'json.timestamp-format.standard' = 'SQL',
  13. 'json.encode.decimal-as-plain-number' = 'true',
  14. 'json.map-null-key.literal' = 'null',
  15. 'json.ignore-parse-errors' = 'true',
  16. 'json.map-null-key.mode' = 'DROP',
  17. 'format' = 'json',
  18. 'json.fail-on-missing-field' = 'false',
  19. 'properties.group.id' = 'groupId'-- Your group id
  20. );
  21. CREATE TABLE `user`(
  22. `id` BIGINT,
  23. `name` STRING,
  24. `age` INT,
  25. `salary` FLOAT,
  26. `ts` TIMESTAMP(9))
  27. WITH (
  28. 'connector' = 'hive',
  29. 'default-database' = 'default',
  30. 'hive-version' = '3.1.2',
  31. 'hive-conf-dir' = 'hdfs://ip:9000/.../hive-site.xml' -- Put your hive-site.xml into HDFS
  32. );
  33. INSERT INTO `user`
  34. SELECT
  35. `id` AS `id`,
  36. `name` AS `name`,
  37. `age` AS `age`,
  38. CAST(NULL as FLOAT) AS `salary`,
  39. `ts` AS `ts`
  40. FROM `table_1`;

备注:以上过程所有的 SQL 可以放在一个文件中提交执行。

使用 Inlong Dashboard 方式

目前 Dashboard 支持文件采集同步的方式,以上数据源可视化配置方式正在开发中。

使用 Manager Client Tools 方式

TODO: 未来发布的版本将会支持。