TDSQL-PostgreSQL

Overview

The TDSQL-PostgreSQL Load Node supports writing data into a TDSQL-PostgreSQL database. This document describes how to set up the TDSQL-PostgreSQL Load Node to run SQL queries against a TDSQL-PostgreSQL database.

Supported Version

| Load Node | Driver | Group Id | Artifact Id | JAR |
|---|---|---|---|---|
| TDSQL-PostgreSQL | PostgreSQL | org.postgresql | postgresql | Download |

Dependencies

To set up the TDSQL-PostgreSQL Load Node, add the dependency below. The same dependency information applies to projects that use a build automation tool (such as Maven or SBT) and to the SQL Client with Sort Connectors JAR bundles.

Maven dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-jdbc</artifactId>
        <version>1.4.0-SNAPSHOT</version>
    </dependency>

How to create a TDSQL-PostgreSQL Load Node

Usage for SQL API

    -- MySQL extract node
    CREATE TABLE `mysql_extract_table` (
        `id` BIGINT,
        `name` STRING,
        `age` INT,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc-inlong',
        'url' = 'jdbc:mysql://localhost:3306/read',
        'username' = 'inlong',
        'password' = 'inlong',
        'table-name' = 'user'
    );

    -- TDSQL-PostgreSQL load node
    CREATE TABLE `tdsql_postgresql_load_table` (
        `id` BIGINT,
        `name` STRING,
        `age` INT,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc-inlong',
        'url' = 'jdbc:postgresql://localhost:5432/write',
        'username' = 'inlong',
        'password' = 'inlong',
        'table-name' = 'public.user'
    );

    -- Write data into TDSQL-PostgreSQL
    INSERT INTO tdsql_postgresql_load_table
    SELECT id, name, age FROM mysql_extract_table;

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

TDSQL-PostgreSQL Load Node Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specifies which connector to use; here it should be 'jdbc-inlong'. |
| url | required | (none) | String | The JDBC database URL. |
| table-name | required | (none) | String | The name of the JDBC table to connect to. |
| driver | optional | (none) | String | The class name of the JDBC driver used to connect to this URL. If not set, it is derived automatically from the URL. |
| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if either of them is specified. |
| password | optional | (none) | String | The JDBC password. |
| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and must not be smaller than 1 second. |
| sink.buffer-flush.max-rows | optional | 100 | Integer | The maximum number of buffered records before a flush. Set to '0' to disable it. |
| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once this time has elapsed, asynchronous threads flush the buffered data. Set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' while the flush interval is set, which allows buffered records to be processed fully asynchronously. |
| sink.max-retries | optional | 3 | Integer | The maximum number of retries if writing records to the database fails. |
| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the framework uses the same parallelism as the upstream chained operator. |
| sink.ignore.changelog | optional | false | Boolean | Ignore all RowKind values and ingest every record as INSERT. |
| inlong.metric.labels | optional | (none) | String | InLong metric label. The value format is groupId={groupId}&streamId={streamId}&nodeId={nodeId}. |
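As an illustration of how the optional settings combine with the required ones, the following is a minimal sketch of a load table that sets every option listed above. The table name `tdsql_postgresql_tuned_load_table`, the tuning values, and the metric label values (`demo_group`, `demo_stream`, `demo_node`) are hypothetical and chosen only for the example; `org.postgresql.Driver` is the standard PostgreSQL JDBC driver class.

```sql
-- Sketch only: table name, credentials, and option values are illustrative assumptions.
CREATE TABLE `tdsql_postgresql_tuned_load_table` (
    `id` BIGINT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    -- Required options
    'connector' = 'jdbc-inlong',
    'url' = 'jdbc:postgresql://localhost:5432/write',
    'table-name' = 'public.user',
    -- Optional: explicit driver class; if omitted it is derived from the URL
    'driver' = 'org.postgresql.Driver',
    -- 'username' and 'password' must be given together
    'username' = 'inlong',
    'password' = 'inlong',
    -- Retry behaviour (second granularity, at least 1 second)
    'connection.max-retry-timeout' = '30s',
    'sink.max-retries' = '5',
    -- Flush when 500 rows are buffered or 2 seconds have passed
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '2s',
    -- Sink parallelism; otherwise inherited from the upstream operator
    'sink.parallelism' = '2',
    -- Keep RowKind semantics (set to 'true' to ingest everything as INSERT)
    'sink.ignore.changelog' = 'false',
    -- Hypothetical label values in the groupId/streamId/nodeId format
    'inlong.metric.labels' = 'groupId=demo_group&streamId=demo_stream&nodeId=demo_node'
);
```

Larger values for the two buffer-flush options generally mean fewer round trips to the database at the cost of higher write latency; suitable values depend on the workload.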

Data Type Mapping

| TDSQL-PostgreSQL type | Flink SQL type |
|---|---|
| (none) | TINYINT |
| SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INTEGER, SERIAL | INT |
| BIGINT, BIGSERIAL | BIGINT |
| (none) | DECIMAL(20, 0) |
| REAL, FLOAT4 | FLOAT |
| FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIME [(p)] [WITHOUT TIME ZONE] | TIME [(p)] [WITHOUT TIME ZONE] |
| TIMESTAMP [(p)] [WITHOUT TIME ZONE] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BYTEA | BYTES |
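
To make the type mapping concrete, here is a hedged sketch that pairs a database-side table with a matching Flink SQL load table. The table `public.user_profile`, its columns, and the Flink table name `user_profile_load_table` are hypothetical; only the type pairs are taken from the mapping in this section.

```sql
-- TDSQL-PostgreSQL side (run in the database); hypothetical target table.
CREATE TABLE public.user_profile (
    id          BIGINT PRIMARY KEY,
    user_level  SMALLINT,
    visits      INTEGER,
    score       REAL,
    ratio       DOUBLE PRECISION,
    balance     NUMERIC(10, 2),
    active      BOOLEAN,
    birthday    DATE,
    updated_at  TIMESTAMP(3) WITHOUT TIME ZONE,
    nickname    VARCHAR(64),
    avatar      BYTEA
);

-- Flink SQL side; each column uses the Flink type mapped to the database type above.
CREATE TABLE `user_profile_load_table` (
    `id` BIGINT,                -- BIGINT
    `user_level` SMALLINT,      -- SMALLINT
    `visits` INT,               -- INTEGER
    `score` FLOAT,              -- REAL
    `ratio` DOUBLE,             -- DOUBLE PRECISION
    `balance` DECIMAL(10, 2),   -- NUMERIC(10, 2)
    `active` BOOLEAN,           -- BOOLEAN
    `birthday` DATE,            -- DATE
    `updated_at` TIMESTAMP(3),  -- TIMESTAMP(3) WITHOUT TIME ZONE
    `nickname` STRING,          -- VARCHAR(64)
    `avatar` BYTES,             -- BYTEA
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc-inlong',
    'url' = 'jdbc:postgresql://localhost:5432/write',
    'username' = 'inlong',
    'password' = 'inlong',
    'table-name' = 'public.user_profile'
);
```

Note that the Flink STRING type carries no length limit, so values longer than the VARCHAR(n) length declared in the database would presumably be rejected at write time.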