PostgreSQL

概览

PostgreSQL Load 节点支持将数据写入 PostgreSQL 数据库。 本文档介绍如何设置 PostgreSQL Load 节点以对 PostgreSQL 数据库运行 SQL 查询。

支持的版本

Load 节点DriverGroup IdArtifact IdJAR
PostgreSQLPostgreSQLorg.postgresqlpostgresql下载

依赖

为了设置 PostgreSQL Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-jdbc</artifactId>
  4. <version>2.1.0-SNAPSHOT</version>
  5. </dependency>

如何创建 PostgreSQL Load 节点

SQL API 用法

  1. -- MySQL Extract 节点
  2. CREATE TABLE `mysql_extract_table`(
  3. PRIMARY KEY (`id`) NOT ENFORCED,
  4. `id` BIGINT,
  5. `name` STRING,
  6. `age` INT
  7. ) WITH (
  8. 'connector' = 'mysql-cdc-inlong',
  9. 'url' = 'jdbc:mysql://localhost:3306/read',
  10. 'username' = 'inlong',
  11. 'password' = 'inlong',
  12. 'table-name' = 'user'
  13. )
  14. -- PostgreSQL Load 节点
  15. CREATE TABLE `postgresql_load_table`(
  16. PRIMARY KEY (`id`) NOT ENFORCED,
  17. `id` BIGINT,
  18. `name` STRING,
  19. `age` INT
  20. ) WITH (
  21. 'connector' = 'jdbc-inlong',
  22. 'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.PostgresDialect',
  23. 'url' = 'jdbc:postgresql://localhost:5432/write',
  24. 'username' = 'inlong',
  25. 'password' = 'inlong',
  26. 'table-name' = 'public.user'
  27. )
  28. -- 写数据到 PostgreSQL
  29. INSERT INTO postgresql_load_table
  30. SELECT id, name , age FROM mysql_extract_table;

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

PostgreSQL Load 节点参数

参数是否必选默认值数据类型描述
connector必选(none)String指定使用什么类型的连接器,这里应该是 ‘jdbc’。
url必选(none)StringJDBC 数据库 url。
table-name必选(none)String连接到 JDBC 表的名称。
driver可选(none)String用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username可选(none)StringJDBC 用户名。如果指定了 ‘username’ 和 ‘password’ 中的任一参数,则两者必须都被指定。
password可选(none)StringJDBC 密码。
connection.max-retry-timeout可选60sDuration最大重试超时时间,以秒为单位且不应该小于 1 秒。
sink.buffer-flush.max-rows可选100Integerflush 前缓存记录的最大值,可以设置为 ‘0’ 来禁用它。
sink.buffer-flush.interval可选1sDurationflush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 ‘0’ 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 ‘sink.buffer-flush.max-rows’ 设置为 ‘0’ 并配置适当的 flush 时间间隔。
sink.max-retries可选3Integer写入记录到数据库失败后的最大重试次数。
sink.parallelism可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。
sink.ignore.changelog可选falseBoolean忽略所有 RowKind 类型的变更日志,将它们当作 INSERT 的数据来采集。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}

数据类型映射

PostgreSQL 类型Flink SQL 类型
TINYINT
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INTEGER
SERIAL
INT
BIGINT
BIGSERIAL
BIGINT
DECIMAL(20, 0)
REAL
FLOAT4
FLOAT
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEANBOOLEAN
DATEDATE
TIME [(p)][WITHOUT TIMEZONE]TIME [(p)][WITHOUT TIMEZONE]
TIMESTAMP [(p)]WITHOUT TIMEZONETIMESTAMP [(p)][WITHOUT TIMEZONE]
CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
STRING
BYTEABYTES