物化表语法

Flink SQL 目前支持以下物化表操作:

CREATE MATERIALIZED TABLE

  1. CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  2. [ ([ <table_constraint> ]) ]
  3. [COMMENT table_comment]
  4. [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  5. [WITH (key1=val1, key2=val2, ...)]
  6. FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
  7. [REFRESH_MODE = { CONTINUOUS | FULL }]
  8. AS <select_statement>
  9. <table_constraint>:
  10. [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

PRIMARY KEY

PRIMARY KEY 定义了一组可选的列,用于唯一标识表中的每一行。主键列必须非空。

PARTITIONED BY

PARTITIONED BY 定义了一组可选的列,用于对物化表进行分区。如果物化表使用了 filesystem connector ,每个分区将创建一个目录。

示例:

  1. -- 创建一个物化表并将分区字段设置为 `ds`
  2. CREATE MATERIALIZED TABLE my_materialized_table
  3. PARTITIONED BY (ds)
  4. FRESHNESS = INTERVAL '1' HOUR
  5. AS SELECT
  6. ds
  7. FROM
  8. ...

注意

  • 分区字段必须是物化表查询语句中的字段。

WITH Options

WITH Options 可以定义创建物化表所需的属性,包括连接器参数和分区字段的时间格式参数

  1. -- 创建一个物化表,指定分区字段为 'ds' 和对应的时间格式为 'yyyy-MM-dd'
  2. CREATE MATERIALIZED TABLE my_materialized_table
  3. PARTITIONED BY (ds)
  4. WITH (
  5. 'format' = 'json',
  6. 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  7. )
  8. ...

如上例所示,我们为 ds 分区列指定了 date-formatter 选项。每次调度时,调度时间将转换为相应的 ds 分区值。例如,在 2024-01-01 00:00:00 的调度时间下,只有分区 ds = '2024-01-01' 会被刷新。

注意

FRESHNESS

FRESHNESS 用于指定物化表的数据新鲜度。

数据新鲜度与刷新模式关系

数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过配置确定物化表的刷新模式,然后确定数据刷新频率以满足实际数据新鲜度要求。

FRESHNESS 参数详解

FRESHNESS 参数的取值范围为 INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }。'<num>' 必须为正整数,并且在全量模式下,'<num>' 应该是相应时间间隔单位的公约数。

示例: (假定 materialized-table.refresh-mode.freshness-threshold 为 30 分钟)

  1. -- 对应的刷新管道是一个 checkpoint 间隔为 1 秒的流处理作业
  2. FRESHNESS = INTERVAL '1' SECOND
  3. -- 对应的刷新管道是一个 checkpoint 间隔为 1 分钟的流处理作业
  4. FRESHNESS = INTERVAL '1' MINUTE
  5. -- 对应的刷新管道是一个调度周期为 1 小时的调度工作流
  6. FRESHNESS = INTERVAL '1' HOUR
  7. -- 对应的刷新管道是一个调度周期为 1 天的调度工作流
  8. FRESHNESS = INTERVAL '1' DAY

不合法的 FRESHNESS 示例:

  1. -- 间隔为负数
  2. FRESHNESS = INTERVAL '-1' SECOND
  3. -- 间隔为0
  4. FRESHNESS = INTERVAL '0' SECOND
  5. -- 间隔为月或者年
  6. FRESHNESS = INTERVAL '1' MONTH
  7. FRESHNESS = INTERVAL '1' YEAR
  8. -- 全量模式下,间隔不是对应时间间隔单位的公约数
  9. FRESHNESS = INTERVAL '60' SECOND
  10. FRESHNESS = INTERVAL '5' HOUR

注意

  • 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。
  • 在连续模式下,数据新鲜度和 checkpoint 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 checkpoint 性能,建议开启 Changelog
  • 在全量模式下,数据新鲜度会转换为 cron 表达式,因此目前仅支持在预定义时间间隔单位内的新鲜度间隔,这种设计确保了与 cron 表达式语义的一致性。具体支持以下新鲜度间隔:
    • 秒:30、15、10、5、2 和 1 秒间隔。
    • 分钟:30、15、10、5、2 和 1 分钟间隔。
    • 小时:8、4、2 和 1 小时间隔。
    • 天:1 天。

REFRESH_MODE

REFRESH_MODE 用于显式指定物化表的刷新模式。指定的刷新模式比框架自动推导的模式具有更高的优先级,以满足特定场景的需求。

示例: (假定 materialized-table.refresh-mode.freshness-threshold 为 30 分钟)

  1. -- 创建的物化表的刷新模式为连续模式,作业的 checkpoint 间隔为 1 小时。
  2. CREATE MATERIALIZED TABLE my_materialized_table
  3. REFRESH_MODE = CONTINUOUS
  4. FRESHNESS = INTERVAL '1' HOUR
  5. AS SELECT
  6. ...
  7. -- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。
  8. CREATE MATERIALIZED TABLE my_materialized_table
  9. REFRESH_MODE = FULL
  10. FRESHNESS = INTERVAL '10' MINUTE
  11. AS SELECT
  12. ...

AS <select_statement>

该子句用于定义填充物化表数据的查询。上游表可以是物化表、表或视图。select 语句支持所有 Flink SQL 查询

示例:

  1. CREATE MATERIALIZED TABLE my_materialized_table
  2. FRESHNESS = INTERVAL '10' SECOND
  3. AS SELECT * FROM kafka_catalog.db1.kafka_table;

示例

(假设 materialized-table.refresh-mode.freshness-threshold 为 30 分钟)

创建一个数据新鲜度为 10 秒的物化表,推导出的刷新模式为连续模式:

  1. CREATE MATERIALIZED TABLE my_materialized_table_continuous
  2. PARTITIONED BY (ds)
  3. WITH (
  4. 'format' = 'json',
  5. 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  6. )
  7. FRESHNESS = INTERVAL '10' SECOND
  8. REFRESH_MODE = 'CONTINUOUS'
  9. AS
  10. SELECT
  11. k.ds,
  12. k.user_id,
  13. COUNT(*) AS event_count,
  14. SUM(k.amount) AS total_amount,
  15. MAX(u.age) AS max_age
  16. FROM
  17. kafka_catalog.db1.kafka_table k
  18. JOIN
  19. user_catalog.db1.user_table u
  20. ON
  21. k.user_id = u.user_id
  22. WHERE
  23. k.event_type = 'purchase'
  24. GROUP BY
  25. k.ds, k.user_id

创建一个数据新鲜度为 1 小时的物化表,推导出的刷新模式为全量模式:

  1. CREATE MATERIALIZED TABLE my_materialized_table_full
  2. PARTITIONED BY (ds)
  3. WITH (
  4. 'format' = 'json',
  5. 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  6. )
  7. FRESHNESS = INTERVAL '10' MINUTE
  8. REFRESH_MODE = 'FULL'
  9. AS
  10. SELECT
  11. p.ds,
  12. p.product_id,
  13. p.product_name,
  14. AVG(s.sale_price) AS avg_sale_price,
  15. SUM(s.quantity) AS total_quantity
  16. FROM
  17. paimon_catalog.db1.product_table p
  18. LEFT JOIN
  19. paimon_catalog.db1.sales_table s
  20. ON
  21. p.product_id = s.product_id
  22. WHERE
  23. p.category = 'electronics'
  24. GROUP BY
  25. p.ds, p.product_id, p.product_name

限制

  • 不支持显式指定列
  • 不支持修改查询语句
  • 不支持在 select 查询中使用临时表、临时视图或临时函数

ALTER MATERIALIZED TABLE

  1. ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]

