Python Table API 简介

本文档是对 PyFlink Table API 的简要介绍,用于帮助新手用户快速理解 PyFlink Table API 的基本用法。 关于高级用法,请参阅用户指南中的其他文档。

Python Table API 程序的基本结构

所有的 Table API 和 SQL 程序,不管批模式,还是流模式,都遵循相同的结构。下面代码示例展示了 Table API 和 SQL 程序的基本结构。

  1. from pyflink.table import EnvironmentSettings, StreamTableEnvironment
  2. # 1. 创建 TableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  4. table_env = StreamTableEnvironment.create(environment_settings=env_settings)
  5. # 2. 创建 source 表
  6. table_env.execute_sql("""
  7. CREATE TABLE datagen (
  8. id INT,
  9. data STRING
  10. ) WITH (
  11. 'connector' = 'datagen',
  12. 'fields.id.kind' = 'sequence',
  13. 'fields.id.start' = '1',
  14. 'fields.id.end' = '10'
  15. )
  16. """)
  17. # 3. 创建 sink 表
  18. table_env.execute_sql("""
  19. CREATE TABLE print (
  20. id INT,
  21. data STRING
  22. ) WITH (
  23. 'connector' = 'print'
  24. )
  25. """)
  26. # 4. 查询 source 表,同时执行计算
  27. # 通过 Table API 创建一张表:
  28. source_table = table_env.from_path("datagen")
  29. # 或者通过 SQL 查询语句创建一张表:
  30. source_table = table_env.sql_query("SELECT * FROM datagen")
  31. result_table = source_table.select(source_table.id + 1, source_table.data)
  32. # 5. 将计算结果写入给 sink 表
  33. # 将 Table API 结果表数据写入 sink 表:
  34. result_table.execute_insert("print").wait()
  35. # 或者通过 SQL 查询语句来写入 sink 表:
  36. table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()

创建 TableEnvironment

TableEnvironment 是 Table API 和 SQL 集成的核心概念。下面代码示例展示了如何创建一个 TableEnvironment:

  1. from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
  2. # 创建 blink 流 TableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  4. table_env = StreamTableEnvironment.create(environment_settings=env_settings)
  5. # 创建 blink 批 TableEnvironment
  6. env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
  7. table_env = BatchTableEnvironment.create(environment_settings=env_settings)
  8. # 创建 flink 流 TableEnvironment
  9. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_old_planner().build()
  10. table_env = StreamTableEnvironment.create(environment_settings=env_settings)
  11. # 创建 flink 批 TableEnvironment
  12. env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_old_planner().build()
  13. table_env = BatchTableEnvironment.create(environment_settings=env_settings)

关于创建 TableEnvironment 的更多细节,请查阅 TableEnvironment 文档

TableEnvironment 可以用来:

目前有2个可用的执行器 : flink 执行器 和 blink 执行器。

你应该在当前程序中显式地设置使用哪个执行器,建议尽可能使用 blink 执行器。

创建表

Table 是 Python Table API 的核心组件。Table 是 Table API 作业中间结果的逻辑表示。

一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。

通过列表类型的对象创建

你可以使用一个列表对象创建一张表:

  1. # 创建 blink 批 TableEnvironment
  2. from pyflink.table import EnvironmentSettings, BatchTableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
  4. table_env = BatchTableEnvironment.create(environment_settings=env_settings)
  5. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
  6. table.to_pandas()

结果为:

  1. _1 _2
  2. 0 1 Hi
  3. 1 2 Hello

你也可以创建具有指定列名的表:

  1. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  2. table.to_pandas()

结果为:

  1. id data
  2. 0 1 Hi
  3. 1 2 Hello

默认情况下,表结构是从数据中自动提取的。

如果自动生成的表模式不符合你的要求,你也可以手动指定:

  1. table_without_schema = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  2. # 默认情况下,“id” 列的类型是 64 位整型
  3. default_type = table_without_schema.to_pandas()["id"].dtype
  4. print('By default the type of the "id" column is %s.' % default_type)
  5. from pyflink.table import DataTypes
  6. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
  7. DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
  8. DataTypes.FIELD("data", DataTypes.STRING())]))
  9. # 现在 “id” 列的类型是 8 位整型
  10. type = table.to_pandas()["id"].dtype
  11. print('Now the type of the "id" column is %s.' % type)

结果为:

  1. 默认情况下,“id 列的类型是 64 位整型。
  2. 现在 id 列的类型是 8 位整型。

