Manage Tags

Paimon’s snapshots provide an easy way to query historical data. But in most scenarios a job generates many snapshots, and the table expires old snapshots according to its configuration. Snapshot expiration also deletes old data files, so the historical data of expired snapshots can no longer be queried.

To solve this problem, you can create a tag based on a snapshot. The tag keeps the manifests and data files of that snapshot. A typical usage is to create a tag daily; you can then retain the historical data of each day for batch reading.

Automatic Creation

Paimon supports automatic creation of tags in writing jobs.

Step 1: Choose Creation Mode

You can set the creation mode with the table option 'tag.automatic-creation'. Supported values are:

  • process-time: Create tags based on the machine's clock.
  • watermark: Create tags based on the watermark of the Sink input.
  • batch: In batch processing scenarios, a tag is generated after the current job finishes.

If you choose watermark, you may need to specify the time zone of the watermark: if the watermark is not in the UTC time zone, configure 'sink.watermark-time-zone'.
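To see why the time zone matters, here is a minimal, illustrative sketch (not Paimon's internal code): with a daily creation period, the tag for a day can be created once the watermark has passed the end of that day. Since the watermark is an epoch timestamp, the zone configured via 'sink.watermark-time-zone' decides which calendar day a given watermark instant falls in.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class WatermarkTag {
    // Which day is "complete" for a given watermark? The previous day is
    // complete once the watermark enters the current day in the configured zone.
    static String completedDay(long watermarkMillis, ZoneId watermarkZone) {
        LocalDate current = Instant.ofEpochMilli(watermarkMillis)
                .atZone(watermarkZone)
                .toLocalDate();
        return current.minusDays(1).format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    public static void main(String[] args) {
        long wm = Instant.parse("2023-07-26T20:00:00Z").toEpochMilli();
        // Same watermark instant, different zones -> different completed day
        System.out.println(completedDay(wm, ZoneOffset.UTC));             // 2023-07-25
        System.out.println(completedDay(wm, ZoneId.of("Asia/Shanghai"))); // 2023-07-26
    }
}
```

A misconfigured zone therefore shifts tag boundaries by the zone offset, which can make a daily tag contain data from the wrong day.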

Step 2: Choose Creation Period

The option 'tag.creation-period' controls how frequently tags are generated; supported values are 'daily', 'hourly', and 'two-hours'.

If you need to wait for late data, you can configure a delay time: 'tag.creation-delay'.
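The effect of the delay can be sketched as follows (illustrative only, not Paimon internals): with a daily period and 'tag.creation-delay' = '10 m', the tag for a day is not triggered at midnight but ten minutes later, leaving room for late data to arrive.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class TagDelay {
    // The tag for `day` becomes eligible at the end of the period plus the delay.
    static Instant tagTriggerTime(LocalDate day, Duration delay) {
        Instant periodEnd = day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant();
        return periodEnd.plus(delay);
    }

    public static void main(String[] args) {
        System.out.println(tagTriggerTime(LocalDate.of(2023, 7, 26), Duration.ofMinutes(10)));
        // 2023-07-27T00:10:00Z
    }
}
```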

Step 3: Automatic deletion of tags

You can configure 'tag.num-retained-max' or 'tag.default-time-retained' to delete tags automatically.
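The two options combine roughly as in this illustrative sketch (not Paimon source code): a tag is dropped once its retention time has elapsed, and the oldest tags are dropped when the count exceeds the maximum.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TagRetention {
    // createTimes maps tag name -> creation time, oldest first.
    static List<String> retainedTags(LinkedHashMap<String, Instant> createTimes,
                                     int numRetainedMax, Duration timeRetained, Instant now) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, Instant> e : createTimes.entrySet()) {
            // drop tags whose retention window has elapsed
            if (now.isBefore(e.getValue().plus(timeRetained))) {
                kept.add(e.getKey());
            }
        }
        // then enforce the count limit, dropping the oldest first
        while (kept.size() > numRetainedMax) {
            kept.remove(0);
        }
        return kept;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Instant> tags = new LinkedHashMap<>();
        tags.put("2023-07-24", Instant.parse("2023-07-25T00:10:00Z"));
        tags.put("2023-07-25", Instant.parse("2023-07-26T00:10:00Z"));
        tags.put("2023-07-26", Instant.parse("2023-07-27T00:10:00Z"));
        Instant now = Instant.parse("2023-07-27T12:00:00Z");
        // a 2-day retention expires the oldest tag; the count limit keeps at most 2
        System.out.println(retainedTags(tags, 2, Duration.ofDays(2), now));
        // [2023-07-25, 2023-07-26]
    }
}
```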

For example, to configure the table to create a tag at 00:10 every day, retaining at most 90 tags (about three months of daily tags):

```sql
-- Flink SQL
CREATE TABLE t (
    k INT PRIMARY KEY NOT ENFORCED,
    f0 INT,
    ...
) WITH (
    'tag.automatic-creation' = 'process-time',
    'tag.creation-period' = 'daily',
    'tag.creation-delay' = '10 m',
    'tag.num-retained-max' = '90'
);

INSERT INTO t SELECT ...;
```

```sql
-- Spark SQL

-- Read latest snapshot
SELECT * FROM t;

-- Read Tag snapshot
SELECT * FROM t VERSION AS OF '2023-07-26';

-- Read Incremental between Tags
SELECT * FROM paimon_incremental_query('t', '2023-07-25', '2023-07-26');
```

See Query Tables for more Spark query examples.

Create Tags

You can create a tag with a given name and snapshot ID.

Flink

Run the following command:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    create_tag \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --tag_name <tag-name> \
    [--snapshot <snapshot_id>] \
    [--time_retained <time-retained>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

If --snapshot is unset, the snapshot ID defaults to that of the latest snapshot.

Java API

```java
import java.time.Duration;

import org.apache.paimon.table.Table;

public class CreateTag {
    public static void main(String[] args) {
        Table table = ...;
        table.createTag("my-tag", 1);
        table.createTag("my-tag-retained-12-hours", 1, Duration.ofHours(12));
    }
}
```

Spark

Run the following SQL:

```sql
CALL sys.create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
```

To create a tag retained for 1 day, run the following SQL:

```sql
CALL sys.create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2, time_retained => '1 d');
```

To create a tag based on the latest snapshot ID, run the following SQL:

```sql
CALL sys.create_tag(table => 'test.t', tag => 'test_tag');
```

Delete Tags

You can delete a tag by its name.

Flink

Run the following command:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    delete_tag \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --tag_name <tag-name> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

Java API

```java
import org.apache.paimon.table.Table;

public class DeleteTag {
    public static void main(String[] args) {
        Table table = ...;
        table.deleteTag("my-tag");
    }
}
```

Spark

Run the following SQL:

```sql
CALL sys.delete_tag(table => 'test.t', tag => 'test_tag');
```

Rollback to Tag

Roll back the table to a specific tag. All snapshots and tags whose snapshot ID is larger than the tag's will be deleted (and their data files will be deleted too).
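The effect on the snapshot list can be sketched as follows (illustrative only, not Paimon internals): snapshots up to and including the tag's snapshot ID survive; every later snapshot, and any tag pointing at one, is removed along with its data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RollbackSketch {
    // Keep only the snapshots at or before the tag's snapshot id.
    static List<Long> snapshotsAfterRollback(List<Long> snapshotIds, long tagSnapshotId) {
        return snapshotIds.stream()
                .filter(id -> id <= tagSnapshotId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // tag3 points at snapshot 3; snapshots 4..7 are removed
        System.out.println(snapshotsAfterRollback(Arrays.asList(3L, 4L, 5L, 6L, 7L), 3L));
        // [3]
    }
}
```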

Flink

Run the following command:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    rollback_to \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --version <tag-name> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

Java API

```java
import org.apache.paimon.table.Table;

public class RollbackTo {
    public static void main(String[] args) {
        Table table = ...;

        // before rollback:
        // snapshot-3 [expired] -> tag3
        // snapshot-4 [expired]
        // snapshot-5 -> tag5
        // snapshot-6
        // snapshot-7

        table.rollbackTo("tag3");

        // after rollback:
        // snapshot-3 -> tag3
    }
}
```

Spark

Run the following SQL:

```sql
CALL sys.rollback(table => 'test.t', version => '2');
```