PostgreSQL-CDC

Overview

The PostgreSQL Extract Node allows for reading snapshot data and incremental data from PostgreSQL database. This document describes how to set up the PostgreSQL Extract Node to run SQL queries against PostgreSQL databases.

Supported Version

Extract NodeVersionDriver
PostgreSQL-CDCPostgreSQL: 9.6, 10, 11, 12JDBC Driver: 42.2.12

Dependencies

In order to set up the PostgreSQL Extract Node, the following provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.

Maven dependency

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-postgres-cdc</artifactId>
  4. <version>1.3.0-SNAPSHOT</version>
  5. </dependency>

Setup PostgreSQL server

Change Data Capture (CDC) allows you to track and propagate changes in a PostgreSQL database to downstream consumers based on its Write-Ahead Log (WAL). You need to ensure that the upstream database is configured to support logical replication. Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server, decide which logical decoding plug-in you intend to use. If you plan not to use the native pgoutput logical replication stream support, then you must install the logical decoding plug-in into the PostgreSQL server.

pgoutput

pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself for logical replication. This plug-in is always present so no additional libraries need to be installed.

  1. Check the wal_level configuration setting:
  1. SHOW wal_level;

The default value is replica. For CDC, you’ll need to set it to logical in the database configuration file (postgresql.conf). Keep in mind that changing the wal_level requires a restart of the Postgres instance and can affect database performance.

  1. To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:
  1. wal_level = logical

decoderbufs

decoderbufs is based on Protobuf and maintained by the Debezium community. installing it.

  1. To load the plug-in at startup, add the following to the postgresql.conf file:
  1. shared_preload_libraries = 'decoderbufs'
  1. To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:
  1. wal_level = logical

How to create a PostgreSQL Extract Node

Usage for SQL API

  1. CREATE TABLE `postgresTable`(
  2. `name` STRING,
  3. `age` INT
  4. ) WITH (
  5. 'connector' = 'postgres-cdc-inlong',
  6. 'hostname' = 'localhost',
  7. 'username' = 'postgres',
  8. 'password' = 'inlong',
  9. 'database-name' = 'postgres',
  10. 'schema-name' = 'public',
  11. 'port' = '5432',
  12. 'table-name' = 'user',
  13. 'decoding.plugin.name' = 'pgoutput',
  14. 'slot.name' = 'feaafacbaddadc'
  15. )

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

PostgreSQL Extract Node Options

OptionRequiredDefaultTypeDescription
connectorrequired(none)StringSpecify what connector to use, here should be postgres-cdc-inlong.
hostnamerequired(none)StringIP address or hostname of the PostgreSQL database server.
usernamerequired(none)StringName of the PostgreSQL database to use when connecting to the PostgreSQL database server.
passwordrequired(none)StringPassword to use when connecting to the PostgreSQL database server.
database-namerequired(none)StringDatabase name of the PostgreSQL server to monitor.
schema-namerequired(none)StringSchema name of the PostgreSQL database to monitor.
table-namerequired(none)StringTable name of the PostgreSQL database to monitor.
portoptional5432IntegerInteger port number of the PostgreSQL database server.
decoding.plugin.nameoptionaldecoderbufsStringThe name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
slot.nameoptionalflinkStringThe name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
debezium.*optional(none)StringPass-through Debezium’s properties to Debezium Embedded Engine which is used to capture data changes from Postgres server. For example: ‘debezium.snapshot.mode’ = ‘never’. See more about the Debezium’s Postgres Connector properties.
inlong.metricoptional(none)StringInlong metric label, format of value is groupId&streamId&nodeId.

Note: slot.name is recommended to set for different tables to avoid the potential PSQLException: ERROR: replication slot “flink” is active for PID 974 error.
Note: PSQLException: ERROR: all replication slots are in use Hint: Free one or increase max_replication_slots. We can delete slot by the following statement.

  1. SELECT*FROM pg_replication_slots;
  2. -- get slot name is flink. delete it
  3. SELECT pg_drop_replication_slot('flink');

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

keyData TypeDescription
table_nameSTRING NOT NULLName of the table that contain the row.
schema_nameSTRING NOT NULLName of the schema that contain the row.
database_nameSTRING NOT NULLName of the database that contain the row.
op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0.

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

  1. CREATE TABLE postgresTable (
  2. db_name STRING METADATA FROM 'database_name' VIRTUAL,
  3. table_name STRING METADATA FROM 'table_name' VIRTUAL,
  4. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  5. `name` STRING,
  6. `age` INT
  7. ) WITH (
  8. 'connector' = 'postgres-cdc-inlong',
  9. 'hostname' = 'localhost',
  10. 'username' = 'postgres',
  11. 'password' = 'inlong',
  12. 'database-name' = 'postgres',
  13. 'schema-name' = 'public',
  14. 'port' = '5432',
  15. 'table-name' = 'user',
  16. 'decoding.plugin.name' = 'pgoutput',
  17. 'slot.name' = 'feaafacbaddadc'
  18. );

Data Type Mapping

PostgreSQL typeFlink SQL type
TINYINT
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INTEGER
SERIAL
INT
BIGINT
BIGSERIAL
BIGINT
DECIMAL(20, 0)
REAL
FLOAT4
FLOAT
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEANBOOLEAN
DATEDATE
TIME [(p)][WITHOUT TIMEZONE]TIME [(p)][WITHOUT TIMEZONE]
TIMESTAMP [(p)]WITHOUT TIMEZONETIMESTAMP [(p)][WITHOUT TIMEZONE]
CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
STRING
BYTEABYTES