通过 DDL 创建

你可以通过 DDL 创建一张表:

  1. # 创建 blink 流 TableEnvironment
  2. from pyflink.table import EnvironmentSettings, StreamTableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  4. table_env = StreamTableEnvironment.create(environment_settings=env_settings)
  5. table_env.execute_sql("""
  6. CREATE TABLE random_source (
  7. id BIGINT,
  8. data TINYINT
  9. ) WITH (
  10. 'connector' = 'datagen',
  11. 'fields.id.kind'='sequence',
  12. 'fields.id.start'='1',
  13. 'fields.id.end'='3',
  14. 'fields.data.kind'='sequence',
  15. 'fields.data.start'='4',
  16. 'fields.data.end'='6'
  17. )
  18. """)
  19. table = table_env.from_path("random_source")
  20. table.to_pandas()

结果为:

  1. id data
  2. 0 2 5
  3. 1 1 4
  4. 2 3 6

通过 Catalog 创建

TableEnvironment 维护了一个使用标识符创建的表的 catalogs 映射。

Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。

通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …“,都存储在 catalog 中。

你可以通过 SQL 直接访问 catalog 中的表。

如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:

  1. # 准备 catalog
  2. # 将 Table API 表注册到 catalog 中
  3. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  4. table_env.create_temporary_view('source_table', table)
  5. # 从 catalog 中获取 Table API 表
  6. new_table = table_env.from_path('source_table')
  7. new_table.to_pandas()

结果为:

  1. id data
  2. 0 1 Hi
  3. 1 2 Hello

查询

Table API 查询

Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)

Table API 文档描述了流和批处理上所有支持的 Table API 操作。

以下示例展示了一个简单的 Table API 聚合查询:

  1. # 通过 batch table environment 来执行查询
  2. from pyflink.table import EnvironmentSettings, BatchTableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
  4. table_env = BatchTableEnvironment.create(environment_settings=env_settings)
  5. orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
  6. ['name', 'country', 'revenue'])
  7. # 计算所有来自法国客户的收入
  8. revenue = orders \
  9. .select(orders.name, orders.country, orders.revenue) \
  10. .where(orders.country == 'FRANCE') \
  11. .group_by(orders.name) \
  12. .select(orders.name, orders.revenue.sum.alias('rev_sum'))
  13. revenue.to_pandas()

结果为:

  1. name rev_sum
  2. 0 Jack 30

SQL 查询

Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。

SQL 文档描述了 Flink 对流和批处理所支持的 SQL。

下面示例展示了一个简单的 SQL 聚合查询:

  1. # 通过 StreamTableEnvironment 来执行查询
  2. from pyflink.table import EnvironmentSettings, StreamTableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  4. table_env = StreamTableEnvironment.create(environment_settings=env_settings)
  5. table_env.execute_sql("""
  6. CREATE TABLE random_source (
  7. id BIGINT,
  8. data TINYINT
  9. ) WITH (
  10. 'connector' = 'datagen',
  11. 'fields.id.kind'='sequence',
  12. 'fields.id.start'='1',
  13. 'fields.id.end'='8',
  14. 'fields.data.kind'='sequence',
  15. 'fields.data.start'='4',
  16. 'fields.data.end'='11'
  17. )
  18. """)
  19. table_env.execute_sql("""
  20. CREATE TABLE print_sink (
  21. id BIGINT,
  22. data_sum TINYINT
  23. ) WITH (
  24. 'connector' = 'print'
  25. )
  26. """)
  27. table_env.execute_sql("""
  28. INSERT INTO print_sink
  29. SELECT id, sum(data) as data_sum FROM
  30. (SELECT id / 2 as id, data FROM random_source)
  31. WHERE id > 1
  32. GROUP BY id
  33. """).wait()

结果为:

  1. 2> +I(4,11)
  2. 6> +I(2,8)
  3. 8> +I(3,10)
  4. 6> -U(2,8)
  5. 8> -U(3,10)
  6. 6> +U(2,15)
  7. 8> +U(3,19)

实际上,上述输出展示了 print 结果表所接收到的 change log。 change log 的格式为:

  1. {subtask id}> {消息类型}{值的字符串格式}

例如,”2> +I(4,11)” 表示这条消息来自第二个 subtask,其中 “+I” 表示这是一条插入的消息,”(4, 11)” 是这条消息的内容。 另外,”-U” 表示这是一条撤回消息 (即更新前),这意味着应该在 sink 中删除或撤回该消息。 “+U” 表示这是一条更新的记录 (即更新后),这意味着应该在 sink 中更新或插入该消息。

