Row-based Operations

This page describes how to use row-based operations in the PyFlink Table API.

Map

Performs a map operation with a Python general scalar function or a vectorized scalar function. The output is flattened if the output type is a composite type.

  from pyflink.common import Row
  from pyflink.table import EnvironmentSettings, TableEnvironment
  from pyflink.table.expressions import col
  from pyflink.table.udf import udf

  env_settings = EnvironmentSettings.in_batch_mode()
  table_env = TableEnvironment.create(env_settings)

  table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])


  @udf(result_type='ROW<id BIGINT, data STRING>')
  def func1(id: int, data: str) -> Row:
      """Return a row with ``id`` unchanged and ``data`` repeated twice."""
      return Row(id, data * 2)


  # the input columns are specified as the inputs
  table.map(func1(col('id'), col('data'))).execute().print()
  # result is
  #+----------------------+--------------------------------+
  #|                   id |                           data |
  #+----------------------+--------------------------------+
  #|                    1 |                           HiHi |
  #|                    2 |                     HelloHello |
  #+----------------------+--------------------------------+

The map operation can also take a Row object (containing all the columns of the input table) as input.

  @udf(result_type='ROW<id BIGINT, data STRING>')
  def func2(data: Row) -> Row:
      """Take the whole input row and return its ``id`` with ``data`` repeated twice."""
      return Row(data.id, data.data * 2)


  # specify the function without the input columns
  table.map(func2).execute().print()
  # result is
  #+----------------------+--------------------------------+
  #|                   id |                           data |
  #+----------------------+--------------------------------+
  #|                    1 |                           HiHi |
  #|                    2 |                     HelloHello |
  #+----------------------+--------------------------------+

Note: The input columns should not be specified when using func2 in the map operation.

A vectorized scalar function can also be used in the map operation. Note that in this case the input type and output type should be pandas.DataFrame instead of Row.

  import pandas as pd


  @udf(result_type='ROW<id BIGINT, data STRING>', func_type='pandas')
  def func3(data: pd.DataFrame) -> pd.DataFrame:
      """Return a DataFrame with the ``id`` column and the ``data`` column repeated twice."""
      # concatenate column-wise so the result keeps the (id, data) layout
      res = pd.concat([data.id, data.data * 2], axis=1)
      return res


  table.map(func3).execute().print()
  # result is
  #+----------------------+--------------------------------+
  #|                   id |                           data |
  #+----------------------+--------------------------------+
  #|                    1 |                           HiHi |
  #|                    2 |                     HelloHello |
  #+----------------------+--------------------------------+

FlatMap

Performs a flat_map operation with a Python table function.

  from typing import Iterator, Tuple

  from pyflink.common import Row
  from pyflink.table.udf import udtf
  from pyflink.table import EnvironmentSettings, TableEnvironment

  env_settings = EnvironmentSettings.in_batch_mode()
  table_env = TableEnvironment.create(env_settings)

  table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])


  @udtf(result_types=['INT', 'STRING'])
  def split(x: Row) -> Iterator[Tuple[int, str]]:
      """Yield one ``(id, part)`` pair for every comma-separated part of ``x.data``."""
      for s in x.data.split(","):
          yield x.id, s


  # use split in `flat_map`
  table.flat_map(split).execute().print()
  # result is
  #+-------------+--------------------------------+
  #|          f0 |                             f1 |
  #+-------------+--------------------------------+
  #|           1 |                             Hi |
  #|           1 |                          Flink |
  #|           2 |                          Hello |
  #+-------------+--------------------------------+

The Python table function can also be used in join_lateral and left_outer_join_lateral.

  # use table function in `join_lateral` or `left_outer_join_lateral`
  table.join_lateral(split.alias('a', 'b')).execute().print()
  # result is
  #+----------------------+--------------------------------+-------------+--------------------------------+
  #|                   id |                           data |           a |                              b |
  #+----------------------+--------------------------------+-------------+--------------------------------+
  #|                    1 |                       Hi,Flink |           1 |                             Hi |
  #|                    1 |                       Hi,Flink |           1 |                          Flink |
  #|                    2 |                          Hello |           2 |                          Hello |
  #+----------------------+--------------------------------+-------------+--------------------------------+

Aggregate

