JDBC SQL Connector

Scan Source: Bounded | Lookup Source: Sync Mode | Sink: Batch | Sink: Streaming Append & Upsert Mode

The JDBC connector allows for reading data from and writing data into any relational database with a JDBC driver. This document describes how to set up the JDBC connector to run SQL queries against relational databases.

The JDBC sink operates in upsert mode for exchanging UPDATE/DELETE messages with the external system if a primary key is defined in the DDL; otherwise, it operates in append mode and does not support consuming UPDATE/DELETE messages.

Dependencies

In order to use the JDBC connector, the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.16.0</version>
  </dependency>

SQL Client

When using the SQL Client, download the prebuilt SQL JAR. The JDBC connector is not part of the binary distribution. See how to link with it for cluster execution here.

A driver dependency is also required to connect to a specified database. The drivers currently supported are:

| Driver | Group Id | Artifact Id | JAR |
| ------ | -------- | ----------- | --- |
| MySQL | mysql | mysql-connector-java | Download |
| Oracle | com.oracle.database.jdbc | ojdbc8 | Download |
| PostgreSQL | org.postgresql | postgresql | Download |
| Derby | org.apache.derby | derby | Download |

JDBC connector and drivers are not part of Flink’s binary distribution. See how to link with them for cluster execution here.

How to create a JDBC table

The JDBC table can be defined as follows:

  -- register a MySQL table 'users' in Flink SQL
  CREATE TABLE MyUserTable (
    id BIGINT,
    name STRING,
    age INT,
    status BOOLEAN,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users'
  );

  -- write data into the JDBC table from the other table "T"
  INSERT INTO MyUserTable
  SELECT id, name, age, status FROM T;

  -- scan data from the JDBC table
  SELECT id, name, age, status FROM MyUserTable;

  -- temporal join the JDBC table as a dimension table
  SELECT * FROM myTopic
  LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
  ON myTopic.key = MyUserTable.id;

Connector Options

| Option | Required | Forwarded | Default | Type | Description |
| ------ | -------- | --------- | ------- | ---- | ----------- |
| connector | required | no | (none) | String | Specify what connector to use; for JDBC this should be 'jdbc'. |
| url | required | yes | (none) | String | The JDBC database URL. |
| table-name | required | yes | (none) | String | The name of the JDBC table to connect to. |
| driver | optional | yes | (none) | String | The class name of the JDBC driver to use to connect to this URL. If not set, it will automatically be derived from the URL. |
| username | optional | yes | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if either of them is specified. |
| password | optional | yes | (none) | String | The JDBC password. |
| connection.max-retry-timeout | optional | yes | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and should not be smaller than 1 second. |
| scan.partition.column | optional | no | (none) | String | The column name used for partitioning the input. See the following Partitioned Scan section for more details. |
| scan.partition.num | optional | no | (none) | Integer | The number of partitions. |
| scan.partition.lower-bound | optional | no | (none) | Integer | The smallest value of the first partition. |
| scan.partition.upper-bound | optional | no | (none) | Integer | The largest value of the last partition. |
| scan.fetch-size | optional | yes | 0 | Integer | The number of rows that should be fetched from the database per round trip when reading. If the specified value is zero, the hint is ignored. |
| scan.auto-commit | optional | yes | true | Boolean | Sets the auto-commit flag on the JDBC driver, which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, specifically Postgres, may require this to be set to false in order to stream results. |
| lookup.cache | optional | yes | NONE | Enum. Possible values: NONE, PARTIAL | The cache strategy for the lookup table. Currently supports NONE (no caching) and PARTIAL (caching entries on lookup operations in the external database). |
| lookup.partial-cache.max-rows | optional | yes | (none) | Long | The maximum number of rows in the lookup cache; beyond this value, the oldest rows are expired. 'lookup.cache' must be set to 'PARTIAL' to use this option. See the following Lookup Cache section for more details. |
| lookup.partial-cache.expire-after-write | optional | yes | (none) | Duration | The maximum time to live for each row in the lookup cache after it is written into the cache. 'lookup.cache' must be set to 'PARTIAL' to use this option. See the following Lookup Cache section for more details. |
| lookup.partial-cache.expire-after-access | optional | yes | (none) | Duration | The maximum time to live for each row in the lookup cache after it is accessed. 'lookup.cache' must be set to 'PARTIAL' to use this option. See the following Lookup Cache section for more details. |
| lookup.partial-cache.caching-missing-key | optional | yes | true | Boolean | Whether to store an empty value in the cache if the lookup key does not match any row in the table. 'lookup.cache' must be set to 'PARTIAL' to use this option. |
| lookup.max-retries | optional | yes | 3 | Integer | The maximum number of retries if a database lookup fails. |
| sink.buffer-flush.max-rows | optional | yes | 100 | Integer | The maximum number of buffered records before flushing. Can be set to zero to disable it. |
| sink.buffer-flush.interval | optional | yes | 1s | Duration | The flush interval, after which asynchronous threads will flush buffered data. Can be set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
| sink.max-retries | optional | yes | 3 | Integer | The maximum number of retries if writing records to the database fails. |
| sink.parallelism | optional | no | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator. |
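
As an illustration of combining several of these options, the sketch below (all option values are made-up examples, not recommendations) declares a sink table with tuned buffering and retries:

  -- Illustrative values only: a JDBC sink with tuned buffering and retries
  CREATE TABLE BufferedUserSink (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users',
    'username' = '...',
    'password' = '...',
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '5'
  );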

Deprecated Options

These deprecated options have been replaced by the new options listed above and will be removed eventually. Please consider using the new options first.

| Option | Required | Forwarded | Default | Type | Description |
| ------ | -------- | --------- | ------- | ---- | ----------- |
| lookup.cache.max-rows | optional | yes | (none) | Integer | Please set 'lookup.cache' = 'PARTIAL' and use 'lookup.partial-cache.max-rows' instead. |
| lookup.cache.ttl | optional | yes | (none) | Duration | Please set 'lookup.cache' = 'PARTIAL' and use 'lookup.partial-cache.expire-after-write' instead. |
| lookup.cache.caching-missing-key | optional | yes | true | Boolean | Please set 'lookup.cache' = 'PARTIAL' and use 'lookup.partial-cache.caching-missing-key' instead. |

Features

Key handling

Flink uses the primary key that was defined in DDL when writing data to external databases. The connector operates in upsert mode if the primary key was defined, otherwise, the connector operates in append mode.

In upsert mode, Flink inserts a new row or updates the existing row according to the primary key; this is how Flink ensures idempotence. To guarantee that the output result is as expected, it is recommended to define a primary key for the table and to make sure the primary key is one of the unique key sets or the primary key of the underlying database table. In append mode, Flink interprets all records as INSERT messages, and the INSERT operation may fail if a primary key or unique constraint violation happens in the underlying database.

See CREATE TABLE DDL for more details about PRIMARY KEY syntax.
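
As a minimal sketch (the table names and connection values below are illustrative), the same schema can be declared with or without a primary key to select the mode:

  -- Illustrative only: no primary key, so the sink runs in append mode
  CREATE TABLE AppendOnlySink (
    id BIGINT,
    name STRING
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'append_table'
  );

  -- Illustrative only: primary key declared, so the sink runs in upsert mode
  CREATE TABLE UpsertSink (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'upsert_table'
  );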

Partitioned Scan

To accelerate reading data in parallel Source task instances, Flink provides the partitioned scan feature for JDBC tables.

The following scan partition options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple tasks. scan.partition.column must be a numeric, date, or timestamp column of the table in question. Notice that scan.partition.lower-bound and scan.partition.upper-bound are used to decide the partition stride and to filter the rows in the table. For a batch job, it is also possible to fetch the min and max values first before submitting the Flink job (a sketch follows the list below).

  • scan.partition.column: The column name used for partitioning the input.
  • scan.partition.num: The number of partitions.
  • scan.partition.lower-bound: The smallest value of the first partition.
  • scan.partition.upper-bound: The largest value of the last partition.
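
The sketch referenced above (the table, column, bounds, and partition count are made-up values for illustration) configures a parallel scan on the id column split into four ranges:

  -- Illustrative values: partition the scan on 'id' into 4 ranges between 1 and 1000
  CREATE TABLE PartitionedOrders (
    id BIGINT,
    amount DECIMAL(10, 2)
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'orders',
    'scan.partition.column' = 'id',
    'scan.partition.num' = '4',
    'scan.partition.lower-bound' = '1',
    'scan.partition.upper-bound' = '1000'
  );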

Lookup Cache

The JDBC connector can be used in a temporal join as a lookup source (a.k.a. dimension table). Currently, only sync lookup mode is supported.

By default, lookup cache is not enabled. You can enable it by setting lookup.cache to PARTIAL.

The lookup cache is used to improve the performance of temporal joins with the JDBC connector. By default, the lookup cache is not enabled, so all requests are sent to the external database. When the lookup cache is enabled, each process (i.e. TaskManager) holds a cache. Flink looks up the cache first and only sends requests to the external database on a cache miss, updating the cache with the returned rows. The oldest rows in the cache are expired when the cache reaches the maximum number of cached rows lookup.partial-cache.max-rows, or when a row exceeds the maximum time to live specified by lookup.partial-cache.expire-after-write or lookup.partial-cache.expire-after-access. The cached rows might not be the latest; users can tune the expiration options to smaller values for fresher data, but this may increase the number of requests sent to the database. It is therefore a trade-off between throughput and correctness.

By default, Flink caches the empty query result for a primary key; you can toggle this behaviour by setting lookup.partial-cache.caching-missing-key to false.
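
A minimal sketch of a lookup table with the partial cache enabled (the cache size and TTL below are made-up values, not recommendations):

  -- Illustrative values: enable the partial lookup cache on a dimension table
  CREATE TABLE Customers (
    id BIGINT,
    name STRING
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'customers',
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '10000',
    'lookup.partial-cache.expire-after-write' = '10min'
  );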

Idempotent Writes

The JDBC sink will use upsert semantics rather than plain INSERT statements if a primary key is defined in the DDL. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a unique constraint violation in the underlying database, which provides idempotence.

If there are failures, the Flink job will recover and re-process from last successful checkpoint, which can lead to re-processing messages during recovery. The upsert mode is highly recommended as it helps avoid constraint violations or duplicate data if records need to be re-processed.

Aside from failure recovery, the source topic may also naturally contain multiple records over time with the same primary key, making upserts desirable.

As there is no standard syntax for upsert, the following table describes the database-specific DML that is used.

| Database | Upsert Grammar |
| -------- | -------------- |
| MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
| Oracle | MERGE INTO .. USING (..) ON (..) WHEN MATCHED THEN UPDATE SET (..) WHEN NOT MATCHED THEN INSERT (..) VALUES (..) |
| PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
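
For illustration, with the MyUserTable definition from above, the statement the sink issues against MySQL would take roughly the following shape (the SQL actually generated by the connector may differ in details):

  -- Rough illustration of the MySQL upsert shape; the generated SQL may differ
  INSERT INTO users (id, name, age, status)
  VALUES (?, ?, ?, ?)
  ON DUPLICATE KEY UPDATE
    name = VALUES(name),
    age = VALUES(age),
    status = VALUES(status);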

JDBC Catalog

The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol.

Currently, there are two JDBC catalog implementations, Postgres Catalog and MySQL Catalog. They support the following catalog methods. Other methods are currently not supported.

  // The supported methods by Postgres & MySQL Catalog.
  databaseExists(String databaseName);
  listDatabases();
  getDatabase(String databaseName);
  listTables(String databaseName);
  getTable(ObjectPath tablePath);
  tableExists(ObjectPath tablePath);


Usage of JDBC Catalog

This section mainly describes how to create and use a Postgres Catalog or MySQL Catalog. Please refer to the Dependencies section for how to set up a JDBC connector and the corresponding driver.

The JDBC catalog supports the following options:

  • name: required, name of the catalog.
  • default-database: required, default database to connect to.
  • username: required, username of Postgres/MySQL account.
  • password: required, password of the account.
  • base-url: required (should not contain the database name)

    • for Postgres Catalog this should be "jdbc:postgresql://<ip>:<port>"
    • for MySQL Catalog this should be "jdbc:mysql://<ip>:<port>"

SQL

  CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
  );

  USE CATALOG my_catalog;

Java

  EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
  TableEnvironment tableEnv = TableEnvironment.create(settings);

  String name            = "my_catalog";
  String defaultDatabase = "mydb";
  String username        = "...";
  String password        = "...";
  String baseUrl         = "...";

  JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
  tableEnv.registerCatalog("my_catalog", catalog);

  // set the JdbcCatalog as the current catalog of the session
  tableEnv.useCatalog("my_catalog");

Scala

  val settings = EnvironmentSettings.inStreamingMode()
  val tableEnv = TableEnvironment.create(settings)

  val name            = "my_catalog"
  val defaultDatabase = "mydb"
  val username        = "..."
  val password        = "..."
  val baseUrl         = "..."

  val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
  tableEnv.registerCatalog("my_catalog", catalog)

  // set the JdbcCatalog as the current catalog of the session
  tableEnv.useCatalog("my_catalog")

Python

  from pyflink.table import EnvironmentSettings, TableEnvironment
  from pyflink.table.catalog import JdbcCatalog

  environment_settings = EnvironmentSettings.in_streaming_mode()
  t_env = TableEnvironment.create(environment_settings)

  name = "my_catalog"
  default_database = "mydb"
  username = "..."
  password = "..."
  base_url = "..."

  catalog = JdbcCatalog(name, default_database, username, password, base_url)
  t_env.register_catalog("my_catalog", catalog)

  # set the JdbcCatalog as the current catalog of the session
  t_env.use_catalog("my_catalog")

YAML

  execution:
    ...
    current-catalog: my_catalog  # set the target JdbcCatalog as the current catalog of the session
    current-database: mydb

  catalogs:
    - name: my_catalog
      type: jdbc
      default-database: mydb
      username: ...
      password: ...
      base-url: ...

JDBC Catalog for PostgreSQL

PostgreSQL Metaspace Mapping

PostgreSQL has an additional namespace, the schema, besides the database. A Postgres instance can have multiple databases, each database can have multiple schemas (with a default one named "public"), and each schema can have multiple tables. In Flink, when querying tables registered by the Postgres catalog, users can use either schema_name.table_name or just table_name. The schema_name is optional and defaults to "public".

Therefore, the metaspace mapping between Flink Catalog and Postgres is as follows:

| Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
| --------------------------------- | ---------------------------- |
| catalog name (defined in Flink only) | N/A |
| database name | database name |
| table name | [schema_name.]table_name |

The full path of a Postgres table in Flink should be "<catalog>.<db>.`<schema.table>`" if a schema is specified; note that <schema.table> needs to be escaped.

Here are some examples to access Postgres tables:

  -- scan table 'test_table' of 'public' schema (i.e. the default schema), the schema name can be omitted
  SELECT * FROM mypg.mydb.test_table;
  SELECT * FROM mydb.test_table;
  SELECT * FROM test_table;

  -- scan table 'test_table2' of 'custom_schema' schema,
  -- the custom schema can not be omitted and must be escaped together with the table name
  SELECT * FROM mypg.mydb.`custom_schema.test_table2`;
  SELECT * FROM mydb.`custom_schema.test_table2`;
  SELECT * FROM `custom_schema.test_table2`;

JDBC Catalog for MySQL

MySQL Metaspace Mapping

The databases in a MySQL instance are at the same mapping level as the databases under the catalog registered with the MySQL Catalog. A MySQL instance can have multiple databases, and each database can have multiple tables. In Flink, when querying tables registered by the MySQL catalog, users can use either database.table_name or just table_name. If the database is omitted, the default database specified when the MySQL Catalog was created is used.

Therefore, the metaspace mapping between Flink Catalog and MySQL Catalog is as follows:

| Flink Catalog Metaspace Structure | MySQL Metaspace Structure |
| --------------------------------- | ------------------------- |
| catalog name (defined in Flink only) | N/A |
| database name | database name |
| table name | table_name |

The full path of a MySQL table in Flink should be "`<catalog>`.`<db>`.`<table>`".

Here are some examples to access MySQL tables:

  -- scan table 'test_table', the default database is 'mydb'.
  SELECT * FROM mysql_catalog.mydb.test_table;
  SELECT * FROM mydb.test_table;
  SELECT * FROM test_table;

  -- scan table 'test_table2' in the given database 'given_database'.
  SELECT * FROM mysql_catalog.given_database.test_table2;
  SELECT * FROM given_database.test_table2;

Data Type Mapping

Flink supports connecting to several databases through dialects such as MySQL, Oracle, PostgreSQL, and Derby. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the following table; the mapping table can help define a JDBC table in Flink easily (an illustrative DDL follows the table).

| MySQL type | Oracle type | PostgreSQL type | Flink SQL type |
| ---------- | ----------- | --------------- | -------------- |
| TINYINT | | | TINYINT |
| SMALLINT, TINYINT UNSIGNED | | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | | INTEGER, SERIAL | INT |
| BIGINT, INT UNSIGNED | | BIGINT, BIGSERIAL | BIGINT |
| BIGINT UNSIGNED | | | DECIMAL(20, 0) |
| BIGINT | | BIGINT | BIGINT |
| FLOAT | BINARY_FLOAT | REAL, FLOAT4 | FLOAT |
| DOUBLE, DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN, TINYINT(1) | | BOOLEAN | BOOLEAN |
| DATE | DATE | DATE | DATE |
| TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), VARCHAR(n), TEXT | CHAR(n), VARCHAR(n), CLOB | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
| BINARY, VARBINARY, BLOB | RAW(s), BLOB | BYTEA | BYTES |
| | | ARRAY | ARRAY |
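
As an illustrative use of this mapping (the MySQL table and its columns below are hypothetical), a MySQL table with BIGINT, DECIMAL, DATETIME, and TEXT columns could be declared in Flink like this:

  -- Hypothetical MySQL table 'orders' (id BIGINT, price DECIMAL(10, 2), created_at DATETIME(3), note TEXT)
  CREATE TABLE Orders (
    id BIGINT,
    price DECIMAL(10, 2),
    created_at TIMESTAMP(3),
    note STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'orders'
  );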