所以,从上面的 change log,我们可以得到如下结果:

  1. (4, 11)
  2. (2, 15)
  3. (3, 19)

Table API 和 SQL 的混合使用

Table API 中的 Table 对象和 SQL 中的 Table 可以自由地相互转换。

下面例子展示了如何在 SQL 中使用 Table 对象:

  1. # 创建一张 sink 表来接收结果数据
  2. table_env.execute_sql("""
  3. CREATE TABLE table_sink (
  4. id BIGINT,
  5. data VARCHAR
  6. ) WITH (
  7. 'connector' = 'print'
  8. )
  9. """)
  10. # 将 Table API 表转换成 SQL 中的视图
  11. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  12. table_env.create_temporary_view('table_api_table', table)
  13. # 将 Table API 表的数据写入结果表
  14. table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

结果为:

  1. 6> +I(1,Hi)
  2. 6> +I(2,Hello)

下面例子展示了如何在 Table API 中使用 SQL 表:

  1. # 创建一张 SQL source 表
  2. table_env.execute_sql("""
  3. CREATE TABLE sql_source (
  4. id BIGINT,
  5. data TINYINT
  6. ) WITH (
  7. 'connector' = 'datagen',
  8. 'fields.id.kind'='sequence',
  9. 'fields.id.start'='1',
  10. 'fields.id.end'='4',
  11. 'fields.data.kind'='sequence',
  12. 'fields.data.start'='4',
  13. 'fields.data.end'='7'
  14. )
  15. """)
  16. # 将 SQL 表转换成 Table API 表
  17. table = table_env.from_path("sql_source")
  18. # 或者通过 SQL 查询语句创建表
  19. table = table_env.sql_query("SELECT * FROM sql_source")
  20. # 将表中的数据写出
  21. table.to_pandas()

结果为:

  1. id data
  2. 0 2 5
  3. 1 1 4
  4. 2 4 7
  5. 3 3 6

将结果写出

将结果数据收集到客户端

你可以调用 “to_pandas” 方法来 将一个 Table 对象转化成 pandas DataFrame:

  1. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  2. table.to_pandas()

结果为:

  1. id data
  2. 0 1 Hi
  3. 1 2 Hello

Note “to_pandas” 会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 Table.limit 来限制收集数据的条数是一种很好的做法。 Note flink planner 不支持 “to_pandas”,并且,并不是所有的数据类型都可以转换为 pandas DataFrames。

将结果写入到一张 Sink 表中

你可以调用 “execute_insert” 方法来将 Table 对象中的数据写入到一张 sink 表中:

  1. table_env.execute_sql("""
  2. CREATE TABLE sink_table (
  3. id BIGINT,
  4. data VARCHAR
  5. ) WITH (
  6. 'connector' = 'print'
  7. )
  8. """)
  9. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  10. table.execute_insert("sink_table").wait()

结果为:

  1. 6> +I(1,Hi)
  2. 6> +I(2,Hello)

也可以通过 SQL 来完成:

  1. table_env.create_temporary_view("table_source", table)
  2. table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()

将结果写入多张 Sink 表中

你也可以使用 StatementSet 在一个作业中将 Table 中的数据写入到多张 sink 表中:

  1. # 准备 source 表和 sink 表
  2. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  3. table_env.create_temporary_view("simple_source", table)
  4. table_env.execute_sql("""
  5. CREATE TABLE first_sink_table (
  6. id BIGINT,
  7. data VARCHAR
  8. ) WITH (
  9. 'connector' = 'print'
  10. )
  11. """)
  12. table_env.execute_sql("""
  13. CREATE TABLE second_sink_table (
  14. id BIGINT,
  15. data VARCHAR
  16. ) WITH (
  17. 'connector' = 'print'
  18. )
  19. """)
  20. # 创建 statement set
  21. statement_set = table_env.create_statement_set()
  22. # 将 "table" 的数据写入 "first_sink_table"
  23. statement_set.add_insert("first_sink_table", table)
  24. # 通过一条 sql 插入语句将数据从 "simple_source" 写入到 "second_sink_table"
  25. statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
  26. # 执行 statement set
  27. statement_set.execute().wait()

结果为:

  1. 7> +I(1,Hi)
  2. 7> +I(1,Hi)
  3. 7> +I(2,Hello)
  4. 7> +I(2,Hello)

