MySQL

概览

MySQL Load 节点支持将数据写入 MySQL 数据库。 本文档介绍如何设置 MySQL Load 节点以对 MySQL 数据库运行 SQL 查询。

支持的版本

Load 节点DriverGroup IdArtifact IdJAR
MySQLMySQLmysqlmysql-connector-java下载

依赖

为了设置 MySQL Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-jdbc</artifactId>
  4. <version>1.3.0-SNAPSHOT</version>
  5. </dependency>

MySQL license 和 Inlong license 是冲突的。 所以我们移除了pom中的 MySQL 驱动依赖。 如果我们想使用这个连接器,我们可以修改pom文件。

如何创建 MySQL Load 节点

SQL API 用法

  1. -- MySQL Extract 节点
  2. CREATE TABLE `mysql_extract_table`(
  3. PRIMARY KEY (`id`) NOT ENFORCED,
  4. `id` BIGINT,
  5. `name` STRING,
  6. `age` INT
  7. ) WITH (
  8. 'connector' = 'mysql-cdc-inlong',
  9. 'url' = 'jdbc:mysql://localhost:3306/read',
  10. 'username' = 'inlong',
  11. 'password' = 'inlong',
  12. 'table-name' = 'user'
  13. )
  14. -- MySQL Load 节点
  15. CREATE TABLE `mysql_load_table`(
  16. PRIMARY KEY (`id`) NOT ENFORCED,
  17. `id` BIGINT,
  18. `name` STRING,
  19. `age` INT
  20. ) WITH (
  21. 'connector' = 'jdbc-inlong',
  22. 'url' = 'jdbc:mysql://localhost:3306/write',
  23. 'username' = 'inlong',
  24. 'password' = 'inlong',
  25. 'table-name' = 'user'
  26. )
  27. -- 写数据到 MySQL
  28. INSERT INTO mysql_load_table
  29. SELECT id, name , age FROM mysql_extract_table;

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

MySQL Load 节点参数

参数是否必选默认值数据类型描述
connector必选(none)String指定使用什么类型的连接器,这里应该是 ‘jdbc-inlong’。
url必选(none)StringJDBC 数据库 url。
table-name必选(none)String连接到 JDBC 表的名称。
driver可选(none)String用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username可选(none)StringJDBC 用户名。如果指定了 ‘username’ 和 ‘password’ 中的任一参数,则两者必须都被指定。
password可选(none)StringJDBC 密码。
connection.max-retry-timeout可选60sDuration最大重试超时时间,以秒为单位且不应该小于 1 秒。
sink.buffer-flush.max-rows可选100Integerflush 前缓存记录的最大值,可以设置为 ‘0’ 来禁用它。
sink.buffer-flush.interval可选1sDurationflush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 ‘0’ 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 ‘sink.buffer-flush.max-rows’ 设置为 ‘0’ 并配置适当的 flush 时间间隔。
sink.max-retries可选3Integer写入记录到数据库失败后的最大重试次数。
sink.parallelism可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId=xxgroup&streamId=xxstream&nodeId=xxnode。

数据类型映射

MySQL 类型Flink SQL 类型
TINYINTTINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNEDDECIMAL(20, 0)
FLOATFLOAT
DOUBLE
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
TINYINT(1)
BOOLEAN
DATEDATE
TIME [(p)]TIME [(p)][WITHOUT TIMEZONE]
DATETIME [(p)]TIMESTAMP [(p)][WITHOUT TIMEZONE]
CHAR(n)
VARCHAR(n)
TEXT
STRING
BINARY
VARBINARY
BLOB
BYTES
ARRAY