Table & SQL Connectors

Flink’s Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Avro, Parquet, or ORC.

This page describes how to register table sources and table sinks in Flink using the natively supported connectors. After a source or sink has been registered, it can be accessed by Table API & SQL statements.

If you want to implement your own custom table source or sink, have a look at the user-defined sources & sinks page.

Supported Connectors

Flink natively supports various connectors. The following table lists all available connectors.

Name                          | Version                    | Source                                | Sink
------------------------------|----------------------------|---------------------------------------|----------------------------
Filesystem                    |                            | Bounded and Unbounded Scan            | Streaming Sink, Batch Sink
Elasticsearch                 | 6.x & 7.x                  | Not supported                         | Streaming Sink, Batch Sink
Opensearch                    | 1.x & 2.x                  | Not supported                         | Streaming Sink, Batch Sink
Apache Kafka                  | 0.10+                      | Unbounded Scan                        | Streaming Sink, Batch Sink
Amazon DynamoDB               |                            | Not supported                         | Streaming Sink, Batch Sink
Amazon Kinesis Data Streams   |                            | Unbounded Scan                        | Streaming Sink
Amazon Kinesis Data Firehose  |                            | Not supported                         | Streaming Sink
JDBC                          |                            | Bounded Scan, Lookup                  | Streaming Sink, Batch Sink
Apache HBase                  | 1.4.x & 2.2.x              | Bounded Scan, Lookup                  | Streaming Sink, Batch Sink
Apache Hive                   | Supported Versions         | Unbounded Scan, Bounded Scan, Lookup  | Streaming Sink, Batch Sink
MongoDB                       | 3.6.x & 4.x & 5.x & 6.0.x  | Bounded Scan, Lookup                  | Streaming Sink, Batch Sink

Please refer to the configuration section on how to add connectors as a dependency.

How to use connectors

Flink supports using SQL CREATE TABLE statements to register tables. One can define the table name, the table schema, and the table options for connecting to an external system.

See the SQL section for more information about creating a table.

The following code shows a full example of how to connect to Kafka for reading and writing JSON records.

SQL

CREATE TABLE MyUserTable (
  -- declare the schema of the table
  `user` BIGINT,
  `message` STRING,
  `rowtime` TIMESTAMP(3) METADATA FROM 'timestamp',    -- use a metadata column to access Kafka's record timestamp
  `proctime` AS PROCTIME(),    -- use a computed column to define a proctime attribute
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND    -- use a WATERMARK statement to define a rowtime attribute
) WITH (
  -- declare the external system to connect to
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'    -- declare a format for this system
)

The desired connection properties are converted into string-based key-value pairs. Factories will create configured table sources, table sinks, and corresponding formats from the key-value pairs based on factory identifiers (kafka and json in this example). All factories that can be found via Java’s Service Provider Interfaces (SPI) are taken into account when searching for exactly one matching factory for each component.

If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.
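
Once a matching factory has been found and the table is registered, it can be used like any other table in queries and INSERT statements. The following is a minimal, hedged sketch against the MyUserTable definition above; the literal values are placeholders used only for illustration.

SQL

-- Write a few records into the Kafka-backed table
-- (the computed proctime column is virtual and therefore excluded from the insert).
INSERT INTO MyUserTable
VALUES (1, 'hello', TIMESTAMP '2024-01-01 00:00:00');

-- Read from the same table with a continuous query.
SELECT `user`, COUNT(*) AS message_count
FROM MyUserTable
GROUP BY `user`;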

Transform table connector/format resources

Flink uses Java’s Service Provider Interfaces (SPI) to load the table connector/format factories by their identifiers. Since the SPI resource file named org.apache.flink.table.factories.Factory for every table connector/format is located under the same directory META-INF/services, these resource files will override each other when building the uber-jar of a project that uses more than one table connector/format, which causes Flink to fail to load the table connector/format factories.

In this situation, the recommended way is to transform these resource files under the directory META-INF/services with the ServicesResourceTransformer of the Maven Shade plugin. Given the pom.xml file content of an example project that contains the connector flink-sql-connector-hive-3.1.3 and the format flink-parquet:

<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>myProject</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <!-- other project dependencies ...-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-hive-3.1.3_2.12</artifactId>
        <version>1.18.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_2.12</artifactId>
        <version>1.18.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <executions>
                <execution>
                    <id>shade</id>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers combine.children="append">
                            <!-- The service transformer is needed to merge META-INF/services files -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <!-- ... -->
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

After the ServicesResourceTransformer is configured, the table connector/format resource files under the directory META-INF/services are merged rather than overwriting each other when building the uber-jar of the above project.

Schema Mapping

The body clause of a SQL CREATE TABLE statement defines the names and types of physical columns, constraints, and watermarks. Flink doesn’t hold the data, thus the schema definition only declares how to map physical columns from an external system to Flink’s representation. The mapping may not be done by name; it depends on the implementation of formats and connectors. For example, a MySQL database table is mapped by field names (not case sensitive), and a CSV filesystem is mapped by field order (field names can be arbitrary). This is explained for every connector.

The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.

SQL

CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN
) WITH (
  ...
)
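
To make the mapping rules concrete, here is a hedged sketch of a CSV table on the filesystem connector, where columns are matched by position rather than by name; the table name and path are placeholders.

SQL

-- Sketch: with CSV, columns are matched by position, so the DDL order must follow the file layout.
CREATE TABLE CsvOrders (
  order_id BIGINT,   -- first column in the CSV files
  product  STRING,   -- second column
  amount   DOUBLE    -- third column
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/orders',   -- placeholder path
  'format' = 'csv'
)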

Metadata

Some connectors and formats expose additional metadata fields that can be accessed in metadata columns next to the physical payload columns. See the CREATE TABLE section for more information about metadata columns.
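
As a hedged example, assuming the Kafka connector (which exposes metadata keys such as partition and offset), read-only metadata columns can be declared as VIRTUAL so that they are available for reading but excluded when writing:

SQL

CREATE TABLE KafkaWithMetadata (
  `user`      BIGINT,
  `message`   STRING,
  `partition` INT    METADATA VIRTUAL,   -- Kafka partition the record was read from
  `offset`    BIGINT METADATA VIRTUAL    -- offset of the record within its partition
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
)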

Primary Key

A primary key constraint declares that a column or a set of columns of a table is unique and does not contain nulls. A primary key uniquely identifies a row in a table.

The primary key of a source table is metadata information used for optimization. The primary key of a sink table is usually used by the sink implementation for upserting.

The SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls whether the constraint checks are performed on the incoming/outgoing data. Since Flink does not own the data, the only mode we want to support is the NOT ENFORCED mode. It is up to the user to ensure that the query enforces key integrity.

SQL

CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN,
  PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- defines a primary key on columns
) WITH (
  ...
)
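
For instance, with the JDBC connector a declared primary key typically switches the sink to upsert mode instead of appending. The following sketch assumes a MySQL database; the URL and table name are placeholders.

SQL

-- Sketch (JDBC sink): the NOT ENFORCED primary key is used to upsert rows by key.
CREATE TABLE JdbcUpsertSink (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN,
  PRIMARY KEY (MyField1, MyField2) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',   -- placeholder URL
  'table-name' = 'my_table'                           -- placeholder table name
)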

Time Attributes

Time attributes are essential when working with unbounded streaming tables. Therefore both proctime and rowtime attributes can be defined as part of the schema.

For more information about time handling in Flink and especially event-time, we recommend the general event-time section.

Proctime Attributes

In order to declare a proctime attribute in the schema, you can use the computed column syntax to declare a computed column that is generated from the PROCTIME() built-in function. The computed column is a virtual column that is not stored in the physical data.

SQL

CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN,
  MyField4 AS PROCTIME()  -- declares a proctime attribute
) WITH (
  ...
)
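
As a hedged usage sketch, the proctime attribute declared above (MyField4) can drive processing-time operations, for example a tumbling window aggregation using the TUMBLE table-valued function:

SQL

-- Sketch: count rows per one-minute processing-time window.
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(TABLE MyTable, DESCRIPTOR(MyField4), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end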

Rowtime Attributes

In order to control the event-time behavior for tables, Flink provides predefined timestamp extractors and watermark strategies.

Please refer to CREATE TABLE statements for more information about defining time attributes in DDL.

The following timestamp extractors are supported:

DDL

-- use the existing TIMESTAMP(3) field in schema as the rowtime attribute
CREATE TABLE MyTable (
  ts_field TIMESTAMP(3),
  WATERMARK FOR ts_field AS ...
) WITH (
  ...
)

-- use system functions or UDFs or expressions to extract the expected TIMESTAMP(3) rowtime field
CREATE TABLE MyTable (
  log_ts STRING,
  ts_field AS TO_TIMESTAMP(log_ts),
  WATERMARK FOR ts_field AS ...
) WITH (
  ...
)

The following watermark strategies are supported:

DDL

-- Sets a watermark strategy for strictly ascending rowtime attributes. Emits a watermark of the
-- maximum observed timestamp so far. Rows that have a timestamp larger than the max timestamp
-- are not late.
CREATE TABLE MyTable (
  ts_field TIMESTAMP(3),
  WATERMARK FOR ts_field AS ts_field
) WITH (
  ...
)

-- Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
-- observed timestamp so far minus 1. Rows that have a timestamp larger than or equal to the max timestamp
-- are not late.
CREATE TABLE MyTable (
  ts_field TIMESTAMP(3),
  WATERMARK FOR ts_field AS ts_field - INTERVAL '0.001' SECOND
) WITH (
  ...
)

-- Sets a watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
-- Emits watermarks which are the maximum observed timestamp minus the specified delay, e.g. 2 seconds.
CREATE TABLE MyTable (
  ts_field TIMESTAMP(3),
  WATERMARK FOR ts_field AS ts_field - INTERVAL '2' SECOND
) WITH (
  ...
)

Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
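
As a hedged sketch of such a time-based operation, an event-time tumbling window over the rowtime attribute declared above only emits results once the watermark passes the window end:

SQL

-- Sketch: ten-second event-time windows over ts_field; results are triggered by the watermark.
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(TABLE MyTable, DESCRIPTOR(ts_field), INTERVAL '10' SECOND))
GROUP BY window_start, window_end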

SQL Types

Please see the Data Types page about how to declare a type in SQL.
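
For quick orientation, the following hedged sketch shows a few commonly used data types in a CREATE TABLE statement; the table name is a placeholder and the connector options are elided.

SQL

CREATE TABLE TypedTable (
  id      BIGINT,
  price   DECIMAL(10, 2),
  tags    ARRAY<STRING>,
  props   MAP<STRING, STRING>,
  address ROW<street STRING, zip INT>,
  created TIMESTAMP(3)
) WITH (
  ...
)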