Explain 表

Table API 提供了一种机制来查看 Table 的逻辑查询计划和优化后的查询计划。 这是通过 Table.explain() 或者 StatementSet.explain() 方法来完成的。Table.explain() 可以返回一个 Table 的执行计划。StatementSet.explain() 则可以返回含有多个 sink 的作业的执行计划。这些方法会返回一个字符串,字符串描述了以下三个方面的信息:

  1. 关系查询的抽象语法树,即未经优化的逻辑查询计划,
  2. 优化后的逻辑查询计划,
  3. 物理执行计划。

TableEnvironment.explain_sql()TableEnvironment.execute_sql() 支持执行 EXPLAIN 语句获得执行计划。更多细节请查阅 EXPLAIN

以下代码展示了如何使用 Table.explain() 方法:

  1. # 使用 StreamTableEnvironment
  2. from pyflink.table import EnvironmentSettings, StreamTableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  4. table_env = StreamTableEnvironment.create(environment_settings=env_settings)
  5. table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  6. table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  7. table = table1 \
  8. .where(table1.data.like('H%')) \
  9. .union_all(table2)
  10. print(table.explain())

结果为:

  1. == 抽象语法树 ==
  2. LogicalUnion(all=[true])
  3. :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'H%')])
  4. : +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]])
  5. +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]])
  6. == 优化后的逻辑计划 ==
  7. Union(all=[true], union=[id, data])
  8. :- Calc(select=[id, data], where=[LIKE(data, _UTF-16LE'H%')])
  9. : +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
  10. +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
  11. == 物理执行计划 ==
  12. Stage 133 : Data Source
  13. content : Source: PythonInputFormatTableSource(id, data)
  14. Stage 134 : Operator
  15. content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
  16. ship_strategy : FORWARD
  17. Stage 135 : Operator
  18. content : Calc(select=[id, data], where=[(data LIKE _UTF-16LE'H%')])
  19. ship_strategy : FORWARD
  20. Stage 136 : Data Source
  21. content : Source: PythonInputFormatTableSource(id, data)
  22. Stage 137 : Operator
  23. content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
  24. ship_strategy : FORWARD

以下代码展示了如何使用 StatementSet.explain() 方法:

  1. # 使用 StreamTableEnvironment
  2. from pyflink.table import EnvironmentSettings, StreamTableEnvironment
  3. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  4. table_env = StreamTableEnvironment.create(environment_settings=env_settings)
  5. table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  6. table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  7. table_env.execute_sql("""
  8. CREATE TABLE print_sink_table (
  9. id BIGINT,
  10. data VARCHAR
  11. ) WITH (
  12. 'connector' = 'print'
  13. )
  14. """)
  15. table_env.execute_sql("""
  16. CREATE TABLE black_hole_sink_table (
  17. id BIGINT,
  18. data VARCHAR
  19. ) WITH (
  20. 'connector' = 'blackhole'
  21. )
  22. """)
  23. statement_set = table_env.create_statement_set()
  24. statement_set.add_insert("print_sink_table", table1.where(table1.data.like('H%')))
  25. statement_set.add_insert("black_hole_sink_table", table2)
  26. print(statement_set.explain())

结果为

  1. == 抽象语法树 ==
  2. LogicalSink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
  3. +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'H%')])
  4. +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]]])
  5. LogicalSink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
  6. +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]]])
  7. == 优化后的逻辑计划 ==
  8. Sink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
  9. +- Calc(select=[id, data], where=[LIKE(data, _UTF-16LE'H%')])
  10. +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
  11. Sink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
  12. +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
  13. == 物理执行计划 ==
  14. Stage 139 : Data Source
  15. content : Source: PythonInputFormatTableSource(id, data)
  16. Stage 140 : Operator
  17. content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
  18. ship_strategy : FORWARD
  19. Stage 141 : Operator
  20. content : Calc(select=[id, data], where=[(data LIKE _UTF-16LE'H%')])
  21. ship_strategy : FORWARD
  22. Stage 143 : Data Source
  23. content : Source: PythonInputFormatTableSource(id, data)
  24. Stage 144 : Operator
  25. content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
  26. ship_strategy : FORWARD
  27. Stage 142 : Data Sink
  28. content : Sink: Sink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
  29. ship_strategy : FORWARD
  30. Stage 145 : Data Sink
  31. content : Sink: Sink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
  32. ship_strategy : FORWARD