Append Queue

Definition

In this mode, an append table can be regarded as a set of queues, one per bucket. Records in the same bucket are strictly ordered, and a streaming read delivers them downstream in exactly the order in which they were written. No special configuration is needed to use this mode: by default, all data goes into a single bucket that acts as one queue. You can also define bucket and bucket-key to spread data across several buckets and enable higher parallelism.

Figure 1: Append Queue

Example of creating an append queue table:

Flink

    CREATE TABLE my_table (
        product_id BIGINT,
        price DOUBLE,
        sales BIGINT
    ) WITH (
        'bucket' = '8',
        'bucket-key' = 'product_id'
    );

Compaction

By default, the sink node will automatically perform compaction to control the number of files. The following options control the strategy of compaction:

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| write-only | false | Boolean | If set to true, compaction and snapshot expiration are skipped. This option is used together with dedicated compaction jobs. |
| compaction.min.file-num | 5 | Integer | For a file set [f_0,…,f_N], the minimum number of files satisfying sum(size(f_i)) >= targetFileSize that triggers a compaction for an append table. This avoids compacting almost-full files, which is not cost-effective. |
| compaction.max.file-num | 50 | Integer | For a file set [f_0,…,f_N], the maximum number of files that triggers a compaction for an append table, even if sum(size(f_i)) < targetFileSize. This avoids accumulating too many small files, which slows down performance. |
| full-compaction.delta-commits | (none) | Integer | Full compaction is triggered repeatedly, once every this many delta commits. |
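
As a rough illustration of how these options might be applied, the sketch below sets them on the my_table example from earlier using Flink SQL. The values are arbitrary placeholders rather than recommendations, and whether a given option can be changed on an existing table should be checked against your Paimon version.

```sql
-- Hypothetical values, chosen only to illustrate the syntax.
ALTER TABLE my_table SET (
    'compaction.min.file-num' = '10',
    'compaction.max.file-num' = '100'
);

-- Skip compaction and snapshot expiration in the writing job,
-- assuming a dedicated compaction job takes care of them instead.
ALTER TABLE my_table SET ('write-only' = 'true');
```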

Streaming Source

Streaming source behavior is currently supported only in the Flink engine.

Streaming Read Order

For streaming reads, records are produced in the following order:

  • For any two records from two different partitions:
    • If scan.plan-sort-partition is set to true, the record with a smaller partition value will be produced first.
    • Otherwise, the record with an earlier partition creation time will be produced first.
  • For any two records from the same partition and the same bucket, the first written record will be produced first.
  • For any two records from the same partition but different buckets, there is no order guarantee between them, because different buckets are processed by different tasks.
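
The sketch below shows one way the partition ordering option might be set for a single streaming query, using a dynamic table hint. The table name my_partitioned_table is hypothetical, and the query assumes a Flink SQL session running in streaming mode.

```sql
-- Run the query as an unbounded streaming job.
SET 'execution.runtime-mode' = 'streaming';

-- my_partitioned_table is a hypothetical partitioned append table.
-- With scan.plan-sort-partition enabled, records from partitions with
-- smaller partition values are produced first.
SELECT * FROM my_partitioned_table /*+ OPTIONS('scan.plan-sort-partition' = 'true') */;
```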

Watermark Definition

You can define a watermark for reading Paimon tables:

    CREATE TABLE t (
        `user` BIGINT,
        product STRING,
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (...);

    -- launch a streaming job that reads table t and counts users per 10-minute tumbling window
    SELECT window_start, window_end, COUNT(`user`)
    FROM TABLE(
        TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;

You can also enable Flink watermark alignment, which ensures that no source, split, shard, or partition advances its watermark too far ahead of the rest:

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| scan.watermark.alignment.group | (none) | String | A group of sources whose watermarks are aligned. |
| scan.watermark.alignment.max-drift | (none) | Duration | Maximum watermark drift allowed before consumption from the source/task/partition is paused. |
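
A minimal sketch of enabling alignment through a dynamic table hint on the table t defined above. The group name and the drift value are illustrative assumptions, not recommended settings.

```sql
-- 'alignment-group-1' and '1 min' are placeholder values.
SELECT * FROM t /*+ OPTIONS(
    'scan.watermark.alignment.group' = 'alignment-group-1',
    'scan.watermark.alignment.max-drift' = '1 min'
) */;
```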

Bounded Stream

A streaming source can also be bounded. Specify 'scan.bounded.watermark' to define the end condition for bounded streaming mode: stream reading ends once a snapshot with a larger watermark is encountered.

The watermark in a snapshot is generated by the writer. For example, you can define a Kafka source and declare a watermark on it. When this Kafka source is used to write to a Paimon table, the snapshots of the Paimon table carry the corresponding watermark, so you can use the bounded watermark feature when streaming reads from this Paimon table.

    CREATE TABLE kafka_table (
        `user` BIGINT,
        product STRING,
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH ('connector' = 'kafka'...);

    -- launch a streaming insert job
    INSERT INTO paimon_table SELECT * FROM kafka_table;

    -- launch a bounded streaming job to read paimon_table
    SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;