Row-based Operations
This page describes how to use row-based operations in PyFlink Table API.
Map
Performs a map
operation with a python general scalar function or vectorized scalar function. The output will be flattened if the output type is a composite type.
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("data", DataTypes.STRING())]))
def func1(id: int, data: str) -> Row:
return Row(id, data * 2)
# the input columns are specified as the inputs
table.map(func1(col('id'), col('data'))).execute().print()
# result is
#+----------------------+--------------------------------+
#| id | data |
#+----------------------+--------------------------------+
#| 1 | HiHi |
#| 2 | HelloHello |
#+----------------------+--------------------------------+
It also supports to take a Row object (containing all the columns of the input table) as input.
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("data", DataTypes.STRING())]))
def func2(data: Row) -> Row:
return Row(data.id, data.data * 2)
# specify the function without the input columns
table.map(func2).execute().print()
# result is
#+----------------------+--------------------------------+
#| id | data |
#+----------------------+--------------------------------+
#| 1 | HiHi |
#| 2 | HelloHello |
#+----------------------+--------------------------------+
Note The input columns should not be specified when using func2 in the map operation.
It also supports to use vectorized scalar function in the map
operation. It should be noted that the input type and output type should be pandas.DataFrame instead of Row in this case.
import pandas as pd
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("data", DataTypes.STRING())]),
func_type='pandas')
def func3(data: pd.DataFrame) -> pd.DataFrame:
res = pd.concat([data.id, data.data * 2], axis=1)
return res
table.map(func3).execute().print()
# result is
#+----------------------+--------------------------------+
#| id | data |
#+----------------------+--------------------------------+
#| 1 | HiHi |
#| 2 | HelloHello |
#+----------------------+--------------------------------+
FlatMap
Performs a flat_map
operation with a python table function.
from pyflink.common import Row
from pyflink.table.udf import udtf
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
for s in x.data.split(","):
yield x.id, s
# use split in `flat_map`
table.flat_map(split).execute().print()
# result is
#+-------------+--------------------------------+
#| f0 | f1 |
#+-------------+--------------------------------+
#| 1 | Hi |
#| 1 | Flink |
#| 2 | Hello |
#+-------------+--------------------------------+
The python table function could also be used in join_lateral
and left_outer_join_lateral
.
# use table function in `join_lateral` or `left_outer_join_lateral`
table.join_lateral(split.alias('a', 'b')).execute().print()
# result is
#+----------------------+--------------------------------+-------------+--------------------------------+
#| id | data | a | b |
#+----------------------+--------------------------------+-------------+--------------------------------+
#| 1 | Hi,Flink | 1 | Hi |
#| 1 | Hi,Flink | 1 | Flink |
#| 2 | Hello | 2 | Hello |
#+----------------------+--------------------------------+-------------+--------------------------------+
Aggregate
Performs an aggregate
operation with a python general aggregate function or vectorized aggregate function.
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import AggregateFunction, udaf
class CountAndSumAggregateFunction(AggregateFunction):
def get_value(self, accumulator):
return Row(accumulator[0], accumulator[1])
def create_accumulator(self):
return Row(0, 0)
def accumulate(self, accumulator, row):
accumulator[0] += 1
accumulator[1] += row.b
def retract(self, accumulator, row):
accumulator[0] -= 1
accumulator[1] -= row.b
def merge(self, accumulator, accumulators):
for other_acc in accumulators:
accumulator[0] += other_acc[0]
accumulator[1] += other_acc[1]
def get_accumulator_type(self):
return DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
DataTypes.FIELD("b", DataTypes.BIGINT())])
def get_result_type(self):
return DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
DataTypes.FIELD("b", DataTypes.BIGINT())])
function = CountAndSumAggregateFunction()
agg = udaf(function,
result_type=function.get_result_type(),
accumulator_type=function.get_accumulator_type(),
name=str(function.__class__.__name__))
# aggregate with a python general aggregate function
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
result = t.group_by(col('a')) \
.aggregate(agg.alias("c", "d")) \
.select(col('a'), col('c'), col('d'))
result.execute().print()
# the result is
#+----+----------------------+----------------------+----------------------+
#| op | a | c | d |
#+----+----------------------+----------------------+----------------------+
#| +I | 1 | 2 | 5 |
#| +I | 2 | 1 | 1 |
#+----+----------------------+----------------------+----------------------+
# aggregate with a python vectorized aggregate function
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.FLOAT()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
.select(col('a'), col('b')).execute().print()
# the result is
#+--------------------------------+-------------+
#| a | b |
#+--------------------------------+-------------+
#| 2.0 | 3 |
#+--------------------------------+-------------+
Note Similar to map
operation, if you specify the aggregate function without the input columns in aggregate
operation, it will take Row or Pandas.DataFrame as input which contains all the columns of the input table including the grouping keys. Note You have to close the “aggregate” with a select statement and it should not contain aggregate functions in the select statement. Besides, the output of aggregate will be flattened if it is a composite type.
FlatAggregate
Performs a flat_aggregate
operation with a python general Table Aggregate Function
Similar to GroupBy Aggregation
, FlatAggregate
groups the inputs on the grouping keys. Different from AggregateFunction
, TableAggregateFunction
could return 0, 1, or more records for a grouping key. Similar to aggregate
, you have to close the flat_aggregate
with a select statement and the select statement should not contain aggregate functions.
from pyflink.common import Row
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.udf import udtaf, TableAggregateFunction
class Top2(TableAggregateFunction):
def emit_value(self, accumulator):
yield Row(accumulator[0])
yield Row(accumulator[1])
def create_accumulator(self):
return [None, None]
def accumulate(self, accumulator, row):
if row.a is not None:
if accumulator[0] is None or row.a > accumulator[0]:
accumulator[1] = accumulator[0]
accumulator[0] = row.a
elif accumulator[1] is None or row.a > accumulator[1]:
accumulator[1] = row.a
def get_accumulator_type(self):
return DataTypes.ARRAY(DataTypes.BIGINT())
def get_result_type(self):
return DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT())])
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# the result type and accumulator type can also be specified in the udtaf decorator:
# top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
top2 = udtaf(Top2())
t = table_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(7, 'Hi', 'Hello'),
(2, 'Hi', 'Hello')],
['a', 'b', 'c'])
# call function "inline" without registration in Table API
result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).to_pandas()
# the result is:
#+----+--------------------------------+----------------------+
#| op | b | a |
#+----+--------------------------------+----------------------+
#| +I | Hi2 | 5 |
#| +I | Hi2 | <NULL> |
#| +I | Hi | 7 |
#| +I | Hi | 3 |
#+----+--------------------------------+----------------------+