Example
To make it easier for you to create InLong Sort jobs, here we list some data stream configuration examples. The following will introduce SQL, Dashboard, Manager Client Tools methods to use Inlong Sort.
Environment Requirements
- Apache Flink 1.13.5
- MySQL
- Apache Kafka
- Apache Hadoop
- Apache Hive 3.x
Prepare InLong Sort And Connectors
You can prepare InLong Sort and Data Node Connectors by referring to Deployment Guide.
Usage for SQL API
This example defines the data flow for a single table(mysql—>kafka—>hive).
MySQL to Kafka
Single table sync example:
./bin/flink run -c org.apache.inlong.sort.Entrance apache-inlong-[version]-bin/inlong-sort/sort-dist-[version].jar \
--sql.script.file mysql-to-kafka.sql
- mysql-to-kafka.sql
CREATE TABLE `table_1`(
PRIMARY KEY (`id`) NOT ENFORCED,
`id` BIGINT,
`name` STRING,
`age` INT,
`salary` FLOAT,
`ts` TIMESTAMP(2),
`event_type` STRING)
WITH (
'append-mode' = 'true',
'connector' = 'mysql-cdc-inlong',
'hostname' = 'localhost',
'username' = 'root',
'password' = 'password',
'database-name' = 'dbName',
'table-name' = 'tableName'
);
CREATE TABLE `table_2`(
`id` BIGINT,
`name` STRING,
`age` INT,
`salary` FLOAT,
`ts` TIMESTAMP(2))
WITH (
'topic' = 'topicName',-- Your kafka topic
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'true',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
);
INSERT INTO `table_2`
SELECT
`id` AS `id`,
`name` AS `name`,
`age` AS `age`,
CAST(NULL as FLOAT) AS `salary`,
`ts` AS `ts`
FROM `table_1`;
Kafka to Hive
caution
First you need to create user
table in Hive.
./bin/flink run -c org.apache.inlong.sort.Entrance apache-inlong-[version]-bin/inlong-sort/sort-dist-[version].jar \
--sql.script.file kafka-to-hive.sql
- kafka-to-hive.sql
CREATE TABLE `table_1`(
`id` BIGINT,
`name` STRING,
`age` INT,
`salary` FLOAT,
`ts` TIMESTAMP(2)
WITH (
'topic' = 'topicName',-- Your kafka topic
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'true',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'properties.group.id' = 'groupId'-- Your group id
);
CREATE TABLE `user`(
`id` BIGINT,
`name` STRING,
`age` INT,
`salary` FLOAT,
`ts` TIMESTAMP(9))
WITH (
'connector' = 'hive',
'default-database' = 'default',
'hive-version' = '3.1.2',
'hive-conf-dir' = 'hdfs://ip:9000/.../hive-site.xml' -- Put your hive-site.xml into HDFS
);
INSERT INTO `user`
SELECT
`id` AS `id`,
`name` AS `name`,
`age` AS `age`,
CAST(NULL as FLOAT) AS `salary`,
`ts` AS `ts`
FROM `table_1`;
Other Connectors
there are lots of supported Extract Node and Load Node , you can use them directly.