总览
简介
Extract 节点列表是一组基于 Apache Flink® 的 Source Connectors 用于从不同的源系统抽取数据。
支持的 Extract 节点列表
Extract 节点 | 版本 | 驱动包 |
---|---|---|
Kafka | Kafka: 0.10+ | None |
Pulsar | Pulsar: 2.8.x+ | None |
MongoDB-CDC | MongoDB: 3.6, 4.x, 5.0 | None |
MySQL-CDC | MySQL: 5.6, 5.7, 8.0.x RDS MySQL: 5.6, 5.7, 8.0.x PolarDB MySQL: 5.6, 5.7, 8.0.x Aurora MySQL: 5.6, 5.7, 8.0.x MariaDB: 10.x PolarDB X: 2.0.1 | JDBC Driver: 8.0.21 |
Oracle-CDC | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |
PostgreSQL-CDC | PostgreSQL: 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
SQLServer-CDC | SQLServer: 2012, 2014, 2016, 2017, 2019 | None |
支持的 Flink 版本列表
下表展示了 InLong® Extract 节点 和 Flink® 版本之间的对应关系。
InLong® Extract 节点版本 | Flink® 版本 |
---|---|
1.2.0 | 1.13.5 |
SQL API 用法
我们需要几个步骤来使用提供的连接器设置 Flink 集群。
- 设置一个安装了 1.13.5 版本和 Java 8+ 的 Flink 集群。
- 从 下载 页面下载并解压 Sort Connectors jars (或者参考 如何编译 编译需要的版本)。
- 将下载并解压后的 Sort Connectors jars 放到
FLINK_HOME/lib/
。 - 重启 Flink 集群。
下面例子展示了如何在 Flink SQL Client 创建 MySQL Extract 节点,并从中查询数据:
-- 创建一个 MySQL Extract 节点
CREATE TABLE mysql_extract_node (
id INT NOT NULL,
name STRING,
age INT,
weight DECIMAL(10,3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc-inlong',
'hostname' = 'YourHostname',
'port' = '3306',
'username' = 'YourUsername',
'password' = 'YourPassword',
'database-name' = 'YourDatabaseName',
'table-name' = 'YourTableName'
);
SELECT id, name, age, weight FROM mysql_extract_node;