User-defined Sources & Sinks
A TableSource provides access to data which is stored in external systems (database, key-value store, message queue) or files. After a TableSource is registered in a TableEnvironment it can be accessed by Table API or SQL queries.
A TableSink emits a Table to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Parquet, or ORC).
A TableFactory allows for separating the declaration of a connection to an external system from the actual implementation. A table factory creates configured instances of table sources and sinks from normalized, string-based properties. The properties can be generated programmatically using a Descriptor or via YAML configuration files for the SQL Client.
See the common concepts and API page for details on how to register a TableSource and how to emit a Table through a TableSink. See the built-in sources, sinks, and formats page for examples of how to use factories.
Define a TableSource
A TableSource is a generic interface that gives Table API and SQL queries access to data stored in an external system. It provides the schema of the table and the records that are mapped to rows with the table’s schema. Depending on whether the TableSource is used in a streaming or batch query, the records are produced as a DataStream or DataSet, respectively.
If a TableSource is used in a streaming query, it must implement the StreamTableSource interface; if it is used in a batch query, it must implement the BatchTableSource interface. A TableSource can also implement both interfaces and be used in streaming and batch queries.
StreamTableSource and BatchTableSource extend the base interface TableSource, which defines the following methods:
TableSource<T> {
public TableSchema getTableSchema();
public TypeInformation<T> getReturnType();
public String explainSource();
}
TableSource[T] {
def getTableSchema: TableSchema
def getReturnType: TypeInformation[T]
def explainSource: String
}
- getTableSchema(): Returns the schema of the table, i.e., the names and types of the fields of the table. The field types are defined using Flink’s TypeInformation (see Table API types and SQL types).
- getReturnType(): Returns the physical type of the DataStream (StreamTableSource) or DataSet (BatchTableSource) and the records that are produced by the TableSource.
- explainSource(): Returns a String that describes the TableSource. This method is optional and used for display purposes only.
The TableSource interface separates the logical table schema from the physical type of the returned DataStream or DataSet. As a consequence, all fields of the table schema (getTableSchema()) must be mapped to a field with a corresponding type in the physical return type (getReturnType()). By default, this mapping is done based on field names. For example, a TableSource that defines a table schema with two fields [name: String, size: Integer] requires a TypeInformation with at least two fields called name and size of type String and Integer, respectively. This could be a PojoTypeInfo or a RowTypeInfo that has two fields named name and size with matching types.
However, some types, such as Tuple or CaseClass types, do not support custom field names. If a TableSource returns a DataStream or DataSet of a type with fixed field names, it can implement the DefinedFieldMapping interface to map field names from the table schema to field names of the physical return type.
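The default name-based mapping described above can be pictured with a small, Flink-independent sketch. The class FieldMappingSketch and its method mapByName are hypothetical names used only for illustration; they are not part of Flink, and the sketch checks field names only, not field types.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of name-based field mapping; not Flink's implementation. */
final class FieldMappingSketch {

    /**
     * For every logical schema field, finds the index of the physical field
     * with the same name. Throws if a schema field has no physical counterpart.
     */
    static int[] mapByName(List<String> schemaFields, List<String> physicalFields) {
        int[] mapping = new int[schemaFields.size()];
        for (int i = 0; i < schemaFields.size(); i++) {
            int idx = physicalFields.indexOf(schemaFields.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException(
                    "No physical field for schema field: " + schemaFields.get(i));
            }
            mapping[i] = idx;
        }
        return mapping;
    }

    public static void main(String[] args) {
        // The physical type may carry extra fields and use a different order.
        int[] mapping = mapByName(
            Arrays.asList("name", "size"),
            Arrays.asList("id", "size", "name"));
        System.out.println(Arrays.toString(mapping)); // prints [2, 1]
    }
}
```

A type with fixed field names (for example f0, f1 for a Tuple) would fail this lookup, which is the situation where an explicit field mapping becomes necessary.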
Defining a BatchTableSource
The BatchTableSource interface extends the TableSource interface and defines one additional method:
BatchTableSource<T> extends TableSource<T> {
public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}
BatchTableSource[T] extends TableSource[T] {
def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
- getDataSet(execEnv): Returns a DataSet with the data of the table. The type of the DataSet must be identical to the return type defined by the TableSource.getReturnType() method. The DataSet can be created using a regular data source of the DataSet API. Commonly, a BatchTableSource is implemented by wrapping an InputFormat or batch connector.
Defining a StreamTableSource
The StreamTableSource interface extends the TableSource interface and defines one additional method:
StreamTableSource<T> extends TableSource<T> {
public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
StreamTableSource[T] extends TableSource[T] {
def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
- getDataStream(execEnv): Returns a DataStream with the data of the table. The type of the DataStream must be identical to the return type defined by the TableSource.getReturnType() method. The DataStream can be created using a regular data source of the DataStream API. Commonly, a StreamTableSource is implemented by wrapping a SourceFunction or a stream connector.
Defining a TableSource with Time Attributes
Time-based operations of streaming Table API and SQL queries, such as windowed aggregations or joins, require explicitly specified time attributes.
A TableSource defines a time attribute as a field of type Types.SQL_TIMESTAMP in its table schema. In contrast to all regular fields in the schema, a time attribute must not be matched to a physical field in the return type of the table source. Instead, a TableSource defines a time attribute by implementing a certain interface.
Defining a Processing Time Attribute
Processing time attributes are commonly used in streaming queries. A processing time attribute returns the current wall-clock time of the operator that accesses it. A TableSource defines a processing time attribute by implementing the DefinedProctimeAttribute interface. The interface looks as follows:
DefinedProctimeAttribute {
public String getProctimeAttribute();
}
DefinedProctimeAttribute {
def getProctimeAttribute: String
}
- getProctimeAttribute(): Returns the name of the processing time attribute. The specified attribute must be defined with type Types.SQL_TIMESTAMP in the table schema and can be used in time-based operations. A DefinedProctimeAttribute table source can define no processing time attribute by returning null.
Attention Both StreamTableSource and BatchTableSource can implement DefinedProctimeAttribute and define a processing time attribute. In case of a BatchTableSource the processing time field is initialized with the current timestamp during the table scan.
Defining a Rowtime Attribute
Rowtime attributes are attributes of type TIMESTAMP and are handled in a unified way in stream and batch queries.
A table schema field of type SQL_TIMESTAMP can be declared as a rowtime attribute by specifying
- the name of the field,
- a TimestampExtractor that computes the actual value for the attribute (usually from one or more other fields), and
- a WatermarkStrategy that specifies how watermarks are generated for the rowtime attribute.
A TableSource defines a rowtime attribute by implementing the DefinedRowtimeAttributes interface. The interface looks as follows:
DefinedRowtimeAttributes {
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
DefinedRowtimeAttributes {
def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
- getRowtimeAttributeDescriptors(): Returns a list of RowtimeAttributeDescriptor. A RowtimeAttributeDescriptor describes a rowtime attribute with the following properties:
  - attributeName: The name of the rowtime attribute in the table schema. The field must be defined with type Types.SQL_TIMESTAMP.
  - timestampExtractor: The timestamp extractor extracts the timestamp from a record with the return type. For example, it can convert a Long field into a timestamp or parse a String-encoded timestamp. Flink comes with a set of built-in TimestampExtractor implementations for common use cases. It is also possible to provide a custom implementation.
  - watermarkStrategy: The watermark strategy defines how watermarks are generated for the rowtime attribute. Flink comes with a set of built-in WatermarkStrategy implementations for common use cases. It is also possible to provide a custom implementation.
Attention Although the getRowtimeAttributeDescriptors() method returns a list of descriptors, only a single rowtime attribute is supported at the moment. We plan to remove this restriction in the future and support tables with more than one rowtime attribute.
Attention Both StreamTableSource and BatchTableSource can implement DefinedRowtimeAttributes and define a rowtime attribute. In either case, the rowtime field is extracted using the TimestampExtractor. Hence, a TableSource that implements StreamTableSource and BatchTableSource and defines a rowtime attribute provides exactly the same data to streaming and batch queries.
Provided Timestamp Extractors
Flink provides TimestampExtractor implementations for common use cases.
The following TimestampExtractor implementations are currently available:
- ExistingField(fieldName): Extracts the value of a rowtime attribute from an existing LONG, SQL_TIMESTAMP, or timestamp-formatted STRING field. One example of such a string would be ‘2018-05-28 12:34:56.000’.
- StreamRecordTimestamp(): Extracts the value of a rowtime attribute from the timestamp of the DataStream StreamRecord. Note that this TimestampExtractor is not available for batch table sources.
A custom TimestampExtractor can be defined by implementing the corresponding interface.
Provided Watermark Strategies
Flink provides WatermarkStrategy implementations for common use cases.
The following WatermarkStrategy implementations are currently available:
- AscendingTimestamps: A watermark strategy for ascending timestamps. Records with timestamps that are out-of-order will be considered late.
- BoundedOutOfOrderTimestamps(delay): A watermark strategy for timestamps that are out-of-order by at most the specified delay.
- PreserveWatermarks(): A strategy which indicates that the watermarks should be preserved from the underlying DataStream.
A custom WatermarkStrategy can be defined by implementing the corresponding interface.
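To make the idea behind a bounded out-of-order strategy concrete, the following Flink-independent sketch tracks the highest timestamp seen so far and lets the watermark trail it by a fixed delay. The class BoundedOutOfOrderSketch and its methods are hypothetical and only model the concept; they are not the Flink WatermarkStrategy interface.

```java
/** Hypothetical model of a bounded out-of-order watermark; not Flink's class. */
final class BoundedOutOfOrderSketch {
    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrderSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Records the rowtime timestamp of an incoming record. */
    void onTimestamp(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** The watermark trails the highest timestamp seen so far by the delay. */
    long currentWatermark() {
        // Before any record arrives there is no meaningful watermark yet.
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - delayMillis;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderSketch strategy = new BoundedOutOfOrderSketch(300L);
        strategy.onTimestamp(1000L);
        strategy.onTimestamp(1500L);
        strategy.onTimestamp(1200L); // out of order, does not move the watermark back
        System.out.println(strategy.currentWatermark()); // prints 1200
    }
}
```

With a delay of zero this model degenerates to the ascending-timestamps case, where any out-of-order record falls behind the watermark.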
Defining a TableSource with Projection Push-Down
A TableSource supports projection push-down by implementing the ProjectableTableSource interface. The interface defines a single method:
ProjectableTableSource<T> {
public TableSource<T> projectFields(int[] fields);
}
ProjectableTableSource[T] {
def projectFields(fields: Array[Int]): TableSource[T]
}
- projectFields(fields): Returns a copy of the TableSource with adjusted physical return type. The fields parameter provides the indexes of the fields that must be provided by the TableSource. The indexes relate to the TypeInformation of the physical return type, not to the logical table schema. The copied TableSource must adjust its return type and the returned DataStream or DataSet. The TableSchema of the copied TableSource must not be changed, i.e., it must be the same as that of the original TableSource. If the TableSource implements the DefinedFieldMapping interface, the field mapping must be adjusted to the new return type.
Attention In order for Flink to distinguish a projection push-down table source from its original form, the explainSource method must be overridden to include information regarding the projected fields.
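As a rough, Flink-independent illustration of what a projected copy has to do, the sketch below selects values by physical field index and builds a description that names the projected fields. ProjectionSketch, project, and explain are hypothetical helpers, and records are modeled as plain Object arrays for simplicity.

```java
import java.util.Arrays;

/** Hypothetical illustration of projection by physical field index; not Flink code. */
final class ProjectionSketch {

    /** Keeps only the values at the requested physical indexes, in the requested order. */
    static Object[] project(Object[] physicalRecord, int[] fields) {
        Object[] projected = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            projected[i] = physicalRecord[fields[i]];
        }
        return projected;
    }

    /** Builds a description that names the projected fields, so the copy is distinguishable. */
    static String explain(String sourceName, String[] physicalFieldNames, int[] fields) {
        String[] names = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            names[i] = physicalFieldNames[fields[i]];
        }
        return sourceName + "(fields=" + Arrays.toString(names) + ")";
    }

    public static void main(String[] args) {
        int[] fields = {2, 0};
        Object[] row = project(new Object[] {42L, "apple", 7}, fields);
        System.out.println(Arrays.toString(row)); // prints [7, 42]
        System.out.println(explain("MySource", new String[] {"id", "name", "size"}, fields));
        // prints MySource(fields=[size, id])
    }
}
```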
The ProjectableTableSource adds support to project flat fields. If the TableSource defines a table with nested schema, it can implement the NestedFieldsProjectableTableSource to extend the projection to nested fields. The NestedFieldsProjectableTableSource is defined as follows:
NestedFieldsProjectableTableSource<T> {
public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
}
NestedFieldsProjectableTableSource[T] {
def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
- projectNestedFields(fields, nestedFields): Returns a copy of the TableSource with adjusted physical return type. Fields of the physical return type may be removed or reordered but their type must not be changed. The contract of this method is essentially the same as for the ProjectableTableSource.projectFields() method. In addition, the nestedFields parameter contains, for each field index in the fields list, a list of paths to all nested fields that are accessed by the query. All other nested fields do not need to be read, parsed, and set in the records that are produced by the TableSource.
Attention The types of the projected fields must not be changed, but unused fields may be set to null or to a default value.
Defining a TableSource with Filter Push-Down
The FilterableTableSource interface adds support for filter push-down to a TableSource. A TableSource implementing this interface is able to filter records such that the returned DataStream or DataSet contains fewer records.
The interface looks as follows:
FilterableTableSource<T> {
public TableSource<T> applyPredicate(List<Expression> predicates);
public boolean isFilterPushedDown();
}
FilterableTableSource[T] {
def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
def isFilterPushedDown: Boolean
}
- applyPredicate(predicates): Returns a copy of the TableSource with added predicates. The predicates parameter is a mutable list of conjunctive predicates that are “offered” to the TableSource. The TableSource accepts a predicate for evaluation by removing it from the list. Predicates that are left in the list will be evaluated by a subsequent filter operator.
- isFilterPushedDown(): Returns true if the applyPredicate() method was called before. Hence, isFilterPushedDown() must return true for all TableSource instances returned from an applyPredicate() call.
Attention In order for Flink to distinguish a filter push-down table source from its original form, the explainSource method must be overridden to include information regarding the pushed-down filters.
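The "offer and remove" contract of the mutable predicate list can be sketched without any Flink types. In the sketch below predicates are modeled as plain strings and the set of supported predicates is an assumption made for illustration; FilterPushDownSketch and acceptPredicates are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of the mutable predicate list contract; not Flink code. */
final class FilterPushDownSketch {

    /**
     * Removes the predicates the source can evaluate itself from the offered list
     * and returns them. Whatever remains in 'offered' is left for a later filter operator.
     */
    static List<String> acceptPredicates(List<String> offered, Set<String> supported) {
        List<String> accepted = new ArrayList<>();
        Iterator<String> it = offered.iterator();
        while (it.hasNext()) {
            String predicate = it.next();
            if (supported.contains(predicate)) {
                accepted.add(predicate);
                it.remove(); // removing signals that the source takes over this predicate
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> offered = new ArrayList<>(Arrays.asList("a > 5", "udf(b)", "c = 'x'"));
        Set<String> supported = new HashSet<>(Arrays.asList("a > 5", "c = 'x'"));
        List<String> accepted = acceptPredicates(offered, supported);
        System.out.println(accepted); // prints [a > 5, c = 'x']
        System.out.println(offered);  // prints [udf(b)]
    }
}
```

A source following this pattern would keep the accepted predicates in the returned copy and mention them in its explainSource output.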
Defining a TableSource for Lookups
Attention This is an experimental feature. The interface may be changed in future versions. It is only supported in the Blink planner.
The LookupableTableSource interface adds support for accessing the table via key column(s) in a lookup fashion. This is very useful when joining with a dimension table to enrich records with additional information. If you want to use the TableSource in lookup mode, you should use the source in temporal table join syntax.
The interface looks as follows:
LookupableTableSource<T> extends TableSource<T> {
public TableFunction<T> getLookupFunction(String[] lookupkeys);
public AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupkeys);
public boolean isAsyncEnabled();
}
LookupableTableSource[T] extends TableSource[T] {
def getLookupFunction(lookupKeys: Array[String]): TableFunction[T]
def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T]
def isAsyncEnabled: Boolean
}
- getLookupFunction(lookupkeys): Returns a TableFunction which is used to look up the matched row(s) via lookup keys. The lookupkeys are the field names of the LookupableTableSource in the join equality conditions. The eval method parameters of the returned TableFunction should be in the order in which lookupkeys are defined. It is recommended to define the parameters as varargs (e.g. eval(Object... lookupkeys)) to match all the cases. The return type of the TableFunction must be identical to the return type defined by the TableSource.getReturnType() method.
- getAsyncLookupFunction(lookupkeys): Optional. Similar to getLookupFunction, but the AsyncTableFunction looks up the matched row(s) asynchronously. The returned AsyncTableFunction will be called via Async I/O. The first argument of the eval method of the returned AsyncTableFunction should be defined as java.util.concurrent.CompletableFuture to collect results asynchronously (e.g. eval(CompletableFuture<Collection<String>> result, Object... lookupkeys)). The implementation of this method can throw an exception if the TableSource doesn’t support asynchronous lookup.
- isAsyncEnabled(): Returns true if async lookup is enabled. getAsyncLookupFunction(lookupkeys) must be implemented if isAsyncEnabled returns true.
Define a TableSink
A TableSink specifies how to emit a Table to an external system or location. The interface is generic such that it can support different storage locations and formats. There are different table sinks for batch tables and streaming tables.
The general interface looks as follows:
TableSink<T> {
public TypeInformation<T> getOutputType();
public String[] getFieldNames();
public TypeInformation[] getFieldTypes();
public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
}
TableSink[T] {
def getOutputType: TypeInformation[T]
def getFieldNames: Array[String]
def getFieldTypes: Array[TypeInformation[_]]
def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
The TableSink#configure method is called to pass the schema of the Table to emit (field names and types) to the TableSink. The method must return a new instance of the TableSink which is configured to emit the provided Table schema.
BatchTableSink
Defines an external TableSink to emit a batch table.
The interface looks as follows:
BatchTableSink<T> extends TableSink<T> {
public void emitDataSet(DataSet<T> dataSet);
}
BatchTableSink[T] extends TableSink[T] {
def emitDataSet(dataSet: DataSet[T]): Unit
}
AppendStreamTableSink
Defines an external TableSink to emit a streaming table with only insert changes.
The interface looks as follows:
AppendStreamTableSink<T> extends TableSink<T> {
public void emitDataStream(DataStream<T> dataStream);
}
AppendStreamTableSink[T] extends TableSink[T] {
def emitDataStream(dataStream: DataStream[T]): Unit
}
If the table is also modified by update or delete changes, a TableException will be thrown.
RetractStreamTableSink
Defines an external TableSink to emit a streaming table with insert, update, and delete changes.
The interface looks as follows:
RetractStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {
public TypeInformation<T> getRecordType();
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
def getRecordType: TypeInformation[T]
def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
The table will be converted into a stream of accumulate and retraction messages which are encoded as Java Tuple2. The first field is a boolean flag to indicate the message type (true indicates insert, false indicates delete). The second field holds the record of the requested type T.
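How a consumer might interpret such a stream can be sketched without Flink. The example below materializes accumulate and retraction messages into a count per record, with records modeled as plain strings; RetractSketch is a hypothetical class for illustration, not part of the sink API. It assumes that an update arrives as a retraction of the old record followed by an insert of the new one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer of (flag, record) retract messages; not Flink code. */
final class RetractSketch {
    /** Record -> number of copies currently present in the materialized result. */
    private final Map<String, Integer> counts = new HashMap<>();

    /** flag == true adds one copy of the record, flag == false retracts one copy. */
    void apply(boolean flag, String record) {
        if (flag) {
            counts.merge(record, 1, Integer::sum);
        } else {
            // Returning null from the remapping function removes the entry.
            counts.computeIfPresent(record, (key, count) -> count > 1 ? count - 1 : null);
        }
    }

    Map<String, Integer> view() {
        return counts;
    }

    public static void main(String[] args) {
        RetractSketch result = new RetractSketch();
        result.apply(true, "alice,1");  // insert
        result.apply(false, "alice,1"); // update, part 1: retract the old record
        result.apply(true, "alice,2");  // update, part 2: insert the new record
        System.out.println(result.view()); // prints {alice,2=1}
    }
}
```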
UpsertStreamTableSink
Defines an external TableSink to emit a streaming table with insert, update, and delete changes.
The interface looks as follows:
UpsertStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {
public void setKeyFields(String[] keys);
public void setIsAppendOnly(boolean isAppendOnly);
public TypeInformation<T> getRecordType();
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
def setKeyFields(keys: Array[String]): Unit
def setIsAppendOnly(isAppendOnly: Boolean): Unit
def getRecordType: TypeInformation[T]
def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
The table must have unique key fields (atomic or composite) or be append-only. If the table does not have a unique key and is not append-only, a TableException will be thrown. The unique key of the table is configured by the UpsertStreamTableSink#setKeyFields() method.
The table will be converted into a stream of upsert and delete messages which are encoded as a Java Tuple2. The first field is a boolean flag to indicate the message type. The second field holds the record of the requested type T.
A message with true boolean field is an upsert message for the configured key. A message with false flag is a delete message for the configured key. If the table is append-only, all messages will have a true flag and must be interpreted as insertions.
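The upsert encoding can be contrasted with the retract encoding through a small, Flink-independent sketch that keeps one record per key. UpsertSketch is a hypothetical class; key extraction is simplified to passing the key explicitly, whereas a real sink would derive it from the configured key fields.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer of (flag, record) upsert messages; not Flink code. */
final class UpsertSketch {
    /** Key -> latest record for that key. */
    private final Map<String, String> byKey = new HashMap<>();

    /** flag == true inserts or overwrites the record for the key, flag == false deletes it. */
    void apply(boolean flag, String key, String record) {
        if (flag) {
            byKey.put(key, record);
        } else {
            byKey.remove(key);
        }
    }

    Map<String, String> view() {
        return byKey;
    }

    public static void main(String[] args) {
        UpsertSketch result = new UpsertSketch();
        result.apply(true, "alice", "alice,1");
        result.apply(true, "alice", "alice,2"); // a single message updates the key
        result.apply(true, "bob", "bob,5");
        result.apply(false, "bob", "bob,5");    // delete for key bob
        System.out.println(result.view()); // prints {alice=alice,2}
    }
}
```

In this model an update needs one message, which is the practical difference from the retract encoding, where the old record is removed by a separate message.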
Define a TableFactory
A TableFactory creates different table-related instances from string-based properties. All available factories are matched against the given set of properties and the requested factory class.
Factories leverage Java’s Service Provider Interfaces (SPI) for discovery. This means that every dependency and JAR file should contain a file org.apache.flink.table.factories.TableFactory in the META-INF/services resource directory that lists all available table factories that it provides.
Every table factory needs to implement the following interface:
package org.apache.flink.table.factories;
interface TableFactory {
Map<String, String> requiredContext();
List<String> supportedProperties();
}
package org.apache.flink.table.factories
trait TableFactory {
def requiredContext(): util.Map[String, String]
def supportedProperties(): util.List[String]
}
- requiredContext(): Specifies the context that this factory has been implemented for. The framework guarantees to only match for this factory if the specified set of properties and values are met. Typical properties might be connector.type, format.type, or update-mode. Property keys such as connector.property-version and format.property-version are reserved for future backwards compatibility cases.
- supportedProperties(): List of property keys that this factory can handle. This method will be used for validation. If a property is passed that this factory cannot handle, an exception will be thrown. The list must not contain the keys that are specified by the context.
In order to create a specific instance, a factory class can implement one or more interfaces provided in org.apache.flink.table.factories:
- BatchTableSourceFactory: Creates a batch table source.
- BatchTableSinkFactory: Creates a batch table sink.
- StreamTableSourceFactory: Creates a stream table source.
- StreamTableSinkFactory: Creates a stream table sink.
- DeserializationSchemaFactory: Creates a deserialization schema format.
- SerializationSchemaFactory: Creates a serialization schema format.
The discovery of a factory happens in multiple stages:
- Discover all available factories.
- Filter by factory class (e.g., StreamTableSourceFactory).
- Filter by matching context.
- Filter by supported properties.
- Verify that exactly one factory matches, otherwise throw an AmbiguousTableFactoryException or NoMatchingTableFactoryException.
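The context and property stages above can be approximated with a short, Flink-independent sketch. FactoryMatchSketch and matches are hypothetical names; the real discovery logic lives inside Flink and differs in detail (for example, it reports unsupported properties through an exception rather than a boolean).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of factory matching; not Flink's implementation. */
final class FactoryMatchSketch {

    static boolean matches(
            Map<String, String> requiredContext,
            List<String> supportedProperties,
            Map<String, String> properties) {
        // Stage "matching context": every context entry must be present with the same value.
        for (Map.Entry<String, String> entry : requiredContext.entrySet()) {
            if (!entry.getValue().equals(properties.get(entry.getKey()))) {
                return false;
            }
        }
        // Stage "supported properties": every other key must be one the factory can handle.
        for (String key : properties.keySet()) {
            if (!requiredContext.containsKey(key) && !supportedProperties.contains(key)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "my-system");

        Map<String, String> properties = new HashMap<>(context);
        properties.put("connector.debug", "true");

        System.out.println(matches(context, Arrays.asList("connector.debug"), properties)); // prints true
    }
}
```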
The following example shows how to provide a custom streaming source with an additional connector.debug property flag for parameterization.
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        // the factory only matches if these properties are present with these values
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "my-system");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // additional property keys this factory can handle
        List<String> list = new ArrayList<>();
        list.add("connector.debug");
        return list;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // a missing property is treated as false
        boolean isDebug = Boolean.parseBoolean(properties.get("connector.debug"));

        // additional validation of the passed properties can also happen here

        return new MySystemAppendTableSource(isDebug);
    }
}
import java.util
import org.apache.flink.table.factories.StreamTableSourceFactory
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {

  override def requiredContext(): util.Map[String, String] = {
    // the factory only matches if these properties are present with these values
    val context = new util.HashMap[String, String]()
    context.put("update-mode", "append")
    context.put("connector.type", "my-system")
    context
  }

  override def supportedProperties(): util.List[String] = {
    // additional property keys this factory can handle
    val properties = new util.ArrayList[String]()
    properties.add("connector.debug")
    properties
  }

  override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
    val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))

    // additional validation of the passed properties can also happen here

    new MySystemAppendTableSource(isDebug)
  }
}
Use a TableFactory in the SQL Client
In a SQL Client environment file, the previously presented factory could be declared as:
tables:
  - name: MySystemTable
    type: source
    update-mode: append
    connector:
      type: my-system
      debug: true
The YAML file is translated into flattened string properties and a table factory is called with those properties that describe the connection to the external system:
update-mode=append
connector.type=my-system
connector.debug=true
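The flattening step can be sketched in plain Java, assuming the YAML table entry has already been parsed into nested maps. FlattenSketch and flatten are hypothetical names used for illustration and do not reflect how the SQL Client is implemented; the SQL Client specific keys name and type are simply left out of the input here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of flattening nested configuration into dotted keys. */
final class FlattenSketch {

    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> nested, Map<String, String> flat) {
        for (Map.Entry<String, Object> entry : nested.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                // Nested sections contribute their keys with the parent key as prefix.
                flattenInto(key, (Map<String, Object>) entry.getValue(), flat);
            } else {
                flat.put(key, String.valueOf(entry.getValue()));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> connector = new LinkedHashMap<>();
        connector.put("type", "my-system");
        connector.put("debug", true);

        Map<String, Object> table = new LinkedHashMap<>();
        table.put("update-mode", "append");
        table.put("connector", connector);

        System.out.println(flatten(table));
        // prints {update-mode=append, connector.type=my-system, connector.debug=true}
    }
}
```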
Attention Properties such as tables.#.name or tables.#.type are SQL Client specifics and are not passed to any factory. The type property decides, depending on the execution environment, whether a BatchTableSourceFactory/StreamTableSourceFactory (for source), a BatchTableSinkFactory/StreamTableSinkFactory (for sink), or both (for both) need to be discovered.
Use a TableFactory in the Table & SQL API
For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in org.apache.flink.table.descriptors that translate into string-based properties. See the built-in descriptors for sources, sinks, and formats as a reference.
A connector for MySystem in our example can extend ConnectorDescriptor as shown below:
import org.apache.flink.table.descriptors.ConnectorDescriptor;

import java.util.HashMap;
import java.util.Map;

/**
 * Connector to MySystem with debug mode.
 */
public class MySystemConnector extends ConnectorDescriptor {

    public final boolean isDebug;

    public MySystemConnector(boolean isDebug) {
        super("my-system", 1, false);
        this.isDebug = isDebug;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
    }
}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import java.util.HashMap
import java.util.Map

/**
 * Connector to MySystem with debug mode.
 */
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) {

  override protected def toConnectorProperties(): Map[String, String] = {
    val properties = new HashMap[String, String]
    properties.put("connector.debug", isDebug.toString)
    properties
  }
}
The descriptor can then be used in the API as follows:
StreamTableEnvironment tableEnv = // ...

tableEnv
  .connect(new MySystemConnector(true))
  .inAppendMode()
  .registerTableSource("MySystemTable");
val tableEnv: StreamTableEnvironment = // ...

tableEnv
  .connect(new MySystemConnector(isDebug = true))
  .inAppendMode()
  .registerTableSource("MySystemTable")