Example

Overview

To make it easier for you to create InLong Sort jobs, this page lists some data stream configuration examples. The following introduces how to use InLong Sort via SQL, Dashboard, and Manager Client Tools.

Environment Requirements

  • JDK 1.8.x
  • Flink 1.13.5
  • MySQL
  • Kafka
  • Hadoop
  • Hive 3.x

Prepare InLong Sort And Connectors

You can prepare InLong Sort and the Data Node Connectors by referring to the Deployment Guide.
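After building or downloading, the sort-dist jar and the connector jars for the data nodes you use typically go under Flink's `lib` directory. A sketch of the expected layout for the examples below (jar names and versions are illustrative; check the Deployment Guide for the exact artifacts):

```text
FLINK_HOME/lib/
├── sort-dist-[version].jar                   # InLong Sort entrance and core
├── sort-connector-mysql-cdc-[version].jar    # MySQL CDC source
├── sort-connector-kafka-[version].jar        # Kafka source/sink
└── sort-connector-hive-[version].jar         # Hive sink
```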

Usage for SQL API

This example defines the data flow for a single table (MySQL -> Kafka -> Hive).

MySQL to Kafka

Single table sync example:

```shell
./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
--sql.script.file /YOUR_SQL_SCRIPT_DIR/mysql-to-kafka.sql
```
  • mysql-to-kafka.sql
```sql
CREATE TABLE `table_1` (
    `id` BIGINT,
    `name` STRING,
    `age` INT,
    `salary` FLOAT,
    `ts` TIMESTAMP(2),
    `event_type` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'append-mode' = 'true',
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'localhost',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'dbName',
    'table-name' = 'tableName'
);

CREATE TABLE `table_2` (
    `id` BIGINT,
    `name` STRING,
    `age` INT,
    `salary` FLOAT,
    `ts` TIMESTAMP(2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'topicName', -- Your kafka topic
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.timestamp-format.standard' = 'SQL',
    'json.encode.decimal-as-plain-number' = 'true',
    'json.map-null-key.literal' = 'null',
    'json.map-null-key.mode' = 'DROP',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false'
);

INSERT INTO `table_2`
SELECT
    `id` AS `id`,
    `name` AS `name`,
    `age` AS `age`,
    CAST(NULL AS FLOAT) AS `salary`,
    `ts` AS `ts`
FROM `table_1`;
```

Kafka to Hive

Note: You need to create the `user` table in Hive first.
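Since the Hive table must exist before the job runs, a minimal DDL sketch is shown here. The column names and types mirror the Hive sink schema used below; the storage format is an assumption, so use whatever format suits your setup:

```sql
-- Run in Hive (e.g. via beeline) before submitting the Flink job.
CREATE TABLE `default`.`user` (
    `id`     BIGINT,
    `name`   STRING,
    `age`    INT,
    `salary` FLOAT,
    `ts`     TIMESTAMP
) STORED AS ORC; -- storage format is an assumption, not required by the example
```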

```shell
./bin/flink run -c org.apache.inlong.sort.Entrance FLINK_HOME/lib/sort-dist-[version].jar \
--sql.script.file /YOUR_SQL_SCRIPT_DIR/kafka-to-hive.sql
```
  • kafka-to-hive.sql
```sql
CREATE TABLE `table_1` (
    `id` BIGINT,
    `name` STRING,
    `age` INT,
    `salary` FLOAT,
    `ts` TIMESTAMP(2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'topicName', -- Your kafka topic
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'groupId', -- Your group id
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'SQL',
    'json.encode.decimal-as-plain-number' = 'true',
    'json.map-null-key.literal' = 'null',
    'json.map-null-key.mode' = 'DROP',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false'
);

CREATE TABLE `user` (
    `id` BIGINT,
    `name` STRING,
    `age` INT,
    `salary` FLOAT,
    `ts` TIMESTAMP(9)
) WITH (
    'connector' = 'hive',
    'default-database' = 'default',
    'hive-version' = '3.1.2',
    'hive-conf-dir' = 'hdfs://ip:9000/.../hive-site.xml' -- Put your hive-site.xml into HDFS
);

INSERT INTO `user`
SELECT
    `id` AS `id`,
    `name` AS `name`,
    `age` AS `age`,
    CAST(NULL AS FLOAT) AS `salary`,
    `ts` AS `ts`
FROM `table_1`;
```

Note: You can also put all of the SQL statements in a single file.

Usage for Dashboard

The underlying capabilities are already available; Dashboard support for them will be added in the future.

Usage for Manager Client Tools

TODO: It will be supported in the future.