快速入门

本入门指南将帮助你快速了解并开始使用物化表。内容包括环境设置,以及创建、修改和删除连续模式和全量模式的物化表。

架构介绍

  • Client: 可以是任何能够与 Flink SQL Gateway 交互的客户端,如 SQL 客户端Flink JDBC 驱动 等。
  • Flink SQL Gateway: 支持创建、修改和删除物化表。并包含了一个内置的工作流调度器,用于定期刷新全量模式的物化表。
  • Flink Cluster: 用于运行物化表刷新作业的 Flink 集群。
  • Catalog: 负责管理物化表元数据的创建、查询、修改和删除。
  • Catalog Store: 提供 Catalog 属性持久化功能,以便在操作物化表时自动初始化 Catalog 并获取相关的元数据。

Illustration of Flink Materialized Table Architecture

环境搭建

目录准备

请将下面的示例路径替换为你机器上的实际路径。

  • 创建 Catalog Store 和 Catalog 所需的目录
  1. # 用于 File Catalog Store 保存 Catalog 属性
  2. mkdir -p {catalog_store_path}
  3. # 用于 test-filesystem Catalog 保存元数据和表数据
  4. mkdir -p {catalog_path}
  5. # 用于 test-filesystem Catalog 的默认数据库
  6. mkdir -p {catalog_path}/mydb
  • 创建目录分别用于保存 Checkpoints 和 Savepoints:
  1. mkdir -p {checkpoints_path}
  2. mkdir -p {savepoints_path}

资源准备

这里的方法和本地安装中记录的步骤类似。Flink 可以运行在任何类 UNIX 的操作系统下面,例如:Linux, Mac OS X 和 Cygwin (for Windows)。

下载 Flink 最新的二进制包并进行解压:

  1. tar -xzf flink-*.tgz

下载 test-filesystem 连接器, 并将其放入 lib 目录:

  1. cp flink-table-filesystem-test-utils-{VERSION}.jar flink-*/lib/

配置准备

config.yaml 文件中添加以下配置:

  1. execution:
  2. checkpoints:
  3. dir: file://{checkpoints_path}
  4. # 配置 file catalog store
  5. table:
  6. catalog-store:
  7. kind: file
  8. file:
  9. path: {catalog_store_path}
  10. # 配置 embedded 调度器
  11. workflow-scheduler:
  12. type: embedded
  13. # 配置 SQL gateway 的地址和端口
  14. sql-gateway:
  15. endpoint:
  16. rest:
  17. address: 127.0.0.1
  18. port: 8083

运行以下脚本,即可在本地启动集群:

  1. ./bin/start-cluster.sh

启动 SQL Gateway

运行以下脚本,即可在本地启动 SQL Gateway:

  1. ./sql-gateway.sh start

启动 SQL Client

运行以下脚本,即可在本地启动 SQL Client 客户端并连接到指定的 SQL Gateway:

  1. ./sql-client.sh gateway --endpoint http://127.0.0.1:8083

创建 Catalog 和 Source 表

  • 创建 test-filesystem catalog 用于后续创建物化表。
  1. CREATE CATALOG mt_cat WITH (
  2. 'type' = 'test-filesystem',
  3. 'path' = '{catalog_path}',
  4. 'default-database' = 'mydb'
  5. );
  6. USE CATALOG mt_cat;
  • 创建 Source 表作为物化表的数据源。
  1. -- 1. 创建 Source 表,并指定 Source 表的数据格式为 json
  2. CREATE TABLE json_source (
  3. order_id BIGINT,
  4. user_id BIGINT,
  5. user_name STRING,
  6. order_created_at STRING,
  7. payment_amount_cents BIGINT
  8. ) WITH (
  9. 'format' = 'json',
  10. 'source.monitor-interval' = '10s'
  11. );
  12. -- 2. 插入一些测试数据
  13. INSERT INTO json_source VALUES
  14. (1001, 1, 'user1', '2024-06-19', 10),
  15. (1002, 2, 'user2', '2024-06-19', 20),
  16. (1003, 3, 'user3', '2024-06-19', 30),
  17. (1004, 4, 'user4', '2024-06-19', 40),
  18. (1005, 1, 'user1', '2024-06-20', 10),
  19. (1006, 2, 'user2', '2024-06-20', 20),
  20. (1007, 3, 'user3', '2024-06-20', 30),
  21. (1008, 4, 'user4', '2024-06-20', 40);

创建连续模式物化表

创建物化表

