总览

简介

Load 节点列表是一组基于 Apache Flink® 的 Sink Connectors 用于将数据加载到不同的存储系统。

支持的 Load 节点列表

Load 节点版本驱动包
KafkaKafka: 0.10+None
HBaseHbase: 2.2.xNone
PostgreSQLPostgreSQL: 9.6, 10, 11, 12JDBC Driver: 42.2.12
OracleOracle: 11, 12, 19Oracle Driver: 19.3.0.0
MySQLMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.21
TDSQL-PostgreSQLTDSQL-PostgreSQL: 10.17JDBC Driver: 42.2.12
GreenplumGreenplum: 4.x, 5.x, 6.xJDBC Driver: 42.2.12
ElasticsearchElasticsearch: 6.x, 7.xNone
ClickHouseClickHouse: 20.7+JDBC Driver: 0.3.1
HiveHive: 1.x, 2.x, 3.xNone
SQLServerSQLServer: 2012, 2014, 2016, 2017, 2019JDBC Driver: 7.2.2.jre8
HDFSHDFS: 2.x, 3.xNone
IcebergIceberg: 0.13.1+None

下表展示了 InLong® Load 节点 和 Flink® 版本之间的对应关系。

InLong® Load 节点版本Flink® 版本
1.2.01.13.5

SQL API 的用法

我们需要几个步骤来使用提供的连接器设置 Flink 集群。

  • 设置一个安装了 1.13.5 版本和 Java 8+ 的 Flink 集群。
  • 下载 页面下载并解压 Sort Connectors jars (或者参考 如何编译 编译需要的版本)。
  • 将下载并解压后的 Sort Connectors jars 放到 FLINK_HOME/lib/
  • 重启 Flink 集群。

下面例子展示了如何在 Flink SQL Client 创建一个 MySQL Load 节点并加载数据进去:

  1. -- 创建一个 MySQL Extract 节点
  2. CREATE TABLE mysql_extract_node (
  3. id INT NOT NULL,
  4. name STRING,
  5. age INT,
  6. weight DECIMAL(10,3),
  7. PRIMARY KEY(id) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'mysql-cdc-inlong',
  10. 'hostname' = 'YourHostname',
  11. 'port' = '3306',
  12. 'username' = 'YourUsername',
  13. 'password' = 'YourPassword',
  14. 'database-name' = 'YourDatabaseName',
  15. 'table-name' = 'YourTableName'
  16. );
  17. -- 创建一个 MySQL Load 节点
  18. CREATE TABLE mysql_load_node (
  19. id INT NOT NULL,
  20. name STRING,
  21. age INT,
  22. weight DECIMAL(10,3),
  23. PRIMARY KEY(id) NOT ENFORCED
  24. ) WITH (
  25. 'connector' = 'jdbc-inlong',
  26. 'url' = 'jdbc:mysql://YourHostname:3306/YourDatabaseName',
  27. 'username' = 'YourUsername',
  28. 'password' = 'YourPassword',
  29. 'table-name' = 'YourTableName'
  30. );
  31. INSERT INTO mysql_load_node SELECT id, name, age, weight FROM mysql_extract_node;