Performs an aggregate operation with a Python general aggregate function or a vectorized aggregate function.

  from pyflink.common import Row
  from pyflink.table import EnvironmentSettings, TableEnvironment
  from pyflink.table.expressions import col
  from pyflink.table.udf import AggregateFunction, udaf


  class CountAndSumAggregateFunction(AggregateFunction):
      """Aggregate that returns the number of rows and the sum of column ``b``."""

      def get_value(self, accumulator):
          return Row(accumulator[0], accumulator[1])

      def create_accumulator(self):
          # accumulator[0] is the row count, accumulator[1] is the running sum of b
          return Row(0, 0)

      def accumulate(self, accumulator, row):
          accumulator[0] += 1
          accumulator[1] += row.b

      def retract(self, accumulator, row):
          accumulator[0] -= 1
          accumulator[1] -= row.b

      def merge(self, accumulator, accumulators):
          for other_acc in accumulators:
              accumulator[0] += other_acc[0]
              accumulator[1] += other_acc[1]

      def get_accumulator_type(self):
          return 'ROW<a BIGINT, b BIGINT>'

      def get_result_type(self):
          return 'ROW<a BIGINT, b BIGINT>'


  function = CountAndSumAggregateFunction()
  agg = udaf(function,
             result_type=function.get_result_type(),
             accumulator_type=function.get_accumulator_type(),
             name=function.__class__.__name__)

  # aggregate with a python general aggregate function
  env_settings = EnvironmentSettings.in_streaming_mode()
  table_env = TableEnvironment.create(env_settings)
  t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

  result = t.group_by(col('a')) \
      .aggregate(agg.alias("c", "d")) \
      .select(col('a'), col('c'), col('d'))
  result.execute().print()
  # the result is
  #+----+----------------------+----------------------+----------------------+
  #| op |                    a |                    c |                    d |
  #+----+----------------------+----------------------+----------------------+
  #| +I |                    1 |                    2 |                    5 |
  #| +I |                    2 |                    1 |                    1 |
  #+----+----------------------+----------------------+----------------------+

  # aggregate with a python vectorized aggregate function
  env_settings = EnvironmentSettings.in_batch_mode()
  table_env = TableEnvironment.create(env_settings)
  t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

  # the lambda parameter is named `df` so it does not shadow the usual pandas alias `pd`
  pandas_udaf = udaf(lambda df: (df.b.mean(), df.b.max()),
                     result_type='ROW<a FLOAT, b INT>',
                     func_type="pandas")
  t.aggregate(pandas_udaf.alias("a", "b")) \
      .select(col('a'), col('b')).execute().print()
  # the result is
  #+--------------------------------+-------------+
  #|                              a |           b |
  #+--------------------------------+-------------+
  #|                            2.0 |           3 |
  #+--------------------------------+-------------+

Note: Similar to the map operation, if you specify the aggregate function without the input columns in the aggregate operation, it takes a Row or pandas.DataFrame as input, which contains all the columns of the input table including the grouping keys.

Note: You have to close the "aggregate" with a select statement, and the select statement must not contain aggregate functions. Besides, the output of aggregate is flattened if it is a composite type.

FlatAggregate

Performs a flat_aggregate operation with a Python general table aggregate function.

Similar to GroupBy Aggregation, FlatAggregate groups the inputs on the grouping keys. Unlike an AggregateFunction, a TableAggregateFunction can return 0, 1, or more records for a grouping key. Similar to aggregate, you have to close the flat_aggregate with a select statement, and the select statement must not contain aggregate functions.

  from pyflink.common import Row
  from pyflink.table import TableEnvironment, EnvironmentSettings
  from pyflink.table.expressions import col
  from pyflink.table.udf import udtaf, TableAggregateFunction


  class Top2(TableAggregateFunction):
      """Table aggregate that emits the two largest values of column ``a`` per group."""

      def emit_value(self, accumulator):
          # always emits two rows; the second is None when only one value was seen
          yield Row(accumulator[0])
          yield Row(accumulator[1])

      def create_accumulator(self):
          # accumulator[0] is the largest value so far, accumulator[1] the second largest
          return [None, None]

      def accumulate(self, accumulator, row):
          if row.a is not None:
              if accumulator[0] is None or row.a > accumulator[0]:
                  # new maximum: the previous maximum becomes the runner-up
                  accumulator[1] = accumulator[0]
                  accumulator[0] = row.a
              elif accumulator[1] is None or row.a > accumulator[1]:
                  accumulator[1] = row.a

      def get_accumulator_type(self):
          return 'ARRAY<BIGINT>'

      def get_result_type(self):
          return 'ROW<a BIGINT>'


  env_settings = EnvironmentSettings.in_streaming_mode()
  table_env = TableEnvironment.create(env_settings)

  # the result type and accumulator type can also be specified in the udtaf decorator
  # (this needs `from pyflink.table import DataTypes`):
  # top2 = udtaf(Top2(),
  #              result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
  #              accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
  top2 = udtaf(Top2())

  t = table_env.from_elements([(1, 'Hi', 'Hello'),
                               (3, 'Hi', 'hi'),
                               (5, 'Hi2', 'hi'),
                               (7, 'Hi', 'Hello'),
                               (2, 'Hi', 'Hello')],
                              ['a', 'b', 'c'])

  # call function "inline" without registration in Table API
  # keep the Table in `result`; print() returns None, so it must not be part of the assignment
  result = t.group_by(col('b')).flat_aggregate(top2).select(col('*'))
  result.execute().print()
  # the result is:
  #+----+--------------------------------+----------------------+
  #| op |                              b |                    a |
  #+----+--------------------------------+----------------------+
  #| +I |                            Hi2 |                    5 |
  #| +I |                            Hi2 |               <NULL> |
  #| +I |                             Hi |                    7 |
  #| +I |                             Hi |                    3 |
  #+----+--------------------------------+----------------------+