Manage Branches

In streaming data processing, correcting data is difficult because it may affect existing data, and users may see provisional streaming results, which is often not what they expect.

Suppose the existing workflow is processing on the 'main' branch. By creating a custom data branch, you can run experimental tests and validate data for a new job on the existing table, without stopping the existing read / write workflows and without copying data from the main branch.

With branch merge or replace operations, users can then complete the data correction.

Create Branches

Paimon supports creating a branch from a specific tag or snapshot, or creating an empty branch, in which case the initial state of the branch is like an empty table.

Flink

Run the following SQL:

```sql
-- create branch named 'branch1' from tag 'tag1'
CALL sys.create_branch('default.T', 'branch1', 'tag1');

-- create empty branch named 'branch1'
CALL sys.create_branch('default.T', 'branch1');
```

Flink Action Jar

Run the following command:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    create_branch \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --branch_name <branch-name> \
    [--tag_name <tag-name>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

Delete Branches

You can delete a branch by its name.

Flink

Run the following SQL:

```sql
CALL sys.delete_branch('default.T', 'branch1');
```

Flink Action Jar

Run the following command:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    delete_branch \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --branch_name <branch-name> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

Read / Write With Branch

You can read from or write to a branch as follows.

Flink

```sql
-- read from branch 'branch1'
SELECT * FROM `T$branch_branch1`;
SELECT * FROM `T$branch_branch1` /*+ OPTIONS('consumer-id' = 'myid') */;

-- write to branch 'branch1'
INSERT INTO `T$branch_branch1` SELECT ...
```
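The `$branch_<name>` suffix is a naming convention on the table identifier. A minimal sketch of how client code might build such identifiers (the helper function is hypothetical, for illustration only):

```python
def branch_table_name(table: str, branch: str) -> str:
    """Build the identifier that addresses a specific branch of a table.

    Paimon exposes branch 'branch1' of table 'T' as 'T$branch_branch1';
    this hypothetical helper just applies that pattern.
    """
    return f"{table}$branch_{branch}"

# e.g. used when assembling a query string against the branch
query = f"SELECT * FROM `{branch_table_name('T', 'branch1')}`"
```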

Fast Forward

Fast-forwarding a custom branch to main deletes all snapshots, tags, and schemas in the main branch that were created after the branch's initial tag, then copies the snapshots, tags, and schemas from the branch to the main branch.

Flink

```sql
CALL sys.fast_forward('default.T', 'branch1');
```

Flink Action Jar

Run the following command:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.9.0.jar \
    fast_forward \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --branch_name <branch-name> \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
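The fast-forward behavior described above can be modeled with a short sketch. This is an illustration of the documented semantics, not Paimon's implementation: snapshots are kept in creation order, main history created after the branch's starting point is dropped, and the branch's history is copied over in its place.

```python
def fast_forward(main: list, branch: list, branch_point: int) -> list:
    """Model of fast-forwarding a branch to main (illustration only).

    `main` and `branch` are lists of snapshot ids in creation order;
    `branch_point` is the snapshot the branch was created from. Per the
    docs: main snapshots created after the branch point are deleted, then
    the branch's own snapshots are copied onto main.
    """
    kept = [s for s in main if s <= branch_point]          # drop later main history
    return kept + [s for s in branch if s > branch_point]  # copy branch history

# main diverged after snapshot 3; branch1 added snapshots 40 and 50
main = [1, 2, 3, 4, 5]
branch1 = [1, 2, 3, 40, 50]
assert fast_forward(main, branch1, branch_point=3) == [1, 2, 3, 40, 50]
```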

Batch Reading from Fallback Branch

You can set the table option scan.fallback-branch so that when a batch job reads from the current branch and a partition does not exist, the reader will try to read that partition from the fallback branch. Streaming read jobs do not currently support this feature and will only produce results from the current branch.
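The fallback lookup amounts to a per-partition dispatch: use the current branch if the partition exists there, otherwise consult the fallback branch. A minimal sketch of that resolution logic (illustrative only, not Paimon's reader):

```python
def read_partition(partition: str, current: dict, fallback: dict) -> list:
    """Resolve one partition for a batch read when scan.fallback-branch is set.

    If the partition exists in the current branch, its rows win; otherwise
    the reader falls back to the fallback branch. Sketch only.
    """
    if partition in current:
        return current[partition]
    return fallback.get(partition, [])

# current (main) branch only has 20240725; the streaming branch also has 20240726
current = {"20240725": [("apple", 5), ("banana", 7)]}
fallback = {"20240725": [("apple", 4), ("peach", 10)],
            "20240726": [("cherry", 3), ("pear", 6)]}

assert read_partition("20240725", current, fallback) == [("apple", 5), ("banana", 7)]
assert read_partition("20240726", current, fallback) == [("cherry", 3), ("pear", 6)]
```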

What’s the use case of this feature? Say you have created a Paimon table partitioned by date. You have a long-running streaming job which inserts records into Paimon, so that today’s data can be queried in time. You also have a batch job which runs every night to insert corrected records of yesterday into Paimon, so that the accuracy of the data is guaranteed.

When you query this Paimon table, you would like to first read from the results of the batch job. But if a partition (for example, today’s partition) does not exist in its results, then you would like to read from the results of the streaming job. In this case, you can create a branch for the streaming job and set scan.fallback-branch to this streaming branch.

Let’s look at an example.

Flink

```sql
-- create Paimon table
CREATE TABLE T (
    dt STRING NOT NULL,
    name STRING NOT NULL,
    amount BIGINT
) PARTITIONED BY (dt);

-- create a branch for streaming job
CALL sys.create_branch('default.T', 'test');

-- set primary key and bucket number for the branch
ALTER TABLE `T$branch_test` SET (
    'primary-key' = 'dt,name',
    'bucket' = '2',
    'changelog-producer' = 'lookup'
);

-- set fallback branch
ALTER TABLE T SET (
    'scan.fallback-branch' = 'test'
);

-- write records into the streaming branch
INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);

-- write records into the default branch
INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);

SELECT * FROM T;
/*
+----------+--------+--------+
|       dt |   name | amount |
+----------+--------+--------+
| 20240725 |  apple |      5 |
| 20240725 | banana |      7 |
| 20240726 | cherry |      3 |
| 20240726 |   pear |      6 |
+----------+--------+--------+
*/

-- reset fallback branch
ALTER TABLE T RESET ( 'scan.fallback-branch' );

-- now it only reads from default branch
SELECT * FROM T;
/*
+----------+--------+--------+
|       dt |   name | amount |
+----------+--------+--------+
| 20240725 |  apple |      5 |
| 20240725 | banana |      7 |
+----------+--------+--------+
*/
```