Dedicated Compaction

Paimon’s snapshot management supports writing with multiple writers.

For S3-like object stores, 'RENAME' does not have atomic semantics. You need to configure a Hive metastore and enable the 'lock.enabled' option for the catalog.
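
A minimal sketch of such a catalog definition in Flink SQL, assuming a Hive metastore at a placeholder thrift address and an S3 warehouse path:

    -- hypothetical catalog name and endpoints; adjust to your environment
    CREATE CATALOG my_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'uri' = 'thrift://<hive-metastore-host>:9083',
        'warehouse' = 's3:///path/to/warehouse',
        -- take a metastore lock on commit, since RENAME on the object store is not atomic
        'lock.enabled' = 'true'
    );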

By default, Paimon supports concurrent writes to different partitions. A recommended pattern is for a streaming job to write records to Paimon's latest partition while, simultaneously, a batch job (overwrite) writes records to historical partitions.
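
A minimal sketch of this pattern in Flink SQL, assuming a table T partitioned by dt and hypothetical source tables:

    -- streaming job: continuously writes new records, which land in the latest partitions
    INSERT INTO T SELECT * FROM streaming_source;

    -- separate batch job: rewrites one historical partition with INSERT OVERWRITE
    INSERT OVERWRITE T PARTITION (dt = '20221126') SELECT k, v FROM corrected_source;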


So far everything works well, but if you need multiple writers writing to the same partition, it becomes a bit more complicated. For example, if you do not want to use UNION ALL and instead have multiple streaming jobs writing records to a 'partial-update' table, refer to the 'Dedicated Compaction Job' section below.

Dedicated Compaction Job

By default, Paimon writers will perform compaction as needed during writing records. This is sufficient for most use cases.

Compaction will mark some data files as “deleted” (not really deleted, see expiring snapshots for more info). If multiple writers mark the same file, a conflict will occur when committing the changes. Paimon will automatically resolve the conflict, but this may result in job restarts.

To avoid these downsides, users can also choose to skip compactions in writers, and run a dedicated job only for compaction. As compactions are performed only by the dedicated job, writers can continuously write records without pausing and no conflicts will ever occur.

To skip compactions in writers, set the following table property to true.

Option     | Required | Default | Type    | Description
write-only | No       | false   | Boolean | If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.
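
For example, with Flink SQL this property can be set on an existing table or declared at creation time (the table name and schema below are placeholders):

    -- set the property on an existing table
    ALTER TABLE my_table SET ('write-only' = 'true');

    -- or declare it when creating the table
    CREATE TABLE my_table (
        k INT PRIMARY KEY NOT ENFORCED,
        v STRING
    ) WITH (
        'write-only' = 'true'
    );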

To run a dedicated job for compaction, follow these instructions.

Flink Action Jar

Run the following command to submit a compaction job for the table.

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition <partition-name>] \
    [--table_conf <table_conf>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]

Example: compact table

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact \
    --warehouse s3:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition dt=20221126,hh=08 \
    --partition dt=20221127,hh=09 \
    --table_conf sink.parallelism=10 \
    --catalog_conf s3.endpoint=https://****.com \
    --catalog_conf s3.access-key=***** \
    --catalog_conf s3.secret-key=*****

You can use -D execution.runtime-mode=batch or -yD execution.runtime-mode=batch (for the ON-YARN scenario) to control batch or streaming mode. If you submit a batch job, all current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes to the table and perform compactions as needed.
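
For instance, a one-off batch compaction of the whole table could be submitted like this (warehouse path and table names are placeholders):

    <FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact \
    --warehouse s3:///path/to/warehouse \
    --database test_db \
    --table test_table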

For more usage of the compact action, see:

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact --help

Flink SQL

Run the following SQL:

    -- compact table
    CALL sys.compact(`table` => 'default.T');
    -- compact table with options
    CALL sys.compact(`table` => 'default.T', `options` => 'sink.parallelism=4');
    -- compact table partition
    CALL sys.compact(`table` => 'default.T', `partitions` => 'p=0');
    -- compact table partition with filter
    CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20');

Note that the default is synchronous compaction, which may cause checkpoint timeouts. You can configure table options (via --table_conf for the action jar, or the options argument here) to use Asynchronous Compaction.
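
For example, the asynchronous compaction table options (option names as documented under Asynchronous Compaction; the values here are only illustrative) can be passed through the options argument:

    CALL sys.compact(
        `table` => 'default.T',
        `options` => 'num-sorted-run.stop-trigger=2147483647,sort-spill-threshold=10,lookup-wait=false'
    );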

Database Compaction Job

You can run the following command to submit a compaction job for multiple databases.

Flink Action Jar

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact_database \
    --warehouse <warehouse-path> \
    --including_databases <database-name|name-regular-expr> \
    [--including_tables <paimon-table-name|name-regular-expr>] \
    [--excluding_tables <paimon-table-name|name-regular-expr>] \
    [--mode <compact-mode>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
  • --including_databases is used to specify which databases are to be compacted. In compact mode, you need to specify a single database name; in compact_database mode, you can specify multiple databases, and regular expressions are supported.
  • --including_tables is used to specify which source tables are to be compacted. You must use '|' to separate multiple tables; the format is databaseName.tableName, and regular expressions are supported. For example, specifying "--including_tables db1.t1|db2.+" means to compact table 'db1.t1' and all tables in the db2 database.
  • --excluding_tables is used to specify which source tables are not to be compacted. The usage is the same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specify both.
  • --mode is used to specify the compaction mode. Possible values:
    • "divided" (the default mode if you haven't specified one): start a sink for each table; compacting a newly added table requires restarting the job.
    • "combined": start a single combined sink for all tables; newly added tables will be compacted automatically.
  • --catalog_conf is the configuration for the Paimon catalog. Each configuration should be specified in the format key=value. See here for a complete list of catalog configurations.
  • --table_conf is the configuration for compaction. Each configuration should be specified in the format key=value. Pivotal configurations are listed below:
Key                           | Default | Type     | Description
continuous.discovery-interval | 10 s    | Duration | The discovery interval of continuous reading.
sink.parallelism              | (none)  | Integer  | Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.

You can use -D execution.runtime-mode=batch to control batch or streaming mode. If you submit a batch job, all current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes to the table and perform compactions as needed.

If you only want to submit the compaction job and don’t want to wait until the job is done, you should submit in detached mode.
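
A sketch of such a detached submission using Flink's -d (detached) flag, with placeholder paths and names:

    <FLINK_HOME>/bin/flink run -d \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact_database \
    --warehouse s3:///path/to/warehouse \
    --including_databases test_db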

You can set --mode combined to enable compacting newly added tables without restarting the job.

Example 1: compact database

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact_database \
    --warehouse s3:///path/to/warehouse \
    --including_databases test_db \
    --catalog_conf s3.endpoint=https://****.com \
    --catalog_conf s3.access-key=***** \
    --catalog_conf s3.secret-key=*****

Example 2: compact database in combined mode

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact_database \
    --warehouse s3:///path/to/warehouse \
    --including_databases test_db \
    --mode combined \
    --catalog_conf s3.endpoint=https://****.com \
    --catalog_conf s3.access-key=***** \
    --catalog_conf s3.secret-key=***** \
    --table_conf continuous.discovery-interval=*****

For more usage of the compact_database action, see:

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact_database --help

Flink SQL

Run the following SQL:

    CALL sys.compact_database('includingDatabases')
    CALL sys.compact_database('includingDatabases', 'mode')
    CALL sys.compact_database('includingDatabases', 'mode', 'includingTables')
    CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
    CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
    -- example
    CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore', 'sink.parallelism=4')

Sort Compact

If your table is a dynamic bucket primary key table or an append table, you can trigger a compaction with a specified column sort order to speed up queries.

Flink Action Jar

    <FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --order_strategy <orderType> \
    --order_by <col1,col2,...> \
    [--partition <partition-name>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]

There are two new configurations in Sort Compact:

Configuration    | Description
--order_strategy | The order strategy. Supported values are "zorder", "hilbert" and "order". For example: --order_strategy zorder
--order_by       | Specify the order columns. For example: --order_by col0,col1

The sort parallelism is the same as the sink parallelism; you can specify it dynamically by adding --table_conf sink.parallelism=<value>.
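
For example, a z-order sort compaction with an explicit sink parallelism might look like this (warehouse path, table and column names are placeholders):

    <FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact \
    --warehouse s3:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --order_strategy zorder \
    --order_by col0,col1 \
    --table_conf sink.parallelism=10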

Flink SQL

Run the following SQL:

    -- sort compact table
    CALL sys.compact(`table` => 'default.T', order_strategy => 'zorder', order_by => 'a,b')

Historical Partition Compact

You can run the following command to submit a compaction job for partitions that have not received any new data for a period of time. Small files in those partitions will be fully compacted.

Currently, this feature can only be used in batch mode.

For Table

This is for one table.

Flink Action Jar

    <FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --partition_idle_time <partition-idle-time> \
    [--partition <partition-name>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]

There is one new configuration in Historical Partition Compact:

  • --partition_idle_time: used to perform a full compaction on partitions that have not received any new data for 'partition_idle_time'. Only these partitions will be compacted (see the example below).
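
For example, the following fully compacts all partitions of a table that have been idle for at least one day (warehouse path and table names are placeholders):

    <FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact \
    --warehouse s3:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_idle_time 1d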

Flink SQL

Run the following SQL:

    -- history partition compact table
    CALL sys.compact(`table` => 'default.T', `partition_idle_time` => '1 d')

For Databases

This is for multiple tables in different databases.

Flink Action Jar

    <FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact_database \
    --warehouse <warehouse-path> \
    --including_databases <database-name|name-regular-expr> \
    --partition_idle_time <partition-idle-time> \
    [--including_tables <paimon-table-name|name-regular-expr>] \
    [--excluding_tables <paimon-table-name|name-regular-expr>] \
    [--mode <compact-mode>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]

Example: compact historical partitions for tables in a database

    <FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    compact_database \
    --warehouse s3:///path/to/warehouse \
    --including_databases test_db \
    --partition_idle_time 1d \
    --catalog_conf s3.endpoint=https://****.com \
    --catalog_conf s3.access-key=***** \
    --catalog_conf s3.secret-key=*****

Flink SQL

Run the following SQL:

    -- history partition compact table
    CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partition_idle_time')

Example: compact historical partitions for tables in a database

    -- history partition compact table
    CALL sys.compact_database('test_db', 'combined', '', '', '', '1 d')