Rescale Bucket

Since the total number of buckets dramatically influences performance, Paimon allows users to tune the bucket number with the ALTER TABLE command and reorganize the data layout with INSERT OVERWRITE, without recreating the table/partition. When executing the overwrite job, the framework automatically scans the data with the old bucket number and hashes each record according to the current bucket number.

Rescale Overwrite

  -- rescale number of total buckets
  ALTER TABLE table_identifier SET ('bucket' = '...');

  -- reorganize data layout of table/partition
  INSERT OVERWRITE table_identifier [PARTITION (part_spec)]
  SELECT ...
  FROM table_identifier
  [WHERE part_spec];
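
If you want to confirm the new setting before running the overwrite, one option (a sketch assuming a Flink SQL session and a placeholder table name my_table) is to inspect the table definition:

  -- placeholder table name; the options in the output should reflect the new 'bucket' value
  SHOW CREATE TABLE my_table;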

Please note that

  • ALTER TABLE only modifies the table’s metadata and will NOT reorganize or reformat existing data. Reorganizing existing data must be done with INSERT OVERWRITE.
  • Rescaling the bucket number does not affect reads or currently running write jobs.
  • Once the bucket number is changed, any newly scheduled INSERT INTO jobs that write to an existing table/partition whose data has not been reorganized will throw a TableException with a message like

    Try to write table/partition ... with a new bucket num ...,
    but the previous bucket num is ... Please switch to batch mode,
    and perform INSERT OVERWRITE to rescale current data layout first.
  • For a partitioned table, it is possible to have different bucket numbers for different partitions, e.g.

    ALTER TABLE my_table SET ('bucket' = '4');
    INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
    SELECT * FROM ...;

    ALTER TABLE my_table SET ('bucket' = '8');
    INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
    SELECT * FROM ...;
  • During the overwrite period, make sure no other jobs are writing to the same table/partition.

Note: For tables with a log system enabled (e.g. Kafka), please rescale the topic’s partitions as well to keep consistency.
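
As a sketch of that step (the broker address and topic name below are placeholders, and 32 is just an example target that mirrors a new bucket number), the topic’s partition count can be increased with Kafka’s standard admin CLI:

  # increase the partition count of the backing log topic (placeholders throughout)
  $ ./bin/kafka-topics.sh --bootstrap-server <broker:9092> \
      --alter --topic <log_topic> --partitions 32

  # verify the new partition count
  $ ./bin/kafka-topics.sh --bootstrap-server <broker:9092> \
      --describe --topic <log_topic>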

Use Case

Rescaling the bucket number helps to handle sudden spikes in throughput. Suppose there is a daily streaming ETL task that syncs transaction data. The table’s DDL and pipeline are listed as follows.

  -- table DDL
  CREATE TABLE verified_orders (
      trade_order_id BIGINT,
      item_id BIGINT,
      item_price DOUBLE,
      dt STRING,
      PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED
  ) PARTITIONED BY (dt)
  WITH (
      'bucket' = '16'
  );

  -- like from a kafka table
  CREATE TEMPORARY TABLE raw_orders (
      trade_order_id BIGINT,
      item_id BIGINT,
      item_price BIGINT,
      gmt_create STRING,
      order_status STRING
  ) WITH (
      'connector' = 'kafka',
      'topic' = '...',
      'properties.bootstrap.servers' = '...',
      'format' = 'csv'
      ...
  );

  -- streaming insert as bucket num = 16
  INSERT INTO verified_orders
  SELECT trade_order_id,
         item_id,
         item_price,
         DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
  FROM raw_orders
  WHERE order_status = 'verified';

The pipeline has been running well for the past few weeks. However, the data volume has grown quickly recently, and the job’s latency keeps increasing. To improve data freshness, users can

  • Suspend the streaming job with a savepoint (see Suspended State and Stopping a Job Gracefully Creating a Final Savepoint)

    $ ./bin/flink stop \
          --savepointPath /tmp/flink-savepoints \
          $JOB_ID
  • Increase the bucket number

    -- scaling out
    ALTER TABLE verified_orders SET ('bucket' = '32');
  • Switch to batch mode and overwrite the current partition(s) that the streaming job is writing to

    SET 'execution.runtime-mode' = 'batch';

    -- suppose today is 2022-06-22
    -- case 1: there are no late events updating the historical partitions, so overwriting today's partition is enough
    INSERT OVERWRITE verified_orders PARTITION (dt = '2022-06-22')
    SELECT trade_order_id,
           item_id,
           item_price
    FROM verified_orders
    WHERE dt = '2022-06-22';

    -- case 2: there are late events updating the historical partitions, but the range does not exceed 3 days
    INSERT OVERWRITE verified_orders
    SELECT trade_order_id,
           item_id,
           item_price,
           dt
    FROM verified_orders
    WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22');
  • After the overwrite job has finished, switch back to streaming mode. The parallelism can now be increased along with the bucket number, and the streaming job can be restored from the savepoint (see Start a SQL Job from a savepoint); one way to raise the parallelism is sketched after the SQL below

    SET 'execution.runtime-mode' = 'streaming';
    SET 'execution.savepoint.path' = <savepointPath>;

    INSERT INTO verified_orders
    SELECT trade_order_id,
           item_id,
           item_price,
           DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
    FROM raw_orders
    WHERE order_status = 'verified';
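
For the parallelism increase mentioned in the last step, a minimal sketch using the Flink SQL client configuration command is shown below; the value 32 is only an assumption chosen to match the new bucket number, and parallelism.default can also be set in the cluster configuration instead.

    -- assumption: raise the default parallelism to match the new bucket number (32);
    -- run this before submitting the streaming INSERT INTO above
    SET 'parallelism.default' = '32';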