Connectors
This page describes how to use connectors in PyFlink and highlights the details to be aware of when using Flink connectors in Python programs.
- Download connector and format jars
- How to use connectors
- Predefined Sources and Sinks
- User-defined sources & sinks
Note: For general connector information and common configuration, please refer to the corresponding Java/Scala documentation.
Download connector and format jars
Since Flink is a Java/Scala-based project, connector and format implementations are available as jars that need to be specified as job dependencies, e.g. via the pipeline.jars configuration option, which takes a semicolon-separated list of jar URLs:
table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
How to use connectors
In PyFlink’s Table API, DDL is the recommended way to define sources and sinks. DDL statements are executed via the execute_sql() method on the TableEnvironment, which makes the table available for use by the application:
source_ddl = """
CREATE TABLE source_table(
a VARCHAR,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
sink_ddl = """
CREATE TABLE sink_table(
a VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'sink_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_query("SELECT a FROM source_table") \
.execute_insert("sink_table").wait()
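execute_insert() submits the job and returns a TableResult; the wait() call blocks until the job finishes, which, for local executions, keeps the Python process alive until the job completes.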
Below is a complete example of how to use a Kafka source/sink and the JSON format in PyFlink.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=env_settings)

    # specify connector and format jars
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

    source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'source_topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'test_3',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    sink_ddl = """
        CREATE TABLE sink_table(
            a VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sink_topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'format' = 'json'
        )
    """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    t_env.sql_query("SELECT a FROM source_table") \
        .execute_insert("sink_table").wait()


if __name__ == '__main__':
    log_processing()
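Assuming the script is saved as log_processing.py (the file name is illustrative), it can be executed directly with a Python interpreter, which starts a local mini-cluster, or submitted to a cluster with flink run --python log_processing.py.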
Predefined Sources and Sinks
Some data sources and sinks are built into Flink and are available out-of-the-box. The predefined data sources include reading from a Pandas DataFrame and ingesting data from collections; the predefined data sinks support writing to a Pandas DataFrame.
from/to Pandas
PyFlink Tables support conversion to and from a Pandas DataFrame. Note that to_pandas() collects the content of the table into the client’s memory, so it should only be used on tables small enough to fit.
import pandas as pd
import numpy as np

# assumes an existing TableEnvironment `t_env`

# Create a PyFlink Table from a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")

# Convert the PyFlink Table back to a Pandas DataFrame
pdf = table.to_pandas()
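If the inferred column types are not what you want, a schema can be passed as the second argument of from_pandas(). A minimal sketch, with illustrative column names and types:
from pyflink.table import DataTypes

# illustrative DataFrame; the dtypes match the explicit schema below
pdf = pd.DataFrame({"id": [1, 2], "name": ["Hi", "Hello"]})

# pass an explicit row type instead of relying on type inference
table = t_env.from_pandas(
    pdf,
    DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                   DataTypes.FIELD("name", DataTypes.STRING())]))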
from_elements()
from_elements() is used to create a table from a collection of elements. The element types must be acceptable atomic types or acceptable composite types.
from pyflink.table import DataTypes

table_env.from_elements([(1, 'Hi'), (2, 'Hello')])

# use the second parameter to specify custom field names
table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

# use the second parameter to specify a custom table schema
table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                        DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                                       DataTypes.FIELD("b", DataTypes.STRING())]))
The last statement above returns a Table like:
+----+-------+
| a | b |
+====+=======+
| 1 | Hi |
+----+-------+
| 2 | Hello |
+----+-------+
User-defined sources & sinks
In some cases, you may want to define custom sources and sinks. Currently, sources and sinks must be implemented in Java/Scala, but you can define a TableFactory to support their use via DDL. More details can be found in the Java/Scala documentation.
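For example, once a custom TableFactory has been packaged into a jar and put on the classpath, the connector it declares can be used from Python like any built-in connector. The identifier and option below are hypothetical; they must match what the Java factory declares:
# 'my-connector' and 'my-option' are hypothetical and must match the
# factory identifier and options declared by the Java TableFactory
t_env.execute_sql("""
    CREATE TABLE custom_table(
        a VARCHAR
    ) WITH (
        'connector' = 'my-connector',
        'my-option' = 'value'
    )
""")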