Quickstart Guide

This guide will help you quickly understand and get started with materialized tables. It covers setting up the environment and creating, altering, and dropping materialized tables in CONTINUOUS and FULL mode.

Architecture Introduction

  • Client: Any client that can interact with the Flink SQL Gateway, such as the SQL Client or the Flink JDBC Driver.
  • Flink SQL Gateway: Supports creating, altering, and dropping materialized tables. It also serves as an embedded workflow scheduler that periodically refreshes FULL mode materialized tables.
  • Flink Cluster: The pipelines that refresh materialized tables run on a Flink cluster.
  • Catalog: Manages the creation, retrieval, modification, and deletion of materialized table metadata.
  • Catalog Store: Persists catalog properties so that catalogs can be initialized automatically when metadata is retrieved during materialized table operations.

Illustration of Flink Materialized Table Architecture

Environment Setup

Directory Preparation

Replace the example paths below with real paths on your machine.

  • Create directories for the Catalog Store and the test-filesystem Catalog:

    # Directory for File Catalog Store to save catalog information
    mkdir -p {catalog_store_path}

    # Directory for test-filesystem Catalog to save table metadata and table data
    mkdir -p {catalog_path}

    # Directory for the default database of test-filesystem Catalog
    mkdir -p {catalog_path}/mydb

  • Create directories for Checkpoints and Savepoints:

    mkdir -p {checkpoints_path}
    mkdir -p {savepoints_path}
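For a quick local run, all four placeholders can live under one scratch directory. A minimal sketch, assuming a hypothetical base path of /tmp/flink-mt (substitute your own):

    # Hypothetical layout under /tmp/flink-mt (not part of the official steps)
    BASE=/tmp/flink-mt
    mkdir -p $BASE/catalog-store    # use as {catalog_store_path}
    mkdir -p $BASE/catalog/mydb     # use as {catalog_path} plus its default database
    mkdir -p $BASE/checkpoints $BASE/savepoints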

Resource Preparation

The steps here are similar to those described in the local installation guide. Flink can run on any UNIX-like operating system, such as Linux, Mac OS X, and Cygwin (for Windows).

Download the latest Flink binary package and extract it:

  tar -xzf flink-*.tgz

Download the test-filesystem connector and place it in the lib directory:

  cp flink-table-filesystem-test-utils-{VERSION}.jar flink-*/lib/
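To confirm the connector jar landed in the right place, a quick check (assuming the extracted directory still matches the flink-* glob):

  ls flink-*/lib | grep filesystem-test-utils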

Configuration Preparation

Edit the config.yaml file and add the following configurations:

  execution:
    checkpoints:
      dir: file://{checkpoints_path}

  # Configure file catalog store
  table:
    catalog-store:
      kind: file
      file:
        path: {catalog_store_path}

  # Configure embedded scheduler
  workflow-scheduler:
    type: embedded

  # Configure SQL gateway address and port
  sql-gateway:
    endpoint:
      rest:
        address: 127.0.0.1
        port: 8083

Run the following script to start the cluster locally:

  ./bin/start-cluster.sh
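Once the cluster is up, the web UI is served at http://localhost:8081. For a scriptable sanity check, Flink's monitoring REST API exposes an /overview endpoint on the same port:

  # Should return a small JSON document with slot and job counts
  curl http://localhost:8081/overview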

Start SQL Gateway

Run the following script to start the SQL Gateway locally:

  ./bin/sql-gateway.sh start
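To verify the gateway is listening, its REST API exposes an info endpoint:

  # Should return the product name and version as JSON
  curl http://127.0.0.1:8083/v1/info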

Start SQL Client

Run the following script to start the SQL Client locally and connect to the SQL Gateway:

  ./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083

Create Catalog and Source Table

  • Create the test-filesystem catalog:

    CREATE CATALOG mt_cat WITH (
      'type' = 'test-filesystem',
      'path' = '{catalog_path}',
      'default-database' = 'mydb'
    );

    USE CATALOG mt_cat;

  • Create the Source table (see the verification sketch after this list):

    -- 1. Create the Source table, using the json data format
    CREATE TABLE json_source (
      order_id BIGINT,
      user_id BIGINT,
      user_name STRING,
      order_created_at STRING,
      payment_amount_cents BIGINT
    ) WITH (
      'format' = 'json',
      'source.monitor-interval' = '10s'
    );

    -- 2. Insert some test data
    INSERT INTO json_source VALUES
      (1001, 1, 'user1', '2024-06-19', 10),
      (1002, 2, 'user2', '2024-06-19', 20),
      (1003, 3, 'user3', '2024-06-19', 30),
      (1004, 4, 'user4', '2024-06-19', 40),
      (1005, 1, 'user1', '2024-06-20', 10),
      (1006, 2, 'user2', '2024-06-20', 20),
      (1007, 3, 'user3', '2024-06-20', 30),
      (1008, 4, 'user4', '2024-06-20', 40);
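Before creating any materialized tables, it is worth confirming that the catalog is active and the test data is visible. Note that with 'source.monitor-interval' set, the SELECT below runs as a continuous query; you can exit the result view in the SQL Client once the eight rows appear:

  SHOW CATALOGS;
  SELECT * FROM json_source;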

Create Continuous Mode Materialized Table

Create Materialized Table

Create a materialized table in CONTINUOUS mode with a data freshness of 30 seconds. On the http://localhost:8081 page you can find the running Flink streaming job that continuously refreshes the materialized table; its checkpoint interval is 30 seconds.

  CREATE MATERIALIZED TABLE continuous_users_shops
  PARTITIONED BY (ds)
  WITH (
    'format' = 'debezium-json',
    'sink.rolling-policy.rollover-interval' = '10s',
    'sink.rolling-policy.check-interval' = '10s'
  )
  FRESHNESS = INTERVAL '30' SECOND
  AS SELECT
    user_id,
    ds,
    SUM(payment_amount_cents) AS payed_buy_fee_sum,
    SUM(1) AS PV
  FROM (
    SELECT user_id, order_created_at AS ds, payment_amount_cents
    FROM json_source
  ) AS tmp
  GROUP BY user_id, ds;
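The schema of a materialized table is derived from its defining query. Since materialized tables are registered in the catalog like ordinary tables, DESCRIBE should show the derived columns (a reasonable assumption rather than a step from the original guide):

  DESCRIBE continuous_users_shops;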

Suspend Materialized Table

Suspend the refresh pipeline of the materialized table. You will find that the Flink streaming job that continuously refreshes the materialized table transitions to the FINISHED state on http://localhost:8081. Before executing the suspend operation, you need to set the savepoint path.

  -- Set savepoint path before suspending
  SET 'execution.checkpointing.savepoint-dir' = 'file://{savepoints_path}';

  ALTER MATERIALIZED TABLE continuous_users_shops SUSPEND;
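Suspending stops the refresh job with a savepoint, so one should now exist under the configured directory; a quick look, using the same placeholder path as above:

  ls {savepoints_path}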

Query Materialized Table

Query the materialized table data and confirm that data has already been written.

  SELECT * FROM continuous_users_shops;

Resume Materialized Table

Resume the refresh pipeline of the materialized table. On the http://localhost:8081 page you will find that a new Flink streaming job for continuously refreshing the materialized table has started, with its state restored from the specified savepoint.

  ALTER MATERIALIZED TABLE continuous_users_shops RESUME;

Drop Materialized Table

Drop the materialized table, and you will find that the Flink streaming job that continuously refreshes it transitions to the CANCELED state on the http://localhost:8081 page.

  DROP MATERIALIZED TABLE continuous_users_shops;

Create Full Mode Materialized Table

Create Materialized Table

Create a materialized table in FULL mode with a data freshness of 1 minute (freshness is set to 1 minute here purely to make testing convenient). You will find that the Flink batch job that periodically refreshes the materialized table is scheduled every minute; see the http://localhost:8081 page.

  CREATE MATERIALIZED TABLE full_users_shops
  PARTITIONED BY (ds)
  WITH (
    'format' = 'json',
    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  )
  FRESHNESS = INTERVAL '1' MINUTE
  REFRESH_MODE = FULL
  AS SELECT
    user_id,
    ds,
    SUM(payment_amount_cents) AS payed_buy_fee_sum,
    SUM(1) AS PV
  FROM (
    SELECT user_id, order_created_at AS ds, payment_amount_cents
    FROM json_source
  ) AS tmp
  GROUP BY user_id, ds;

Query Materialized Table

Insert some data into today’s partition, wait at least 1 minute, and then query the materialized table; you will find that only today’s partition has been refreshed.

  INSERT INTO json_source VALUES
    (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
    (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
    (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
    (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);

  SELECT * FROM full_users_shops;

Manually Refresh Historical Partition

Manually refresh the partition ds='2024-06-20' and verify the data in the materialized table. You can find the Flink batch job for the current refresh operation on the http://localhost:8081 page.

  -- Manually refresh historical partition
  ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION(ds='2024-06-20');

  -- Query materialized table data
  SELECT * FROM full_users_shops;
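The PARTITION clause of the REFRESH statement is optional, so to rebuild the whole table rather than a single partition you can omit it (shown here as an aside, not a step in this walkthrough):

  -- Refresh all partitions of the materialized table
  ALTER MATERIALIZED TABLE full_users_shops REFRESH;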

Suspend and Resume Materialized Table

Suspend and resume operations control the refresh job that backs the materialized table. After suspending, the Flink batch job that periodically refreshes the materialized table is no longer scheduled; after resuming, it is scheduled again. You can observe the job scheduling status on the http://localhost:8081 page.

  -- Suspend background refresh pipeline
  ALTER MATERIALIZED TABLE full_users_shops SUSPEND;

  -- Resume background refresh pipeline
  ALTER MATERIALIZED TABLE full_users_shops RESUME;

Drop Materialized Table

After the materialized table is dropped, the Flink batch job that periodically refreshes it will no longer be scheduled. You can confirm this on the http://localhost:8081 page.

  DROP MATERIALIZED TABLE full_users_shops;