Partial Update

By specifying 'merge-engine' = 'partial-update', users can fill in the columns of a record through multiple updates until the record is complete. The value fields are updated one by one, using the latest data under the same primary key. Null values in an incoming record do not overwrite existing values.

For example, suppose Paimon receives three records:

  • <1, 23.0, 10, NULL>
  • <1, NULL, NULL, 'This is a book'>
  • <1, 25.2, NULL, NULL>

Assuming that the first column is the primary key, the final result would be <1, 25.2, 10, 'This is a book'>.
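The three records above can be reproduced with a sketch like the following. The table name, column names, and column types are hypothetical and chosen only for illustration; only the 'merge-engine' option and the record values come from the text above.

```sql
-- Hypothetical table: names and types are assumptions for illustration.
CREATE TABLE book_info
(
    id          INT,
    price       DOUBLE,
    stock       INT,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update'
);

-- <1, 23.0, 10, NULL>
INSERT INTO book_info
VALUES (1, CAST(23.0 AS DOUBLE), 10, CAST(NULL AS STRING));

-- <1, NULL, NULL, 'This is a book'>
INSERT INTO book_info
VALUES (1, CAST(NULL AS DOUBLE), CAST(NULL AS INT), 'This is a book');

-- <1, 25.2, NULL, NULL>
INSERT INTO book_info
VALUES (1, CAST(25.2 AS DOUBLE), CAST(NULL AS INT), CAST(NULL AS STRING));

SELECT *
FROM book_info;
-- result described above: 1, 25.2, 10, 'This is a book'
```

Each record is written by a separate INSERT statement so that the order in which the updates arrive is unambiguous.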

For streaming queries, the partial-update merge engine must be used together with the lookup or full-compaction changelog producer. (The 'input' changelog producer is also supported, but it only returns input records.)
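As a minimal sketch, a partial-update table intended for streaming reads could be declared as below. The table and column names are hypothetical, and the example assumes the changelog producer is selected through the 'changelog-producer' table option.

```sql
-- Hypothetical table for illustration.
CREATE TABLE t_stream
(
    k INT,
    a INT,
    b INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',
    -- assumed option name; 'full-compaction' is the other producer named above
    'changelog-producer' = 'lookup'
);
```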

By default, partial-update tables do not accept delete records. You can choose one of the following solutions:

  • Configure 'ignore-delete' to ignore delete records.
  • Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
  • Configure 'sequence-group' options to retract partial columns.
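The first two options can be sketched as table definitions like the following. The table and column names are hypothetical, and the example assumes both options are boolean flags enabled with 'true'.

```sql
-- Hypothetical table: delete records are silently dropped.
CREATE TABLE t_ignore_delete
(
    k INT,
    a INT,
    b INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',
    'ignore-delete' = 'true'
);

-- Hypothetical table: a delete record removes the whole row for that key.
CREATE TABLE t_remove_on_delete
(
    k INT,
    a INT,
    b INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',
    'partial-update.remove-record-on-delete' = 'true'
);
```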

Sequence Group

A sequence-field may not solve the disorder problem of partial-update tables with multiple stream updates, because during a multi-stream update the sequence-field may be overwritten by the latest data of another stream.

So we introduce the sequence group mechanism for partial-update tables. It can solve:

  1. Disorder during multi-stream update. Each stream defines its own sequence-groups.
  2. A true partial-update, not just a non-null update.

See example:

    CREATE TABLE t
    (
        k   INT,
        a   INT,
        b   INT,
        g_1 INT,
        c   INT,
        d   INT,
        g_2 INT,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update',
        'fields.g_1.sequence-group' = 'a,b',
        'fields.g_2.sequence-group' = 'c,d'
    );

    INSERT INTO t
    VALUES (1, 1, 1, 1, 1, 1, 1);

    -- g_2 is null, c, d should not be updated
    INSERT INTO t
    VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));

    SELECT *
    FROM t;
    -- output 1, 2, 2, 2, 1, 1, 1

    -- g_1 is smaller, a, b should not be updated
    INSERT INTO t
    VALUES (1, 3, 3, 1, 3, 3, 3);

    SELECT *
    FROM t;
    -- output 1, 2, 2, 2, 3, 3, 3

For fields.<field-name>.sequence-group, the valid comparable data types include: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.
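As an illustration of a non-integer sequence field, the sketch below uses TIMESTAMP(3) columns as sequence groups, one per input stream. The table, its columns, and the two "streams" are hypothetical; the comments state what the rules described in this section imply rather than captured output.

```sql
-- Hypothetical wide table fed by an order stream and a payment stream.
CREATE TABLE order_wide
(
    order_id   INT,
    status     STRING,
    order_ts   TIMESTAMP(3),
    pay_amount INT,
    pay_ts     TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',
    'fields.order_ts.sequence-group' = 'status',
    'fields.pay_ts.sequence-group' = 'pay_amount'
);

-- order stream: pay_ts is null, pay_amount should not be updated
INSERT INTO order_wide
VALUES (1, 'CREATED', TIMESTAMP '2024-01-01 10:00:00',
        CAST(NULL AS INT), CAST(NULL AS TIMESTAMP(3)));

-- payment stream: order_ts is null, status should not be updated
INSERT INTO order_wide
VALUES (1, CAST(NULL AS STRING), CAST(NULL AS TIMESTAMP(3)),
        100, TIMESTAMP '2024-01-01 10:05:00');

-- late order record: order_ts is smaller, status should not be updated
INSERT INTO order_wide
VALUES (1, 'PENDING', TIMESTAMP '2024-01-01 09:59:00',
        CAST(NULL AS INT), CAST(NULL AS TIMESTAMP(3)));
```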

You can also configure multiple sorted fields in a sequence-group, like fields.<field-name1>,<field-name2>.sequence-group. The fields are compared in the order they are listed.

See example:

    CREATE TABLE SG
    (
        k   INT,
        a   INT,
        b   INT,
        g_1 INT,
        c   INT,
        d   INT,
        g_2 INT,
        g_3 INT,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update',
        'fields.g_1.sequence-group' = 'a,b',
        'fields.g_2,g_3.sequence-group' = 'c,d'
    );

    INSERT INTO SG
    VALUES (1, 1, 1, 1, 1, 1, 1, 1);

    -- g_2, g_3 should not be updated
    INSERT INTO SG
    VALUES (1, 2, 2, 2, 2, 2, 1, CAST(NULL AS INT));

    SELECT *
    FROM SG;
    -- output 1, 2, 2, 2, 1, 1, 1, 1

    -- g_1 should not be updated
    INSERT INTO SG
    VALUES (1, 3, 3, 1, 3, 3, 3, 1);

    SELECT *
    FROM SG;
    -- output 1, 2, 2, 2, 3, 3, 3, 1

Aggregation For Partial Update

You can specify an aggregation function for an input field. All the functions in the Aggregation merge engine are supported.

See example:

    CREATE TABLE t
    (
        k INT,
        a INT,
        b INT,
        c INT,
        d INT,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update',
        'fields.a.sequence-group' = 'b',
        'fields.b.aggregate-function' = 'first_value',
        'fields.c.sequence-group' = 'd',
        'fields.d.aggregate-function' = 'sum'
    );

    INSERT INTO t
    VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
    INSERT INTO t
    VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
    INSERT INTO t
    VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
    INSERT INTO t
    VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);

    SELECT *
    FROM t;
    -- output 1, 2, 1, 2, 3

You can also configure an aggregation function for a sequence-group with multiple sorted fields.

See example:

    CREATE TABLE AGG
    (
        k   INT,
        a   INT,
        b   INT,
        g_1 INT,
        c   VARCHAR,
        g_2 INT,
        g_3 INT,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update',
        'fields.a.aggregate-function' = 'sum',
        'fields.g_1,g_3.sequence-group' = 'a',
        'fields.g_2.sequence-group' = 'c'
    );
    -- a in sequence-group g_1, g_3 with sum agg
    -- b not in sequence-group
    -- c in sequence-group g_2 without agg

    INSERT INTO AGG
    VALUES (1, 1, 1, 1, '1', 1, 1);

    -- g_2 should not be updated
    INSERT INTO AGG
    VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT), 2);

    SELECT *
    FROM AGG;
    -- output 1, 3, 2, 2, "1", 1, 2

    -- g_1, g_3 should not be updated
    INSERT INTO AGG
    VALUES (1, 3, 3, 2, '3', 3, 1);

    SELECT *
    FROM AGG;
    -- output 1, 6, 3, 2, "3", 3, 2

You can specify a default aggregation function for all the input fields with fields.default-aggregate-function. See example:

    CREATE TABLE t
    (
        k INT,
        a INT,
        b INT,
        c INT,
        d INT,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update',
        'fields.a.sequence-group' = 'b',
        'fields.c.sequence-group' = 'd',
        'fields.default-aggregate-function' = 'last_non_null_value',
        'fields.d.aggregate-function' = 'sum'
    );

    INSERT INTO t
    VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
    INSERT INTO t
    VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
    INSERT INTO t
    VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
    INSERT INTO t
    VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);

    SELECT *
    FROM t;
    -- output 1, 2, 2, 2, 3