创建一个连续模式的物化表,对应的数据新鲜度为 30 秒。通过 http://localhost:8081 页面可以查看对应的 Flink 流作业,该作业处于 RUNNING 状态,对应的 checkpoint 间隔为 30 秒。

  1. CREATE MATERIALIZED TABLE continuous_users_shops
  2. PARTITIONED BY (ds)
  3. WITH (
  4. 'format' = 'debezium-json',
  5. 'sink.rolling-policy.rollover-interval' = '10s',
  6. 'sink.rolling-policy.check-interval' = '10s'
  7. )
  8. FRESHNESS = INTERVAL '30' SECOND
  9. AS SELECT
  10. user_id,
  11. ds,
  12. SUM (payment_amount_cents) AS payed_buy_fee_sum,
  13. SUM (1) AS PV
  14. FROM (
  15. SELECT user_id, order_created_at AS ds, payment_amount_cents
  16. FROM json_source
  17. ) AS tmp
  18. GROUP BY user_id, ds;

暂停物化表

暂停物化表的刷新管道。在 http://localhost:8081 页面上,你会看到负责持续刷新物化表的 Flink 流作业已处于 FINISHED 状态。在执行暂停操作前,请确保设置 savepoint 路径。

  1. -- 暂停前设置 savepoint 路径
  2. SET 'execution.checkpointing.savepoint-dir' = 'file://{savepoint_path}';
  3. ALTER MATERIALIZED TABLE continuous_users_shops SUSPEND;

查询物化表

查询物化表数据并确认数据已成功写入。

  1. SELECT * FROM continuous_users_shops;

恢复物化表

恢复物化表的刷新管道,你会发现一个新的 Flink 流作业从指定的 savepoint 路径启动,用于持续刷新物化表,可以在 http://localhost:8081 页面上查看。

  1. ALTER MATERIALIZED TABLE continuous_users_shops RESUME;

删除物化表

删除物化表后,你会发现用于持续刷新物化表的 Flink 流作业在 http://localhost:8081 页面上转变为 CANCELED 状态。

  1. DROP MATERIALIZED TABLE continuous_users_shops;

创建全量模式物化表

创建物化表

创建一个全量模式的物化表,对应的数据新鲜度为 1 分钟。(此处设置为 1 分钟只是为了方便测试)你会发现用于定期刷新物化表的 Flink 批作业每 1 分钟调度一次,可以在 http://localhost:8081 页面上查看。

  1. CREATE MATERIALIZED TABLE full_users_shops
  2. PARTITIONED BY (ds)
  3. WITH (
  4. 'format' = 'json',
  5. 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
  6. )
  7. FRESHNESS = INTERVAL '1' MINUTE
  8. REFRESH_MODE = FULL
  9. AS SELECT
  10. user_id,
  11. ds,
  12. SUM (payment_amount_cents) AS payed_buy_fee_sum,
  13. SUM (1) AS PV
  14. FROM (
  15. SELECT user_id, order_created_at AS ds, payment_amount_cents
  16. FROM json_source
  17. ) AS tmp
  18. GROUP BY user_id, ds;

查询物化表

向当天的分区插入一些数据,等待至少 1 分钟,然后查询物化表结果,只能查询到当天分区的数据。

  1. INSERT INTO json_source VALUES
  2. (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
  3. (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
  4. (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
  5. (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
  1. SELECT * FROM full_users_shops;

手动刷新历史分区

手动刷新分区 ds='2024-06-20',并验证物化表中该日期的数据。你可以在 http://localhost:8081 页面上找到当前正在运行的 Flink 批作业。

  1. -- 手动刷新历史分区
  2. ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION(ds='2024-06-20');
  3. -- 查询物化表数据
  4. SELECT * FROM full_users_shops;

暂停和恢复物化表

通过暂停和恢复操作,你可以控制物化表的刷新作业。一旦暂停,负责定期刷新物化表的 Flink 批作业将不再被调度执行。当恢复时,Flink 批作业将重新开始调度以刷新物化表。你可在 http://localhost:8081 页面上查看 Flink 作业的调度状态。

  1. -- 暂停后台刷新任务
  2. ALTER MATERIALIZED TABLE full_users_shops SUSPEND;
  3. -- 恢复后台刷新任务
  4. ALTER MATERIALIZED TABLE full_users_shops RESUME;

删除物化表

删除物化表后,负责定期刷新该物化表的 Flink 批作业将不再被调度执行。你可以在 http://localhost:8081 页面上进行确认。

  1. DROP MATERIALIZED TABLE full_users_shops;