ALTER MATERIALIZED TABLE 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新。

SUSPEND

  1. ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND

SUSPEND 用于暂停物化表的后台刷新管道。

示例:

  1. -- 暂停前指定 SAVEPOINT 路径
  2. SET 'execution.checkpointing.savepoint-dir' = 'hdfs://savepoint_path';
  3. -- 暂停指定的物化表
  4. ALTER MATERIALIZED TABLE my_materialized_table SUSPEND;

注意

  • 暂停连续模式的表时,默认会使用 STOP WITH SAVEPOINT 暂停作业,你需要使用参数设置 SAVEPOINT 保存路径。

RESUME

  1. ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=val1, key2=val2, ...)]

RESUME 用于恢复物化表的刷新管道。在恢复时,可以通过 WITH 子句动态指定物化表的参数,该参数仅对当前恢复的刷新管道生效,并不会持久化到物化表中。

示例:

  1. -- 恢复指定的物化表
  2. ALTER MATERIALIZED TABLE my_materialized_table RESUME;
  3. -- 恢复指定的物化表并指定 sink 的并行度
  4. ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10');

REFRESH

  1. ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH [PARTITION partition_spec]

REFRESH 用于主动触发物化表的刷新。

示例:

  1. -- 刷新全表数据
  2. ALTER MATERIALIZED TABLE my_materialized_table REFRESH;
  3. -- 刷新指定分区数据
  4. ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28');

注意

  • REFRESH 操作会启动批作业来刷新表的数据。

DROP MATERIALIZED TABLE

  1. DROP MATERIALIZED TABLE [IF EXISTS] [catalog_name.][database_name.]table_name

删除物化表时,首先删除后台刷新管道,然后从 Catalog 中移除该物化表的元数据。

示例:

  1. -- 删除指定的物化表
  2. DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;