Understand Files

This article explains how common table operations affect the files that Paimon stores on disk.

It provides concrete examples and practical tips for managing those files. By walking through operations such as commit and compact, it shows when files are created and how they are updated.

Prerequisite

Before reading further, please make sure you have read the following sections:

  1. Basic Concepts
  2. Primary Key Table
  3. How to use Paimon in Flink

Understand File Operations

Create Catalog

Start the Flink SQL client via ./sql-client.sh and execute the following statements one by one to create a Paimon catalog.

  CREATE CATALOG paimon WITH (
      'type' = 'paimon',
      'warehouse' = 'file:///tmp/paimon'
  );
  USE CATALOG paimon;

This only creates a directory at the given path file:///tmp/paimon.

Create Table

Executing the following CREATE TABLE statement creates a Paimon table with 4 fields:

  CREATE TABLE T (
      id BIGINT,
      a INT,
      b STRING,
      dt STRING COMMENT 'timestamp string in format yyyyMMdd',
      PRIMARY KEY(id, dt) NOT ENFORCED
  ) PARTITIONED BY (dt);

This creates the Paimon table T under the path /tmp/paimon/default.db/T, with its schema stored in /tmp/paimon/default.db/T/schema/schema-0.
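The path convention above can be written down as a small helper. This is only a sketch in Python, not a Paimon API: the function name schema_path is hypothetical, and the layout (warehouse, then the database name with a .db suffix, then the table, then schema/schema-N) is taken from the paths quoted above.

```python
from pathlib import PurePosixPath


def schema_path(warehouse, database, table, schema_id):
    """Build the schema file path following the layout described above."""
    return (
        PurePosixPath(warehouse)
        / f"{database}.db"
        / table
        / "schema"
        / f"schema-{schema_id}"
    )


print(schema_path("/tmp/paimon", "default", "T", 0))
# /tmp/paimon/default.db/T/schema/schema-0
```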

Insert Records Into Table

Run the following insert statement in Flink SQL:

  INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');

Once the Flink job is completed, the records are written to the Paimon table through a successful commit. Users can verify the visibility of these records by executing the query SELECT * FROM T which will return a single row. The commit process creates a snapshot located at the path /tmp/paimon/default.db/T/snapshot/snapshot-1. The resulting file layout at snapshot-1 is as described below:

Understand Files - Figure 1

The content of snapshot-1 contains metadata of the snapshot, such as manifest list and schema id:

  {
    "version" : 3,
    "id" : 1,
    "schemaId" : 0,
    "baseManifestList" : "manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0",
    "deltaManifestList" : "manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1",
    "changelogManifestList" : null,
    "commitUser" : "7d758485-981d-4b1a-a0c6-d34c3eb254bf",
    "commitIdentifier" : 9223372036854775807,
    "commitKind" : "APPEND",
    "timeMillis" : 1684155393354,
    "logOffsets" : { },
    "totalRecordCount" : 1,
    "deltaRecordCount" : 1,
    "changelogRecordCount" : 0,
    "watermark" : -9223372036854775808
  }

Recall that the manifest lists contain all changes of the snapshot: baseManifestList is the base file upon which the changes in deltaManifestList are applied. The first commit produces 1 manifest file and 2 manifest lists (the file names in your experiment may differ):

  ./T/manifest:
  manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1
  manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0
  manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0

manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 is the manifest file (manifest-1-0 in the figure above), which stores information about the data files in the snapshot.

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 is the baseManifestList (manifest-list-1-base in the figure above), which is effectively empty.

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 is the deltaManifestList (manifest-list-1-delta in the figure above). It contains a list of the manifest files that record operations on data files; in this case, only manifest-1-0.
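The link from a snapshot file to its two manifest lists can be followed with a few lines of Python. The sketch below is not part of Paimon. It assumes only what is shown above: that the snapshot file is plain JSON with the fields baseManifestList and deltaManifestList, and that the files they name sit in the table's manifest directory. The helper name manifest_list_paths is hypothetical.

```python
import json
from pathlib import Path


def manifest_list_paths(table_path, snapshot_id):
    """Return the (base, delta) manifest list paths referenced by a snapshot.

    Assumes <table>/snapshot/snapshot-<id> is a JSON file and that the
    manifest lists it names are stored in <table>/manifest/.
    """
    table = Path(table_path)
    snapshot_file = table / "snapshot" / f"snapshot-{snapshot_id}"
    snapshot = json.loads(snapshot_file.read_text())
    manifest_dir = table / "manifest"
    return (
        manifest_dir / snapshot["baseManifestList"],
        manifest_dir / snapshot["deltaManifestList"],
    )
```

For the table in this walkthrough, a call such as manifest_list_paths("/tmp/paimon/default.db/T", 1) would return the two manifest-list paths listed above. The manifest lists and manifest files themselves are not JSON, so this sketch stops at resolving their paths.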

Now let’s insert a batch of records across different partitions and see what happens. In Flink SQL, execute the following statement:

  INSERT INTO T VALUES
      (2, 10002, 'varchar00002', '20230502'),
      (3, 10003, 'varchar00003', '20230503'),
      (4, 10004, 'varchar00004', '20230504'),
      (5, 10005, 'varchar00005', '20230505'),
      (6, 10006, 'varchar00006', '20230506'),
      (7, 10007, 'varchar00007', '20230507'),
      (8, 10008, 'varchar00008', '20230508'),
      (9, 10009, 'varchar00009', '20230509'),
      (10, 10010, 'varchar00010', '20230510');

The second commit takes place and executing SELECT * FROM T will return 10 rows. A new snapshot, namely snapshot-2, is created and gives us the following physical file layout:

  % ls -1tR .
  ./T:
  dt=20230501
  dt=20230502
  dt=20230503
  dt=20230504
  dt=20230505
  dt=20230506
  dt=20230507
  dt=20230508
  dt=20230509
  dt=20230510
  snapshot
  schema
  manifest
  ./T/snapshot:
  LATEST
  snapshot-2
  EARLIEST
  snapshot-1
  ./T/manifest:
  manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2
  manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2
  manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # manifest file for snapshot-2
  manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1
  manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1
  manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # manifest file for snapshot-1
  ./T/dt=20230501/bucket-0:
  data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc
  ...
  # each partition has the data written to bucket-0
  ...
  ./T/schema:
  schema-0

The new file layout as of snapshot-2 looks like this:

Understand Files - Figure 2
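To see which snapshots exist without reading the listing by eye, the snapshot directory can be scanned for files named snapshot-N. The following Python sketch is an illustration only: the function name snapshot_ids is hypothetical, and it relies solely on the naming convention visible in the listing above, skipping the LATEST and EARLIEST entries.

```python
from pathlib import Path


def snapshot_ids(table_path):
    """Return the ids of all snapshot-<id> files, in ascending order."""
    ids = []
    for entry in (Path(table_path) / "snapshot").iterdir():
        prefix, _, suffix = entry.name.partition("-")
        # Skip the EARLIEST and LATEST files and anything else unexpected.
        if prefix == "snapshot" and suffix.isdigit():
            ids.append(int(suffix))
    return sorted(ids)
```

Run against /tmp/paimon/default.db/T at this point in the walkthrough, the function would be expected to report the two snapshots created so far.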

Delete Records From Table

Now let's delete the records that meet the condition dt >= '20230503'. In Flink SQL, execute the following statement in batch mode:

  DELETE FROM T WHERE dt >= '20230503';

The third commit takes place and gives us snapshot-3. If you now list the files under the table, you will find that no partition has been dropped. Instead, a new data file has been created in each of the partitions 20230503 through 20230510:

  ./T/dt=20230510/bucket-0:
  data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # newer data file created by the delete statement
  data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # older data file created by the insert statement

This makes sense: we inserted a record in the second commit (represented by +I[10, 10010, 'varchar00010', '20230510']) and then deleted that record in the third commit. Executing SELECT * FROM T now returns 2 rows, namely:

  +I[1, 10001, 'varchar00001', '20230501']
  +I[2, 10002, 'varchar00002', '20230502']

The new file layout as of snapshot-3 looks like this:

Understand Files - Figure 3

Note that manifest-3-0 contains 8 manifest entries of ADD operation type, corresponding to 8 newly written data files.
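Why the query returns fewer rows even though no file was removed can be illustrated with a toy merge. The Python sketch below is a deliberate simplification, not Paimon's reader: it assumes the newer data file carries a delete record (written here as -D, by analogy with the +I notation above) that overrides the older insert for the same primary key when files are merged at read time. The function name merge_on_read is hypothetical.

```python
def merge_on_read(data_files):
    """Merge data files from oldest to newest, keeping the latest record per key.

    Each record is (kind, primary_key, row); kind is "+I" (insert) or
    "-D" (delete).
    """
    latest = {}
    for records in data_files:          # oldest file first
        for kind, key, row in records:
            latest[key] = (kind, row)   # a newer record overrides an older one
    # Rows whose latest record is a delete are not visible to queries.
    return [row for kind, row in latest.values() if kind == "+I"]


row = (10, 10010, "varchar00010", "20230510")
older = [("+I", (10, "20230510"), row)]  # file written by the insert
newer = [("-D", (10, "20230510"), row)]  # file written by the delete

print(merge_on_read([older]))         # [(10, 10010, 'varchar00010', '20230510')]
print(merge_on_read([older, newer]))  # []
```

Both files stay on disk after the third commit; only the merged view changes.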

Compact Table

As you may have noticed, the number of small files grows over successive snapshots, which may degrade read performance. A full compaction is therefore needed to reduce the number of small files.

Let's trigger a full compaction now by running a dedicated compaction job in batch mode through flink run:

  <FLINK_HOME>/bin/flink run \
      -D execution.runtime-mode=batch \
      /path/to/paimon-flink-action-0.8.2.jar \
      compact \
      --warehouse <warehouse-path> \
      --database <database-name> \
      --table <table-name> \
      [--partition <partition-name>] \
      [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
      [--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]

For example, assuming you are already in the Flink home directory:

  ./bin/flink run \
      ./lib/paimon-flink-action-0.8.2.jar \
      compact \
      --path file:///tmp/paimon/default.db/T

All current table files will be compacted and a new snapshot, namely snapshot-4, is made and contains the following information:

  {
    "version" : 3,
    "id" : 4,
    "schemaId" : 0,
    "baseManifestList" : "manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0",
    "deltaManifestList" : "manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1",
    "changelogManifestList" : null,
    "commitUser" : "a3d951d5-aa0e-4071-a5d4-4c72a4233d48",
    "commitIdentifier" : 9223372036854775807,
    "commitKind" : "COMPACT",
    "timeMillis" : 1684163217960,
    "logOffsets" : { },
    "totalRecordCount" : 38,
    "deltaRecordCount" : 20,
    "changelogRecordCount" : 0,
    "watermark" : -9223372036854775808
  }

The new file layout as of snapshot-4 looks like this:

Understand Files - Figure 4

Note that manifest-4-0 contains 20 manifest entries (18 DELETE operations and 2 ADD operations):

  1. For each of the partitions 20230503 through 20230510, two DELETE operations for its two data files.
  2. For each of the partitions 20230501 and 20230502, one DELETE operation and one ADD operation for the same data file.
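The totals follow directly from the two cases in the list. As a quick check, here is the arithmetic in Python, using only the partition and file counts from this walkthrough:

```python
# Partitions dt=20230503 .. dt=20230510 each hold two data files
# (one from the insert, one from the delete); compaction removes both.
deleted_partitions = 8
delete_entries = deleted_partitions * 2

# Partitions dt=20230501 and dt=20230502 each hold one data file, which
# is recorded as one DELETE entry plus one ADD entry.
kept_partitions = 2
delete_entries += kept_partitions * 1
add_entries = kept_partitions * 1

total_entries = delete_entries + add_entries
print(delete_entries, add_entries, total_entries)  # 18 2 20
```

The total of 20 matches the deltaRecordCount reported in snapshot-4.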

Alter Table

Execute the following statement to configure full-compaction:

  ALTER TABLE T SET ('full-compaction.delta-commits' = '1');

This creates a new schema for the Paimon table, namely schema-1, but no snapshot uses this schema until the next commit.

Expire Snapshots

Recall that marked data files are not truly deleted until the snapshot expires and no consumer depends on the snapshot. For more information, see Expiring Snapshots.

During the process of snapshot expiration, the range of snapshots is initially determined, and then data files within these snapshots are marked for deletion. A data file is marked for deletion only when there is a manifest entry of kind DELETE that references that specific data file. This marking ensures that the file will not be utilized by subsequent snapshots and can be safely removed.

Let’s say all 4 snapshots in the above diagram are about to expire. The expire process is as follows:

  1. It first deletes all marked data files, and records any changed buckets.

  2. It then deletes any changelog files and associated manifests.

  3. Finally, it deletes the snapshots themselves and writes the earliest hint file.

If any directories are left empty after the deletion process, they will be deleted as well.
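The marking rule can be modelled in a few lines of Python. This is a toy model rather than Paimon's implementation: the function name files_to_delete and the shortened file names are hypothetical. It also makes one assumption beyond the text: an ADD entry for the same file in the same snapshot (as produced by the compaction above for partitions 20230501 and 20230502) cancels the mark, because that file is still in use.

```python
def files_to_delete(expiring_snapshots):
    """Collect data files that may be physically removed on expiration.

    expiring_snapshots maps a snapshot id to its manifest entries, each a
    (kind, data_file) pair with kind "ADD" or "DELETE". A file is marked
    only if a DELETE entry references it and no ADD entry re-adds it.
    """
    marked = set()
    for snapshot_id in sorted(expiring_snapshots):
        entries = expiring_snapshots[snapshot_id]
        deleted = {name for kind, name in entries if kind == "DELETE"}
        added = {name for kind, name in entries if kind == "ADD"}
        marked = (marked | deleted) - added
    return marked


snapshots = {
    1: [("ADD", "dt=20230501/data-a")],
    2: [("ADD", "dt=20230510/data-b")],
    3: [("ADD", "dt=20230510/data-c")],
    4: [
        ("DELETE", "dt=20230510/data-b"),
        ("DELETE", "dt=20230510/data-c"),
        ("DELETE", "dt=20230501/data-a"),
        ("ADD", "dt=20230501/data-a"),
    ],
}
print(sorted(files_to_delete(snapshots)))
# ['dt=20230510/data-b', 'dt=20230510/data-c']
```

Only the files of the deleted partition qualify; the file in dt=20230501 survives because it is still referenced after compaction.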

Let's say another snapshot, snapshot-5, is created and snapshot expiration is triggered, so that snapshot-1 through snapshot-4 are to be deleted. For simplicity, we focus only on files from the previous snapshots. The final layout after snapshot expiration looks like this:

Understand Files - Figure 5

As a result, the partitions 20230503 through 20230510 are physically deleted.

Flink Stream Write

Finally, we examine Flink Stream Write using the example of CDC ingestion. This section covers how change data is captured and written into Paimon, as well as the mechanisms behind asynchronous compaction, snapshot commit, and snapshot expiration.

To begin, let’s take a closer look at the CDC ingestion workflow and the unique roles played by each component involved.

Understand Files - Figure 6

  1. MySQL CDC Source reads snapshot and incremental data uniformly, with SnapshotReader reading the snapshot data and BinlogReader reading the incremental data.
  2. Paimon Sink writes data into the Paimon table at the bucket level. The CompactManager within it triggers compaction asynchronously.
  3. Committer Operator is a singleton responsible for committing and expiring snapshots.

Next, we will go over the end-to-end data flow.

Understand Files - Figure 7

MySQL CDC Source reads snapshot and incremental data and emits them downstream after normalization.

Understand Files - Figure 8

Paimon Sink first buffers new records in a heap-based LSM tree and flushes them to disk when the memory buffer is full. Note that each data file written is a sorted run. At this point, no manifest file or snapshot has been created. Right before a Flink checkpoint takes place, Paimon Sink flushes all buffered records and sends a committable message downstream, which is read and committed by the Committer Operator during the checkpoint.
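The buffer-then-flush behaviour can be pictured with a small Python model. It is a sketch under stated assumptions, not Paimon's writer: the class BufferedWriter and its method names are hypothetical, the buffer limit is counted in records rather than bytes, and the "committable" is just the list of files flushed since the last checkpoint.

```python
class BufferedWriter:
    """Toy model of a sink writer for one bucket (not Paimon's classes)."""

    def __init__(self, buffer_capacity):
        self.buffer_capacity = buffer_capacity  # max records held in memory
        self.buffer = {}                        # primary key -> latest row
        self.flushed_files = []                 # sorted runs not yet committed

    def write(self, key, row):
        self.buffer[key] = row
        if len(self.buffer) >= self.buffer_capacity:
            self.flush()                        # memory full: write a data file

    def flush(self):
        if self.buffer:
            # Each data file is written in key order, i.e. it is a sorted run.
            self.flushed_files.append(sorted(self.buffer.items()))
            self.buffer = {}

    def prepare_commit(self):
        """Called right before a checkpoint: flush, then hand files downstream."""
        self.flush()
        committable, self.flushed_files = self.flushed_files, []
        return committable


writer = BufferedWriter(buffer_capacity=2)
for key in (3, 1, 2):
    writer.write(key, f"row-{key}")
print(writer.prepare_commit())
# [[(1, 'row-1'), (3, 'row-3')], [(2, 'row-2')]]
```

The first file is produced because the buffer filled up, the second because the checkpoint forced a flush. Neither is visible to readers until a snapshot is committed.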

Understand Files - Figure 9

During the checkpoint, the Committer Operator creates a new snapshot and associates it with manifest lists, so that the snapshot contains information about all data files in the table.

Understand Files - Figure 10

At a later point, asynchronous compaction might take place. The committable produced by the CompactManager contains information about the previous files and the merged files, so that the Committer Operator can construct the corresponding manifest entries. In this case, the Committer Operator might produce two snapshots during a Flink checkpoint: one for the data written (a snapshot of kind Append) and one for the compaction (a snapshot of kind Compact). If no data file is written during the checkpoint interval, only a snapshot of kind Compact is created. The Committer Operator also checks for snapshot expiration and physically deletes the marked data files.
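The cases described above can be summarised as a small decision function. The Python sketch below is illustrative only: the function name snapshots_for_checkpoint is hypothetical, the kind names reuse the commitKind values seen in the snapshot files earlier, and returning no snapshot when nothing changed is an assumption of the sketch rather than something stated here.

```python
def snapshots_for_checkpoint(new_data_files, compaction_changes):
    """Return the kinds of snapshots one checkpoint would produce, in order."""
    kinds = []
    if new_data_files:
        kinds.append("APPEND")   # snapshot for newly written data
    if compaction_changes:
        kinds.append("COMPACT")  # snapshot for files rewritten by compaction
    return kinds


print(snapshots_for_checkpoint(["data-1"], ["merged-1"]))  # ['APPEND', 'COMPACT']
print(snapshots_for_checkpoint([], ["merged-1"]))          # ['COMPACT']
```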

Understand Small Files

Many users are concerned about small files, which can lead to:

  1. Stability issues: too many small files in HDFS put the NameNode under heavy pressure.
  2. Cost issues: a small file in HDFS temporarily occupies at least one block, for example 128 MB.
  3. Query efficiency: querying a large number of small files is less efficient.

Understand Checkpoints

Assuming you are using the Flink writer, each checkpoint generates 1-2 snapshots, and the checkpoint forces files to be generated on the DFS, so the shorter the checkpoint interval, the more small files are generated.

  1. So the first thing to do is to increase the checkpoint interval.
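A rough estimate shows how strongly the interval matters. The Python sketch below is back-of-the-envelope arithmetic, not a measurement: the function name min_files_per_day is hypothetical, and it assumes that every active bucket receives data in every checkpoint interval, that each checkpoint therefore writes one file per bucket, and that compaction and expiration are ignored.

```python
def min_files_per_day(checkpoint_interval_seconds, active_buckets):
    """Estimate data files written per day by checkpoint flushes alone."""
    checkpoints_per_day = 24 * 60 * 60 // checkpoint_interval_seconds
    return checkpoints_per_day * active_buckets


print(min_files_per_day(60, 10))   # 1-minute interval, 10 buckets  -> 14400
print(min_files_per_day(600, 10))  # 10-minute interval, 10 buckets -> 1440
```

Under these assumptions, a tenfold longer interval produces a tenfold smaller number of files before compaction.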

By default, files are generated not only by checkpoints: when the writer's memory (write-buffer-size) is exhausted, data is also flushed to the DFS as a new file. You can enable write-buffer-spillable so that the writer generates spilled files first and then writes bigger files to the DFS.

  1. So the second thing to do is to increase write-buffer-size or enable write-buffer-spillable.

Understand Snapshots

Understand Files - Figure 11

Paimon maintains multiple versions of files. Compaction and deletion of files are logical operations and do not actually delete files; files are only really deleted when a snapshot expires. The first way to reduce the number of files is therefore to shorten the time before snapshots expire. The Flink writer expires snapshots automatically.

See Expire Snapshots.

Understand Partitions and Buckets

Paimon files are organized in a layered style. The following image illustrates the file layout. Starting from a snapshot file, Paimon readers can recursively access all records from the table.

Understand Files - Figure 12

For example, consider the following table:

  CREATE TABLE MyTable (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING,
      PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
  ) PARTITIONED BY (dt, hh) WITH (
      'bucket' = '10'
  );

The table data is physically split into partitions, and into buckets within each partition, and every bucket that holds data contains at least one file. If the overall data volume is small, configure a smaller number of buckets; otherwise there will be quite a few small files as well.
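The lower bound on the file count is simply partitions times buckets. The Python sketch below works this out for MyTable; the function name min_file_count is hypothetical, and it assumes that one day of data covers all 24 hh partitions and that every bucket receives at least one record.

```python
def min_file_count(partitions, buckets_per_partition):
    """Smallest possible number of data files when every bucket holds data."""
    return partitions * buckets_per_partition


hours_per_day = 24
print(min_file_count(hours_per_day, 10))  # one day with 'bucket' = '10' -> 240
print(min_file_count(hours_per_day, 2))   # one day with 'bucket' = '2'  -> 48
```

If a day of data is only a few megabytes, 240 files means each file is tiny, which is why a smaller bucket count is the better choice for low-volume tables.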

Understand LSM for Primary Table

LSM tree organizes files into several sorted runs. A sorted run consists of one or multiple data files and each data file belongs to exactly one sorted run.

Understand Files - Figure 13

By default, the number of sorted runs depends on num-sorted-run.compaction-trigger (see Compaction for Primary Key Table), which means that there are at least 5 files in a bucket. You can lower this setting to keep fewer files, but write performance may suffer.

Understand Files for Append Queue Table

By default, Append tables also perform automatic compaction to reduce the number of small files.

However, for a bucketed Append table, files are compacted only within each bucket in order to preserve ordering, which may leave more small files. See Append Queue.

Understand Full-Compaction

You may find 5 files per bucket acceptable for a primary key table, but a bucketed Append table may have 50 small files in a single bucket, which is hard to accept. Worse still, partitions that are no longer active also keep that many small files.

Configure 'full-compaction.delta-commits' to perform full compaction periodically during Flink writing. It also ensures that partitions are fully compacted before writing ends.