Materialized Table Statements

Flink SQL currently supports the following Materialized Table statements:

  • CREATE MATERIALIZED TABLE
  • ALTER MATERIALIZED TABLE
  • DROP MATERIALIZED TABLE

CREATE MATERIALIZED TABLE

  CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  [ ([ <table_constraint> ]) ]
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  [WITH (key1=val1, key2=val2, ...)]
  FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
  [REFRESH_MODE = { CONTINUOUS | FULL }]
  AS <select_statement>

  <table_constraint>:
    [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

PRIMARY KEY

PRIMARY KEY defines an optional list of columns that uniquely identifies each row within the table. Every column in the primary key must be non-null.

PARTITIONED BY

PARTITIONED BY defines an optional list of columns to partition the materialized table. A directory is created for each partition if this materialized table is used as a filesystem sink.

Example:

  -- Create a materialized table and specify the partition field as `ds`.
  CREATE MATERIALIZED TABLE my_materialized_table
  PARTITIONED BY (ds)
  FRESHNESS = INTERVAL '1' HOUR
  AS SELECT
    ds
  FROM
    ...

Note

  • The partition column must be included in the query statement of the materialized table.

WITH Options

WITH options specify the properties of the materialized table, including connector options and the time format option for partition fields.

  -- Create a materialized table, specify the partition field as 'ds', and the corresponding time format as 'yyyy-MM-dd'
  CREATE MATERIALIZED TABLE my_materialized_table
  PARTITIONED BY (ds)
  WITH (
    'format' = 'json',
    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  )
  ...

The example above specifies the date-formatter option for the ds partition column. On each scheduled run, the scheduling time is converted to the ds partition value. For example, for a scheduling time of 2024-01-01 00:00:00, only the partition ds = '2024-01-01' is refreshed.
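
The conversion from scheduling time to partition value can be pictured with a short Python sketch. It is an illustration only, not Flink's implementation: the function name `partition_value` and the small pattern-translation table are hypothetical, and only the `yyyy`, `MM`, `dd`, and `HH` pattern letters are handled.

```python
from datetime import datetime

# Hypothetical mapping from a few Java-style pattern letters to strftime codes.
# Only the tokens needed for this illustration are covered.
_PATTERN_TOKENS = [("yyyy", "%Y"), ("MM", "%m"), ("dd", "%d"), ("HH", "%H")]


def partition_value(scheduling_time: datetime, date_formatter: str) -> str:
    """Format a scheduling time with a 'yyyy-MM-dd'-style pattern."""
    strftime_pattern = date_formatter
    for token, code in _PATTERN_TOKENS:
        strftime_pattern = strftime_pattern.replace(token, code)
    return scheduling_time.strftime(strftime_pattern)


# The scheduling time from the text maps to the partition ds = '2024-01-01'.
ds = partition_value(datetime(2024, 1, 1, 0, 0, 0), "yyyy-MM-dd")
print(f"ds = '{ds}'")
```

Any scheduling time that falls on the same day yields the same partition value under this pattern, which is why a single run touches only one ds partition.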

FRESHNESS

FRESHNESS defines the data freshness of a materialized table.

FRESHNESS and Refresh Mode Relationship

FRESHNESS defines the maximum amount of time that the materialized table’s content should lag behind updates to the base tables. It does two things: first, it determines the refresh mode of the materialized table through configuration; second, it determines the data refresh frequency needed to meet the freshness requirement.

Explanation of FRESHNESS Parameter

FRESHNESS is specified as INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }. '<num>' must be a positive integer. In FULL mode, '<num>' must additionally divide the range of its time unit evenly; the supported values are listed in the note at the end of this section.

Examples: (Assuming materialized-table.refresh-mode.freshness-threshold is 30 minutes)

  -- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 second
  FRESHNESS = INTERVAL '1' SECOND
  -- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 minute
  FRESHNESS = INTERVAL '1' MINUTE
  -- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour
  FRESHNESS = INTERVAL '1' HOUR
  -- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 day
  FRESHNESS = INTERVAL '1' DAY
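
The four examples above are consistent with a simple comparison against the threshold. The Python sketch below models that rule; it is not Flink's code. The function name `derive_refresh_mode` is hypothetical, and treating a freshness exactly equal to the threshold as FULL is an assumption that the examples do not cover.

```python
from datetime import timedelta

# Assumed value of materialized-table.refresh-mode.freshness-threshold,
# matching the assumption stated for the examples.
FRESHNESS_THRESHOLD = timedelta(minutes=30)


def derive_refresh_mode(freshness: timedelta,
                        threshold: timedelta = FRESHNESS_THRESHOLD) -> str:
    """Return the refresh mode inferred from FRESHNESS (hypothetical helper)."""
    if freshness <= timedelta(0):
        raise ValueError("FRESHNESS must be a positive interval")
    # Assumption: a freshness below the threshold runs as a streaming job;
    # anything else becomes a scheduled (FULL) workflow.
    return "CONTINUOUS" if freshness < threshold else "FULL"


for label, freshness in [
    ("'1' SECOND", timedelta(seconds=1)),
    ("'1' MINUTE", timedelta(minutes=1)),
    ("'1' HOUR", timedelta(hours=1)),
    ("'1' DAY", timedelta(days=1)),
]:
    print(f"FRESHNESS = INTERVAL {label} -> {derive_refresh_mode(freshness)}")
```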

Invalid FRESHNESS Examples:

  -- Interval is a negative number
  FRESHNESS = INTERVAL '-1' SECOND
  -- Interval is 0
  FRESHNESS = INTERVAL '0' SECOND
  -- Interval is in months or years
  FRESHNESS = INTERVAL '1' MONTH
  FRESHNESS = INTERVAL '1' YEAR
  -- In FULL mode, the interval is not one of the supported divisors of the respective time range
  FRESHNESS = INTERVAL '60' SECOND
  FRESHNESS = INTERVAL '5' HOUR

Note

  • Materialized table data is refreshed as close to the defined freshness as possible, but meeting the freshness is not guaranteed.
  • In CONTINUOUS mode, the freshness interval is used as the checkpoint interval, so setting it too short can hurt job performance. To optimize checkpoint performance, consider enabling Changelog.
  • In FULL mode, data freshness must be translated into a cron expression, so only freshness intervals that cron can express are currently supported. Specifically, the following freshness values are supported:
    • Second: 30, 15, 10, 5, 2, and 1 second intervals.
    • Minute: 30, 15, 10, 5, 2, and 1 minute intervals.
    • Hour: 8, 4, 2, and 1 hour intervals.
    • Day: 1 day.
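
Why only certain values are accepted becomes clearer from how a cron step field behaves: the step restarts at the beginning of each enclosing unit, so a step that does not divide the unit evenly leaves one shorter gap at the wrap-around. The Python sketch below illustrates this and checks a value against the list above. Both helper names are hypothetical, and the code is not taken from Flink.

```python
# Supported FULL-mode freshness values, copied from the list above.
SUPPORTED_FULL_FRESHNESS = {
    "SECOND": {30, 15, 10, 5, 2, 1},
    "MINUTE": {30, 15, 10, 5, 2, 1},
    "HOUR": {8, 4, 2, 1},
    "DAY": {1},
}


def is_supported_full_freshness(num: int, unit: str) -> bool:
    """Check a FRESHNESS value against the supported list (hypothetical helper)."""
    return num in SUPPORTED_FULL_FRESHNESS.get(unit.upper(), set())


def cron_step_gaps(step: int, period: int) -> set:
    """Gaps between firings of a cron step field that restarts every `period`."""
    firings = list(range(0, period, step))
    # Distance from each firing to the next one, wrapping into the next period.
    following = firings[1:] + [firings[0] + period]
    return {nxt - cur for cur, nxt in zip(firings, following)}


print(cron_step_gaps(8, 24))  # every 8 hours: all gaps are equal
print(cron_step_gaps(5, 24))  # every 5 hours: the last gap of the day is shorter
print(is_supported_full_freshness(5, "HOUR"))
```

A 5-hour step fires at hours 0, 5, 10, 15, and 20, then again at hour 0 of the next day, so the schedule cannot honor a uniform 5-hour cycle.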

REFRESH_MODE

REFRESH_MODE explicitly specifies the refresh mode of the materialized table. An explicitly specified mode takes precedence over the mode the framework would infer from FRESHNESS, which is useful for scenarios with specific needs.

Examples: (Assuming materialized-table.refresh-mode.freshness-threshold is 30 minutes)

  -- The refresh mode of the created materialized table is CONTINUOUS, and the job's checkpoint interval is 1 hour.
  CREATE MATERIALIZED TABLE my_materialized_table
  REFRESH_MODE = CONTINUOUS
  FRESHNESS = INTERVAL '1' HOUR
  AS SELECT
    ...

  -- The refresh mode of the created materialized table is FULL, and the job's schedule cycle is 10 minutes.
  CREATE MATERIALIZED TABLE my_materialized_table
  REFRESH_MODE = FULL
  FRESHNESS = INTERVAL '10' MINUTE
  AS SELECT
    ...

AS <select_statement>

This clause defines the query that populates the materialized table. The upstream table can be a materialized table, a table, or a view. The select statement supports all Flink SQL queries.

Example:

  CREATE MATERIALIZED TABLE my_materialized_table
  FRESHNESS = INTERVAL '10' SECOND
  AS SELECT * FROM kafka_catalog.db1.kafka_table;

Examples

(Assuming materialized-table.refresh-mode.freshness-threshold is 30 minutes)

Create a materialized table with a data freshness of 10 seconds; the derived refresh mode is CONTINUOUS:

  CREATE MATERIALIZED TABLE my_materialized_table_continuous
  PARTITIONED BY (ds)
  WITH (
    'format' = 'debezium-json',
    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  )
  FRESHNESS = INTERVAL '10' SECOND
  AS
  SELECT
    k.ds,
    k.user_id,
    COUNT(*) AS event_count,
    SUM(k.amount) AS total_amount,
    MAX(u.age) AS max_age
  FROM
    kafka_catalog.db1.kafka_table k
  JOIN
    user_catalog.db1.user_table u
  ON
    k.user_id = u.user_id
  WHERE
    k.event_type = 'purchase'
  GROUP BY
    k.ds, k.user_id

Create a materialized table with a data freshness of 1 hour; the derived refresh mode is FULL:

  CREATE MATERIALIZED TABLE my_materialized_table_full
  PARTITIONED BY (ds)
  WITH (
    'format' = 'json',
    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  )
  FRESHNESS = INTERVAL '1' HOUR
  AS
  SELECT
    p.ds,
    p.product_id,
    p.product_name,
    AVG(s.sale_price) AS avg_sale_price,
    SUM(s.quantity) AS total_quantity
  FROM
    paimon_catalog.db1.product_table p
  LEFT JOIN
    paimon_catalog.db1.sales_table s
  ON
    p.product_id = s.product_id
  WHERE
    p.category = 'electronics'
  GROUP BY
    p.ds, p.product_id, p.product_name

Limitations

  • Does not support explicitly specifying columns
  • Does not support modifying the query statement
  • Does not support using temporary tables, temporary views, or temporary functions in the select query

ALTER MATERIALIZED TABLE

  ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]

ALTER MATERIALIZED TABLE is used to manage materialized tables. It lets users suspend and resume the refresh pipeline of a materialized table and manually trigger data refreshes.

SUSPEND

  ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND

SUSPEND is used to pause the background refresh pipeline of the materialized table.

Example:

  -- Specify SAVEPOINT path before pausing
  SET 'execution.checkpointing.savepoint-dir' = 'hdfs://savepoint_path';

  -- Suspend the specified materialized table
  ALTER MATERIALIZED TABLE my_materialized_table SUSPEND;

Note

  • When suspending a table in CONTINUOUS mode, the job is stopped using STOP WITH SAVEPOINT by default, so the savepoint path must be set through configuration beforehand.

RESUME

  ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=val1, key2=val2, ...)]

RESUME is used to resume the refresh pipeline of a materialized table. Dynamic options for the materialized table can be specified in the WITH clause; they take effect only for the resumed refresh pipeline and are not persisted.

Example:

  -- Resume the specified materialized table
  ALTER MATERIALIZED TABLE my_materialized_table RESUME;

  -- Resume the specified materialized table and specify sink parallelism
  ALTER MATERIALIZED TABLE my_materialized_table RESUME WITH ('sink.parallelism'='10');

REFRESH

  ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH [PARTITION partition_spec]

REFRESH is used to proactively trigger the refresh of the materialized table.

Example:

  -- Refresh the entire table data
  ALTER MATERIALIZED TABLE my_materialized_table REFRESH;

  -- Refresh specified partition data
  ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28');

Note

  • The REFRESH operation will start a Flink batch job to refresh the materialized table data.
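
When refreshes are triggered from scripts, the statement text can be assembled from a partition mapping. The Python sketch below only builds the SQL string using the syntax shown above; the helper name `refresh_statement` is hypothetical, and submitting the statement to Flink is left out.

```python
from typing import Mapping, Optional


def refresh_statement(table_name: str,
                      partition: Optional[Mapping[str, str]] = None) -> str:
    """Build an ALTER MATERIALIZED TABLE ... REFRESH statement (hypothetical helper)."""
    statement = f"ALTER MATERIALIZED TABLE {table_name} REFRESH"
    if partition:
        # Double any single quote so the value stays a valid SQL string literal.
        spec = ", ".join(
            "{}='{}'".format(column, value.replace("'", "''"))
            for column, value in partition.items()
        )
        statement += f" PARTITION ({spec})"
    return statement + ";"


print(refresh_statement("my_materialized_table"))
print(refresh_statement("my_materialized_table", {"ds": "2024-06-28"}))
```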

DROP MATERIALIZED TABLE

  DROP MATERIALIZED TABLE [IF EXISTS] [catalog_name.][database_name.]table_name

When dropping a materialized table, the background refresh pipeline will be deleted first, and then the metadata corresponding to the materialized table will be removed from the Catalog.

Example:

  -- Delete the specified materialized table
  DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;