Merge Engine

When a Paimon sink receives two or more records with the same primary keys, it merges them into one record to keep primary keys unique. By specifying the merge-engine table property, users can choose how records are merged together.

Always set table.exec.sink.upsert-materialize to NONE in the Flink SQL TableConfig; sink upsert-materialize may result in strange behavior. When the input is out of order, we recommend that you use a Sequence Field to correct the disorder.
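
In the Flink SQL client, for example, this can be set before writing to the table:

  SET 'table.exec.sink.upsert-materialize' = 'NONE';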

Deduplicate

The deduplicate merge engine is the default merge engine. Paimon will only keep the latest record and throw away other records with the same primary keys.

Specifically, if the latest record is a DELETE record, all records with the same primary keys will be deleted. You can configure ignore-delete to ignore DELETE records.
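
A minimal sketch (the table and column names are illustrative): the default deduplicate engine is used, and ignore-delete is enabled so that DELETE records are dropped instead of removing the row.

  CREATE TABLE dedup_example (
      k INT,
      v STRING,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      -- 'deduplicate' is the default merge engine, shown here for clarity
      'merge-engine' = 'deduplicate',
      -- optional: drop incoming DELETE records instead of deleting the row
      'ignore-delete' = 'true'
  );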

Partial Update

By specifying 'merge-engine' = 'partial-update', users can update columns of a record through multiple updates until the record is complete. This is achieved by updating the value fields one by one, using the latest data under the same primary key. However, null values are not overwritten in the process.

For example, suppose Paimon receives three records:

  • <1, 23.0, 10, NULL>
  • <1, NULL, NULL, 'This is a book'>
  • <1, 25.2, NULL, NULL>

Assuming that the first column is the primary key, the final result would be <1, 25.2, 10, 'This is a book'>.
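
A runnable sketch of this example (the table and column names are illustrative):

  CREATE TABLE pu_example (
      k INT,
      price DOUBLE,
      cnt INT,
      description STRING,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'partial-update'
  );

  INSERT INTO pu_example VALUES (1, 23.0, 10, CAST(NULL AS STRING));
  INSERT INTO pu_example VALUES (1, CAST(NULL AS DOUBLE), CAST(NULL AS INT), 'This is a book');
  INSERT INTO pu_example VALUES (1, 25.2, CAST(NULL AS INT), CAST(NULL AS STRING));

  SELECT * FROM pu_example; -- output 1, 25.2, 10, This is a book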

For streaming queries, the partial-update merge engine must be used together with the lookup or full-compaction changelog producer. (The ‘input’ changelog producer is also supported, but only returns input records.)
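
For example, the changelog producer can be declared together with the merge engine in the table options (a sketch, names are illustrative):

  CREATE TABLE pu_streaming_example (
      k INT,
      v INT,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'partial-update',
      'changelog-producer' = 'lookup'
  );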

By default, partial-update cannot accept delete records. You can choose one of the following solutions:

  • Configure ‘ignore-delete’ to ignore delete records.
  • Configure sequence-groups to retract partial columns.

Sequence Group

A sequence-field may not solve the disorder problem of partial-update tables with multiple stream updates, because the sequence-field may be overwritten by the latest data of another stream during multi-stream update.

So we introduce the sequence group mechanism for partial-update tables. It can solve:

  1. Disorder during multi-stream update. Each stream defines its own sequence-groups.
  2. A true partial-update, not just a non-null update.

See example:

  CREATE TABLE t (
      k INT,
      a INT,
      b INT,
      g_1 INT,
      c INT,
      d INT,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.g_2.sequence-group' = 'c,d'
  );

  INSERT INTO t VALUES (1, 1, 1, 1, 1, 1, 1);

  -- g_2 is null, c, d should not be updated
  INSERT INTO t VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));

  SELECT * FROM t; -- output 1, 2, 2, 2, 1, 1, 1

  -- g_1 is smaller, a, b should not be updated
  INSERT INTO t VALUES (1, 3, 3, 1, 3, 3, 3);

  SELECT * FROM t; -- output 1, 2, 2, 2, 3, 3, 3

For fields.<field-name>.sequence-group, valid comparative data types include: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

Aggregation For Partial Update

You can specify an aggregation function for the input field; all the functions listed in Aggregation are supported.

See example:

  CREATE TABLE t (
      k INT,
      a INT,
      b INT,
      c INT,
      d INT,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'partial-update',
      'fields.a.sequence-group' = 'b',
      'fields.b.aggregate-function' = 'first_value',
      'fields.c.sequence-group' = 'd',
      'fields.d.aggregate-function' = 'sum'
  );

  INSERT INTO t VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
  INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
  INSERT INTO t VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
  INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);

  SELECT * FROM t; -- output 1, 2, 1, 2, 3

Aggregation

NOTE: Always set table.exec.sink.upsert-materialize to NONE in the Flink SQL TableConfig.

Sometimes users only care about aggregated results. The aggregation merge engine aggregates each value field with the latest data one by one under the same primary key according to the aggregate function.

Each field not part of the primary keys can be given an aggregate function, specified by the fields.<field-name>.aggregate-function table property; otherwise it uses last_non_null_value aggregation by default. For example, consider the following table definition.

Flink

  CREATE TABLE my_table (
      product_id BIGINT,
      price DOUBLE,
      sales BIGINT,
      PRIMARY KEY (product_id) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'aggregation',
      'fields.price.aggregate-function' = 'max',
      'fields.sales.aggregate-function' = 'sum'
  );

Field price will be aggregated by the max function, and field sales will be aggregated by the sum function. Given two input records <1, 23.0, 15> and <1, 30.2, 20>, the final result will be <1, 30.2, 35>.
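
Continuing the table above, a sketch of the two inputs and the aggregated result:

  INSERT INTO my_table VALUES (1, 23.0, 15);
  INSERT INTO my_table VALUES (1, 30.2, 20);

  SELECT * FROM my_table; -- output 1, 30.2, 35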

Current supported aggregate functions and data types are:

  • sum: The sum function aggregates the values across multiple rows. It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types.

  • product: The product function can compute product values across multiple rows. It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types.

  • count: The count function counts the values across multiple rows. It supports INTEGER, BIGINT data types.

  • max: The max function identifies and retains the maximum value. It supports CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ data types.

  • min: The min function identifies and retains the minimum value. It supports CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ data types.

  • last_value: The last_value function replaces the previous value with the most recently imported value. It supports all data types.

  • last_non_null_value: The last_non_null_value function replaces the previous value with the latest non-null value. It supports all data types.

  • listagg: The listagg function concatenates multiple string values into a single string. It supports STRING data type.

  • bool_and: The bool_and function evaluates whether all values in a boolean set are true. It supports BOOLEAN data type.

  • bool_or: The bool_or function checks if at least one value in a boolean set is true. It supports BOOLEAN data type.

  • first_value: The first_value function retrieves the first value from a data set. It supports all data types.

  • first_non_null_value: The first_non_null_value function selects the first non-null value in a data set. It supports all data types.

  • nested_update: The nested_update function collects multiple rows into one array (so-called ‘nested table’). It supports ARRAY data types.

    Use fields.<field-name>.nested-key=pk0,pk1,... to specify the primary keys of the nested table. If there are no keys, the row will be appended to the array.

    An example:

    Flink

    -- orders table
    CREATE TABLE orders (
        order_id BIGINT PRIMARY KEY NOT ENFORCED,
        user_name STRING,
        address STRING
    );

    -- sub orders that have the same order_id
    -- belong to the same order
    CREATE TABLE sub_orders (
        order_id BIGINT,
        sub_order_id INT,
        product_name STRING,
        price BIGINT,
        PRIMARY KEY (order_id, sub_order_id) NOT ENFORCED
    );

    -- wide table
    CREATE TABLE order_wide (
        order_id BIGINT PRIMARY KEY NOT ENFORCED,
        user_name STRING,
        address STRING,
        sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>
    ) WITH (
        'merge-engine' = 'aggregation',
        'fields.sub_orders.aggregate-function' = 'nested_update',
        'fields.sub_orders.nested-key' = 'sub_order_id'
    );

    -- widen
    INSERT INTO order_wide
    SELECT
        order_id,
        user_name,
        address,
        CAST(NULL AS ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>)
    FROM orders
    UNION ALL
    SELECT
        order_id,
        CAST(NULL AS STRING),
        CAST(NULL AS STRING),
        ARRAY[ROW(sub_order_id, product_name, price)]
    FROM sub_orders;

    -- query using UNNEST
    SELECT order_id, user_name, address, sub_order_id, product_name, price
    FROM order_wide, UNNEST(sub_orders) AS so(sub_order_id, product_name, price);

  • collect: The collect function collects elements into an Array. You can set fields.<field-name>.distinct=true to deduplicate elements. It only supports ARRAY type (see the sketch after this list).

  • merge_map: The merge_map function merges input maps. It only supports MAP type.
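
A sketch combining collect and merge_map (the table and column names are illustrative):

  CREATE TABLE agg_collect_example (
      k INT,
      tags ARRAY<STRING>,
      attrs MAP<STRING, STRING>,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'aggregation',
      'fields.tags.aggregate-function' = 'collect',
      -- deduplicate elements collected into the array
      'fields.tags.distinct' = 'true',
      'fields.attrs.aggregate-function' = 'merge_map'
  );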

For streaming queries, the aggregation merge engine must be used together with the lookup or full-compaction changelog producer. (The ‘input’ changelog producer is also supported, but only returns input records.)

Retract

Only sum, product, count, collect, merge_map, nested_update, last_value and last_non_null_value support retraction (UPDATE_BEFORE and DELETE); other aggregate functions do not support retraction. If you want to allow some functions to ignore retraction messages, you can configure: 'fields.${field_name}.ignore-retract'='true'.
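
For example, to let a max field silently drop retraction messages (the table and field names are illustrative):

  CREATE TABLE agg_retract_example (
      k INT,
      price DOUBLE,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'aggregation',
      'fields.price.aggregate-function' = 'max',
      -- max does not support retraction, so ignore retract messages for this field
      'fields.price.ignore-retract' = 'true'
  );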

The last_value and last_non_null_value functions just set the field to null when they accept retract messages.

The collect and merge_map make a best-effort attempt to handle retraction messages, but the results are not guaranteed to be accurate. The following behaviors may occur when processing retraction messages:

  1. It might fail to handle retraction messages if records are disordered. For example, the table uses collect, and the upstreams send +I['A', 'B'] and -U['A'] respectively. If the table receives -U['A'] first, it can do nothing; then it receives +I['A', 'B'], the merge result will be +I['A', 'B'] instead of +I['B'].

  2. The retract message from one upstream will retract the result merged from multiple upstreams. For example, the table uses merge_map, one upstream sends +I[1->A], and another upstream sends +I[1->B] followed by -D[1->B]. The table will merge the two insert values to +I[1->B] first, and then the -D[1->B] will retract the whole result, so the final result is an empty map instead of +I[1->A].

First Row

By specifying 'merge-engine' = 'first-row', users can keep the first row of the same primary key. It differs from the deduplicate merge engine in that the first-row merge engine generates an insert-only changelog.

  1. You cannot specify sequence.field.
  2. It does not accept DELETE and UPDATE_BEFORE messages. You can configure ignore-delete to ignore these two kinds of records.

This is of great help in replacing log deduplication in streaming computation.
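
A minimal sketch (the table and column names are illustrative):

  CREATE TABLE first_row_example (
      k INT,
      v STRING,
      PRIMARY KEY (k) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'first-row'
  );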