Configuration

CoreOptions

Core options for paimon.

KeyDefaultTypeDescription
async-file-write
trueBooleanWhether to enable asynchronous IO writing when writing files.
auto-create
falseBooleanWhether to create underlying storage when reading and writing the table.
branch
“main”StringSpecify branch name.
bucket
-1IntegerBucket number for file store.
It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).
bucket-key
(none)StringSpecify the paimon distribution policy. Data is assigned to each bucket according to the hash value of bucket-key.
If you specify multiple fields, delimiter is ‘,’.
If not specified, the primary key will be used; if there is no primary key, the full row will be used.
cache-page-size
64 kbMemorySizeMemory page size for caching.
changelog-producer
none

Enum

Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads. This can be applied to tables with primary keys.

Possible values:
  • “none”: No changelog file.
  • “input”: Double write to a changelog file when flushing memory table, the changelog is from input.
  • “full-compaction”: Generate changelog files with each full compaction.
  • “lookup”: Generate changelog files through ‘lookup’ before committing the data writing.
changelog-producer.row-deduplicate
falseBooleanWhether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.
changelog.num-retained.max
(none)IntegerThe maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.
changelog.num-retained.min
(none)IntegerThe minimum number of completed changelog to retain. Should be greater than or equal to 1.
changelog.time-retained
(none)DurationThe maximum time of completed changelog to retain.
commit.callback.#.param
(none)StringParameter string for the constructor of class #. Callback class should parse the parameter by itself.
commit.callbacks
(none)StringA list of commit callback classes to be called after a successful commit. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).
commit.force-compact
falseBooleanWhether to force a compaction before commit.
commit.force-create-snapshot
falseBooleanWhether to force create snapshot on commit.
commit.user-prefix
(none)StringSpecifies the commit user prefix.
compaction.max-size-amplification-percent
200IntegerThe size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table.
compaction.max.file-num
(none)IntegerFor file set [f0,…,fN], the maximum file number to trigger a compaction for append-only table, even if sum(size(f_i)) < targetFileSize. This value avoids pending too much small files.
  • Default value of Append Table is ‘50’.
  • Default value of Bucketed Append Table is ‘5’.
compaction.min.file-num
5IntegerFor file set [f_0,…,f_N], the minimum file number which satisfies sum(size(f_i)) >= targetFileSize to trigger a compaction for append-only table. This value avoids almost-full-file to be compacted, which is not cost-effective.
compaction.optimization-interval
(none)DurationImplying how often to perform an optimization compaction, this configuration is used to ensure the query timeliness of the read-optimized system table.
compaction.size-ratio
1IntegerPercentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run’s size, then include next sorted run into this candidate set.
consumer-id
(none)StringConsumer id for recording the offset of consumption in the storage.
consumer.expiration-time
(none)DurationThe expiration interval of consumer files. A consumer file will be expired if it’s lifetime after last modification is over this value.
consumer.ignore-progress
falseBooleanWhether to ignore consumer progress for the newly started job.
consumer.mode
exactly-once

Enum

Specify the consumer consistency mode for table.

Possible values:
  • “exactly-once”: Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed.
  • “at-least-once”: Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.
continuous.discovery-interval
10 sDurationThe discovery interval of continuous reading.
cross-partition-upsert.bootstrap-parallelism
10IntegerThe parallelism for bootstrap in a single task for cross partition upsert.
cross-partition-upsert.index-ttl
(none)DurationThe TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.
delete-file.thread-num
(none)IntegerThe maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.
delete.force-produce-changelog
falseBooleanForce produce changelog in delete sql, or you can use ‘streaming-read-overwrite’ to read changelog from overwrite commit.
deletion-vector.index-file.target-size
2 mbMemorySizeThe target size of deletion vector index file.
deletion-vectors.enabled
falseBooleanWhether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.
dynamic-bucket.assigner-parallelism
(none)IntegerParallelism of assigner operator for dynamic bucket mode, it is related to the number of initialized bucket, too small will lead to insufficient processing speed of assigner.
dynamic-bucket.initial-buckets
(none)IntegerInitial buckets for a partition in assigner operator for dynamic bucket mode.
dynamic-bucket.target-row-num
2000000LongIf the bucket is -1, for primary key table, is dynamic bucket mode, this option controls the target row number for one bucket.
dynamic-partition-overwrite
trueBooleanWhether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.
end-input.check-partition-expire
falseBooleanOptional endInput check partition expire used in case of batch mode or bounded stream.
fields.default-aggregate-function
(none)StringDefault aggregate function of all fields for partial-update and aggregate merge function.
file-index.in-manifest-threshold
500 bytesMemorySizeThe threshold to store file index bytes in manifest.
file-index.read.enabled
trueBooleanWhether enabled read file index.
file-reader-async-threshold
10 mbMemorySizeThe threshold for read file async.
file.block-size
(none)MemorySizeFile block size of format, default value of orc stripe is 64 MB, and parquet row group is 128 MB.
file.compression
“zstd”StringDefault file compression. For faster read and write, it is recommended to use zstd.
file.compression.per.level
MapDefine different compression policies for different level, you can add the conf like this: ‘file.compression.per.level’ = ‘0:lz4,1:zstd’.
file.compression.zstd-level
1IntegerDefault file compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.
file.format
“parquet”StringSpecify the message format of data files, currently orc, parquet and avro are supported.
file.format.per.level
MapDefine different file format for different level, you can add the conf like this: ‘file.format.per.level’ = ‘0:avro,3:parquet’, if the file format for level is not provided, the default format which set by file.format will be used.
force-lookup
falseBooleanWhether to force the use of lookup for compaction.
full-compaction.delta-commits
(none)IntegerFull compaction will be constantly triggered after delta commits.
ignore-delete
falseBooleanWhether to ignore delete records.
incremental-between
(none)StringRead incremental changes between start snapshot (exclusive) and end snapshot, for example, ‘5,10’ means changes between snapshot 5 and snapshot 10.
incremental-between-scan-mode
auto

Enum

Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot.

Possible values:
  • “auto”: Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files.
  • “delta”: Scan newly changed files between snapshots.
  • “changelog”: Scan changelog files between snapshots.
incremental-between-timestamp
(none)StringRead incremental changes between start timestamp (exclusive) and end timestamp, for example, ‘t1,t2’ means changes between timestamp t1 and timestamp t2.
local-merge-buffer-size
(none)MemorySizeLocal merge will buffer and merge input records before they’re shuffled by bucket and written into sink. The buffer will be flushed when it is full. Mainly to resolve data skew on primary keys. We recommend starting with 64 mb when trying out this feature.
local-sort.max-num-file-handles
128IntegerThe maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.
lookup-wait
trueBooleanWhen need to lookup, commit will wait for compaction by lookup.
lookup.cache-file-retention
1 hDurationThe cached files retention time for lookup. After the file expires, if there is a need for access, it will be re-read from the DFS to build an index on the local disk.
lookup.cache-max-disk-size
infiniteMemorySizeMax disk size for lookup cache, you can use this option to limit the use of local disks.
lookup.cache-max-memory-size
256 mbMemorySizeMax memory size for lookup cache.
lookup.cache-spill-compression
“zstd”StringSpill compression for lookup cache, currently zstd, none, lz4 and lzo are supported.
lookup.cache.bloom.filter.enabled
trueBooleanWhether to enable the bloom filter for lookup cache.
lookup.cache.bloom.filter.fpp
0.05DoubleDefine the default false positive probability for lookup cache bloom filters.
lookup.hash-load-factor
0.75FloatThe index load factor for lookup.
lookup.local-file-type
hash

Enum

The local file type for lookup.

Possible values:
  • “sort”: Construct a sorted file for lookup.
  • “hash”: Construct a hash file for lookup.
manifest.compression
“zstd”StringDefault file compression for manifest.
manifest.format
“avro”StringSpecify the message format of manifest files.
manifest.full-compaction-threshold-size
16 mbMemorySizeThe size threshold for triggering full compaction of manifest.
manifest.merge-min-count
30IntegerTo avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.
manifest.target-file-size
8 mbMemorySizeSuggested file size of a manifest file.
merge-engine
deduplicate

Enum

Specify the merge engine for table with primary key.

Possible values:
  • “deduplicate”: De-duplicate and keep the last row.
  • “partial-update”: Partial update non-null fields.
  • “aggregation”: Aggregate fields with same primary key.
  • “first-row”: De-duplicate and keep the first row.
metadata.iceberg-compatible
falseBooleanWhen set to true, produce Iceberg metadata after a snapshot is committed, so that Iceberg readers can read Paimon’s raw files.
metadata.stats-mode
“truncate(16)”StringThe mode of metadata stats collection. none, counts, truncate(16), full is available.
  • “none”: means disable the metadata stats collection.
  • “counts” means only collect the null count.
  • “full”: means collect the null count, min/max value.
  • “truncate(16)”: means collect the null count, min/max value with truncated length of 16.
  • Field level stats mode can be specified by fields.{field_name}.stats-mode
metastore.partitioned-table
falseBooleanWhether to create this table as a partitioned table in metastore. For example, if you want to list all partitions of a Paimon table in Hive, you need to create this table as a partitioned table in Hive metastore. This config option does not affect the default filesystem metastore.
metastore.tag-to-partition
(none)StringWhether to create this table as a partitioned table for mapping non-partitioned table tags in metastore. This allows the Hive engine to view this table in a partitioned table view and use partitioning field to read specific partitions (specific tags).
metastore.tag-to-partition.preview
none

Enum

Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.

Possible values:
  • “none”: No automatically created tags.
  • “process-time”: Based on the time of the machine, create TAG once the processing time passes period time plus delay.
  • “watermark”: Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
  • “batch”: In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
num-levels
(none)IntegerTotal level number, for example, there are 3 levels, including 0,1,2 levels.
num-sorted-run.compaction-trigger
5IntegerThe sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run).
num-sorted-run.stop-trigger
(none)IntegerThe number of sorted runs that trigger the stopping of writes, the default value is ‘num-sorted-run.compaction-trigger’ + 3.
page-size
64 kbMemorySizeMemory page size.
parquet.enable.dictionary
(none)IntegerTurn off the dictionary encoding for all fields in parquet.
partial-update.remove-record-on-delete
falseBooleanWhether to remove the whole row in partial-update engine when -D records are received.
partition
(none)StringDefine partition by table options, cannot define partition on DDL and table options at the same time.
partition.default-name
“__DEFAULT_PARTITION“StringThe default partition name in case the dynamic partition column value is null/empty string.
partition.expiration-check-interval
1 hDurationThe check interval of partition expiration.
partition.expiration-strategy
values-time

Enum

The strategy determines how to extract the partition time and compare it with the current time.

Possible values:
  • “values-time”: This strategy compares the time extracted from the partition value with the current time.
  • “update-time”: This strategy compares the last update time of the partition with the current time.
partition.expiration-time
(none)DurationThe expiration interval of a partition. A partition will be expired if it‘s lifetime is over this value. Partition time is extracted from the partition value.
partition.mark-done-action
“success-file”StringAction to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.
1. ‘success-file’: add ‘_success’ file to directory.
2. ‘done-partition’: add ‘xxx.done’ partition to metastore.
3. ‘mark-event’: mark partition event to metastore.
Both can be configured at the same time: ‘done-partition,success-file,mark-event’.
partition.timestamp-formatter
(none)StringThe formatter to format timestamp from string. It can be used with ‘partition.timestamp-pattern’ to create a formatter using the specified value.
  • Default formatter is ‘yyyy-MM-dd HH:mm:ss’ and ‘yyyy-MM-dd’.
  • Supports multiple partition fields like ‘$year-$month-$day $hour:00:00’.
  • The timestamp-formatter is compatible with Java’s DateTimeFormatter.
partition.timestamp-pattern
(none)StringYou can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by ‘partition.timestamp-formatter’.
  • By default, read from the first field.
  • If the timestamp in the partition is a single field called ‘dt’, you can use ‘$dt’.
  • If it is spread across multiple fields for year, month, day, and hour, you can use ‘$year-$month-$day $hour:00:00’.
  • If the timestamp is in fields dt and hour, you can use ‘$dt $hour:00:00’.
primary-key
(none)StringDefine primary key by table options, cannot define primary key on DDL and table options at the same time.
read.batch-size
1024IntegerRead batch size for orc and parquet.
record-level.expire-time
(none)DurationRecord level expire time for primary key table, expiration happens in compaction, there is no strong guarantee to expire records in time. You must specific ‘record-level.time-field’ too.
record-level.time-field
(none)StringTime field for record level expire.
record-level.time-field-type
seconds-int

Enum

Time field type for record level expire, it can be seconds-int or millis-long.

Possible values:
  • “seconds-int”: Timestamps in seconds should be INT type.
  • “millis-long”: Timestamps in milliseconds should be BIGINT type.
rowkind.field
(none)StringThe field that generates the row kind for primary key table, the row kind determines which data is ‘+I’, ‘-U’, ‘+U’ or ‘-D’.
scan.bounded.watermark
(none)LongEnd condition “watermark” for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered.
scan.fallback-branch
(none)StringWhen a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.
scan.file-creation-time-millis
(none)LongAfter configuring this time, only the data files created after this time will be read. It is independent of snapshots, but it is imprecise filtering (depending on whether or not compaction occurs).
scan.manifest.parallelism
(none)IntegerThe parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning
scan.max-splits-per-task
10IntegerMax split size should be cached for one task while scanning. If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.
scan.mode
default

Enum

Specify the scanning behavior of the source.

Possible values:
  • “default”: Determines actual startup mode according to other table properties. If “scan.timestamp-millis” is set the actual startup mode will be “from-timestamp”, and if “scan.snapshot-id” or “scan.tag-name” is set the actual startup mode will be “from-snapshot”. Otherwise the actual startup mode will be “latest-full”.
  • “latest-full”: For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.
  • “full”: Deprecated. Same as “latest-full”.
  • “latest”: For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the “latest-full” startup mode.
  • “compacted-full”: For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.
  • “from-timestamp”: For streaming sources, continuously reads changes starting from timestamp specified by “scan.timestamp-millis”, without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by “scan.timestamp-millis” but does not read new changes.
  • “from-file-creation-time”: For streaming and batch sources, produces a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes.
  • “from-snapshot”: For streaming sources, continuously reads changes starting from snapshot specified by “scan.snapshot-id”, without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by “scan.snapshot-id” or “scan.tag-name” but does not read new changes.
  • “from-snapshot-full”: For streaming sources, produces from snapshot specified by “scan.snapshot-id” on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by “scan.snapshot-id” but does not read new changes.
  • “incremental”: Read incremental changes between start and end snapshot or timestamp.
scan.plan-sort-partition
falseBooleanWhether to sort plan files by partition fields, this allows you to read according to the partition order, even if your partition writes are out of order.
It is recommended that you use this for streaming read of the ‘append-only’ table. By default, streaming read will read the full snapshot first. In order to avoid the disorder reading for partitions, you can open this option.
scan.snapshot-id
(none)LongOptional snapshot id used in case of “from-snapshot” or “from-snapshot-full” scan mode
scan.tag-name
(none)StringOptional tag name used in case of “from-snapshot” scan mode.
scan.timestamp
(none)StringOptional timestamp used in case of “from-timestamp” scan mode, it will be automatically converted to timestamp in unix milliseconds, use local time zone
scan.timestamp-millis
(none)LongOptional timestamp used in case of “from-timestamp” scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen.
scan.watermark
(none)LongOptional watermark used in case of “from-snapshot” scan mode. If there is no snapshot later than this watermark, will throw an exceptions.
sequence.field
(none)StringThe field that generates the sequence number for primary key table, the sequence number determines which data is the most recent.
sink.watermark-time-zone
“UTC”StringThe time zone to parse the long watermark value to TIMESTAMP value. The default value is ‘UTC’, which means the watermark is defined on TIMESTAMP column or not defined. If the watermark is defined on TIMESTAMP_LTZ column, the time zone of watermark is user configured time zone, the value should be the user configured local time zone. The option value is either a full name such as ‘America/Los_Angeles’, or a custom timezone id such as ‘GMT-08:00’.
snapshot.clean-empty-directories
falseBooleanWhether to try to clean empty directories when expiring snapshots, if enabled, please note:
  • hdfs: may print exceptions in NameNode.
  • oss/s3: may cause performance issue.
snapshot.expire.execution-mode
sync

Enum

Specifies the execution mode of expire.

Possible values:
  • “sync”: Execute expire synchronously. If there are too many files, it may take a long time and block stream processing.
  • “async”: Execute expire asynchronously. If the generation of snapshots is greater than the deletion, there will be a backlog of files.
snapshot.expire.limit
10IntegerThe maximum number of snapshots allowed to expire at a time.
snapshot.num-retained.max
infiniteIntegerThe maximum number of completed snapshots to retain. Should be greater than or equal to the minimum number.
snapshot.num-retained.min
10IntegerThe minimum number of completed snapshots to retain. Should be greater than or equal to 1.
snapshot.time-retained
1 hDurationThe maximum time of completed snapshots to retain.
snapshot.watermark-idle-timeout
(none)DurationIn watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation.
sort-compaction.local-sample.magnification
1000IntegerThe magnification of local sample for sort-compaction.The size of local sample is sink parallelism * magnification.
sort-compaction.range-strategy
QUANTITY

Enum

The range strategy of sort compaction, the default value is quantity. If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, the config can be set to size.

Possible values:
  • “SIZE”
  • “QUANTITY”
sort-engine
loser-tree

Enum

Specify the sort engine for table with primary key.

Possible values:
  • “min-heap”: Use min-heap for multiway sorting.
  • “loser-tree”: Use loser-tree for multiway sorting. Compared with heapsort, loser-tree has fewer comparisons and is more efficient.
sort-spill-buffer-size
64 mbMemorySizeAmount of data to spill records to disk in spilled sort.
sort-spill-threshold
(none)IntegerIf the maximum number of sort readers exceeds this value, a spill will be attempted. This prevents too many readers from consuming too much memory and causing OOM.
source.split.open-file-cost
4 mbMemorySizeOpen file cost of a source file. It is used to avoid reading too many files with a source split, which can be very slow.
source.split.target-size
128 mbMemorySizeTarget size of a source split when scanning a bucket.
spill-compression
“zstd”StringCompression for spill, currently zstd, lzo and zstd are supported.
spill-compression.zstd-level
1IntegerDefault spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.
streaming-read-mode
(none)

Enum

The mode of streaming read that specifies to read the data of table file or log.

Possible values:
  • “log”: Read from the data of table log store.
  • “file”: Read from the data of table file store.
streaming-read-overwrite
falseBooleanWhether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes.
streaming.read.snapshot.delay
(none)DurationThe delay duration of stream read when scan incremental snapshots.
tag.automatic-completion
falseBooleanWhether to automatically complete missing tags.
tag.automatic-creation
none

Enum

Whether to create tag automatically. And how to generate tags.

Possible values:
  • “none”: No automatically created tags.
  • “process-time”: Based on the time of the machine, create TAG once the processing time passes period time plus delay.
  • “watermark”: Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
  • “batch”: In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
tag.callback.#.param
(none)StringParameter string for the constructor of class #. Callback class should parse the parameter by itself.
tag.callbacks
(none)StringA list of commit callback classes to be called after a successful tag. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).
tag.creation-delay
0 msDurationHow long is the delay after the period ends before creating a tag. This can allow some late data to enter the Tag.
tag.creation-period
daily

Enum

What frequency is used to generate tags.

Possible values:
  • “daily”: Generate a tag every day.
  • “hourly”: Generate a tag every hour.
  • “two-hours”: Generate a tag every two hours.
tag.default-time-retained
(none)DurationThe default maximum time retained for newly created tags. It affects both auto-created tags and manually created (by procedure) tags.
tag.num-retained-max
(none)IntegerThe maximum number of tags to retain. It only affects auto-created tags.
tag.period-formatter
with_dashes

Enum

The date format for tag periods.

Possible values:
  • “with_dashes”: Dates and hours with dashes, e.g., ‘yyyy-MM-dd HH’
  • “without_dashes”: Dates and hours without dashes, e.g., ‘yyyyMMdd HH’
target-file-size
(none)MemorySizeTarget size of a file.
  • primary key table: the default value is 128 MB.
  • append table: the default value is 256 MB.
write-buffer-for-append
falseBooleanThis option only works for append-only table. Whether the write use write buffer to avoid out-of-memory error.
write-buffer-size
256 mbMemorySizeAmount of data to build up in memory before converting to a sorted on-disk file.
write-buffer-spill.max-disk-size
infiniteMemorySizeThe max disk to use for write buffer spill. This only work when the write buffer spill is enabled
write-buffer-spillable
(none)BooleanWhether the write buffer can be spillable. Enabled by default when using object storage.
write-manifest-cache
0 bytesMemorySizeCache size for reading manifest files for write initialization.
write-max-writers-to-spill
10IntegerWhen in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.
write-only
falseBooleanIf set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.
zorder.var-length-contribution
8IntegerThe bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.

CatalogOptions

Options for paimon catalog.

KeyDefaultTypeDescription
allow-upper-case
(none)BooleanIndicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.
cache-enabled
trueBooleanControls whether the catalog will cache databases, tables and manifests.
cache.expiration-interval
1 minDurationControls the duration for which databases and tables in the catalog are cached.
cache.manifest.max-memory
(none)MemorySizeControls the maximum memory to cache manifest content.
cache.manifest.small-file-memory
128 mbMemorySizeControls the cache memory to cache small manifest files.
cache.manifest.small-file-threshold
1 mbMemorySizeControls the threshold of small manifest file.
client-pool-size
2IntegerConfigure the size of the connection pool.
fs.allow-hadoop-fallback
trueBooleanAllow to fallback to hadoop File IO when no file io found for the scheme.
lineage-meta
(none)StringThe lineage meta to store table and data lineage information.

Possible values:
  • “jdbc”: Use standard jdbc to store table and data lineage information.
  • “custom”: You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage.
lock-acquire-timeout
8 minDurationThe maximum time to wait for acquiring the lock.
lock-check-max-sleep
8 sDurationThe maximum sleep time when retrying to check the lock.
lock.enabled
(none)BooleanEnable Catalog Lock.
lock.type
(none)StringThe Lock Type for Catalog, such as ‘hive’, ‘zookeeper’.
metastore
“filesystem”StringMetastore of paimon catalog, supports filesystem, hive and jdbc.
sync-all-properties
falseBooleanSync all table properties to hive metastore
table.type
managed

Enum

Type of table.

Possible values:
  • “managed”: Paimon owned table where the entire lifecycle of the table data is managed.
  • “external”: The table where Paimon has loose coupling with the data stored in external locations.
uri
(none)StringUri of metastore server.
warehouse
(none)StringThe warehouse root path of catalog.

FilesystemCatalogOptions

Options for Filesystem catalog.

KeyDefaultTypeDescription
case-sensitive
trueBooleanIs case sensitive. If case insensitive, you need to set this option to false, and the table name and fields be converted to lowercase.

HiveCatalogOptions

Options for Hive catalog.

KeyDefaultTypeDescription
client-pool-cache.eviction-interval-ms
300000LongSetting the client’s pool cache eviction interval(ms).
client-pool-cache.keys
(none)StringSpecify client cache key, multiple elements separated by commas.
  • “ugi”: the Hadoop UserGroupInformation instance that represents the current user using the cache.
  • “user_name” similar to UGI but only includes the user’s name determined by UserGroupInformation#getUserName.
  • “conf”: name of an arbitrary configuration. The value of the configuration will be extracted from catalog properties and added to the cache key. A conf element should start with a “conf:” prefix which is followed by the configuration name. E.g. specifying “conf:a.b.c” will add “a.b.c” to the key, and so that configurations with different default catalog wouldn’t share the same client pool. Multiple conf elements can be specified.
format-table.enabled
falseBooleanWhether to support format tables, format table corresponds to a regular Hive table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations.
hadoop-conf-dir
(none)StringFile directory of the core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml. Currently, only local file system paths are supported. If not configured, try to load from ‘HADOOP_CONF_DIR’ or ‘HADOOP_HOME’ system environment. Configure Priority: 1.from ‘hadoop-conf-dir’ 2.from HADOOP_CONF_DIR 3.from HADOOP_HOME/conf 4.HADOOP_HOME/etc/hadoop.
hive-conf-dir
(none)StringFile directory of the hive-site.xml , used to create HiveMetastoreClient and security authentication, such as Kerberos, LDAP, Ranger and so on. If not configured, try to load from ‘HIVE_CONF_DIR’ env.
location-in-properties
falseBooleanSetting the location in properties of hive table/database. If you don’t want to access the location by the filesystem of hive when using a object storage such as s3,oss you can set this option to true.

JdbcCatalogOptions

Options for Jdbc catalog.

KeyDefaultTypeDescription
catalog-key
“jdbc”StringCustom jdbc catalog store key.
lock-key-max-length
255IntegerSet the maximum length of the lock key. The ‘lock-key’ is composed of concatenating three fields : ‘catalog-key’, ‘database’, and ‘table’.

FlinkCatalogOptions

Flink catalog options for paimon.

KeyDefaultTypeDescription
default-database
“default”String
disable-create-table-in-default-db
falseBooleanIf true, creating table in default database is not allowed. Default is false.

FlinkConnectorOptions

Flink connector options for paimon.

KeyDefaultTypeDescription
end-input.watermark
(none)LongOptional endInput watermark used in case of batch mode or bounded stream.
lookup.async
falseBooleanWhether to enable async lookup join.
lookup.async-thread-number
16IntegerThe thread number for lookup async.
lookup.bootstrap-parallelism
4IntegerThe parallelism for bootstrap in a single task for lookup join.
lookup.cache
AUTO

Enum

The cache mode of lookup join.

Possible values:
  • “AUTO”
  • “FULL”
lookup.dynamic-partition
(none)StringSpecific dynamic partition for lookup, only support ‘max_pt()’ currently.
lookup.dynamic-partition.refresh-interval
1 hDurationSpecific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition.
lookup.refresh.async
falseBooleanWhether to refresh lookup table in an async thread.
lookup.refresh.async.pending-snapshot-count
5IntegerIf the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync.
partition.end-input-to-done
falseBooleanWhether mark the done status to indicate that the data is ready when end input.
partition.idle-time-to-done
(none)DurationSet a time duration when a partition has no new data after this time duration, mark the done status to indicate that the data is ready.
partition.time-interval
(none)DurationYou can specify time interval for partition, for example, daily partition is ‘1 d’, hourly partition is ‘1 h’.
scan.infer-parallelism
trueBooleanIf it is false, parallelism of source are set by global parallelism. Otherwise, source parallelism is inferred from splits number (batch mode) or bucket number(streaming mode).
scan.infer-parallelism.max
1024IntegerIf scan.infer-parallelism is true, limit the parallelism of source through this option.
scan.parallelism
(none)IntegerDefine a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If user enable the scan.infer-parallelism, the planner will derive the parallelism by inferred parallelism.
scan.push-down
trueBooleanIf true, flink will push down projection, filters, limit to the source. The cost is that it is difficult to reuse the source in a job. With flink 1.18 or higher version, it is possible to reuse the source even with projection push down.
scan.remove-normalize
falseBooleanWhether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog.
scan.split-enumerator.batch-size
10IntegerHow many splits should assign to subtask per batch in StaticFileStoreSplitEnumerator to avoid exceed akka.framesize limit.
scan.split-enumerator.mode
fair

Enum

The mode used by StaticFileStoreSplitEnumerator to assign splits.

Possible values:
  • “fair”: Distribute splits evenly when batch reading to prevent a few tasks from reading all.
  • “preemptive”: Distribute splits preemptively according to the consumption speed of the task.
scan.watermark.alignment.group
(none)StringA group of sources to align watermarks.
scan.watermark.alignment.max-drift
(none)DurationMaximal drift to align watermarks, before we pause consuming from the source/task/partition.
scan.watermark.alignment.update-interval
1 sDurationHow often tasks should notify coordinator about the current watermark and how often the coordinator should announce the maximal aligned watermark.
scan.watermark.emit.strategy
on-event

Enum

Emit strategy for watermark generation.

Possible values:
  • “on-periodic”: Emit watermark periodically, interval is controlled by Flink ‘pipeline.auto-watermark-interval’.
  • “on-event”: Emit watermark per record.
scan.watermark.idle-timeout
(none)DurationIf no records flow in a partition of a stream for that amount of time, then that partition is considered “idle” and will not hold back the progress of watermarks in downstream operators.
sink.clustering.by-columns
(none)StringSpecifies the column name(s) used for comparison during range partitioning, in the format ‘columnName1,columnName2’. If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. This option will be effective only for bucket unaware table without primary keys and batch execution mode.
sink.clustering.sample-factor
100IntegerSpecifies the sample factor. Let S represent the total number of samples, F represent the sample factor, and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20.
sink.clustering.sort-in-cluster
trueBooleanIndicates whether to further sort data belonged to each sink task after range partitioning.
sink.clustering.strategy
“auto”StringSpecifies the comparison algorithm used for range partitioning, including ‘zorder’, ‘hilbert’, and ‘order’, corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, respectively. When not configured, it will automatically determine the algorithm based on the number of columns in ‘sink.clustering.by-columns’. ‘order’ is used for 1 column, ‘zorder’ for less than 5 columns, and ‘hilbert’ for 5 or more columns.
sink.committer-cpu
1.0DoubleSink committer cpu to control cpu cores of global committer.
sink.committer-memory
(none)MemorySizeSink committer memory to control heap memory of global committer.
sink.committer-operator-chaining
trueBooleanAllow sink committer and writer operator to be chained together
sink.cross-partition.managed-memory
256 mbMemorySizeWeight of managed memory for RocksDB in cross-partition update, Flink will compute the memory size according to the weight, the actual memory used depends on the running environment.
sink.managed.writer-buffer-memory
256 mbMemorySizeWeight of writer buffer in managed memory, Flink will compute the memory size for writer according to the weight, the actual memory used depends on the running environment.
sink.parallelism
(none)IntegerDefines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.
sink.savepoint.auto-tag
falseBooleanIf true, a tag will be automatically created for the snapshot created by flink savepoint.
sink.use-managed-memory-allocator
falseBooleanIf true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.
source.checkpoint-align.enabled
falseBooleanWhether to align the flink checkpoint with the snapshot of the paimon table, If true, a checkpoint will only be made if a snapshot is consumed.
source.checkpoint-align.timeout
30 sDurationIf the new snapshot has not been generated when the checkpoint starts to trigger, the enumerator will block the checkpoint and wait for the new snapshot. Set the maximum waiting time to avoid infinite waiting, if timeout, the checkpoint will fail. Note that it should be set smaller than the checkpoint timeout.
streaming-read.shuffle-bucket-with-partition
trueBooleanWhether shuffle by partition and bucket when streaming read.
unaware-bucket.compaction.parallelism
(none)IntegerDefines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.

SparkCatalogOptions

Spark catalog options for paimon.

KeyDefaultTypeDescription
catalog.create-underlying-session-catalog
falseBooleanIf true, create and use an underlying session catalog instead of default session catalog when use SparkGenericCatalog.
defaultDatabase
“default”StringThe default database name.

SparkConnectorOptions

Spark connector options for paimon.

KeyDefaultTypeDescription
read.changelog
falseBooleanWhether to read row in the form of changelog (add rowkind column in row to represent its change type).
read.stream.maxBytesPerTrigger
(none)LongThe maximum number of bytes returned in a single batch.
read.stream.maxFilesPerTrigger
(none)IntegerThe maximum number of files returned in a single batch.
read.stream.maxRowsPerTrigger
(none)LongThe maximum number of rows returned in a single batch.
read.stream.maxTriggerDelayMs
(none)LongThe maximum delay between two adjacent batches, which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger together.
read.stream.minRowsPerTrigger
(none)LongThe minimum number of rows returned in a single batch, which used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs together.
write.merge-schema
falseBooleanIf true, merge the data schema and the table schema automatically before write data.
write.merge-schema.explicit-cast
falseBooleanIf true, allow to merge data types if the two types meet the rules for explicit casting.

ORC Options

KeyDefaultTypeDescription
orc.column.encoding.direct
(none)IntegerComma-separated list of fields for which dictionary encoding is to be skipped in orc.
orc.dictionary.key.threshold
0.8DoubleIf the number of distinct keys in a dictionary is greater than this fraction of the total number of non-null rows, turn off dictionary encoding in orc. Use 0 to always disable dictionary encoding. Use 1 to always use dictionary encoding.
orc.write.batch-size
1024Integerwrite batch size for orc.

RocksDB Options

The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints.

KeyDefaultTypeDescription
lookup.cache-rows
10000LongThe maximum number of rows to store in the cache.
lookup.continuous.discovery-interval
(none)DurationThe discovery interval of lookup continuous reading. This is used as an SQL hint. If it’s not configured, the lookup function will fallback to ‘continuous.discovery-interval’.
rocksdb.block.blocksize
4 kbMemorySizeThe approximate size (in bytes) of user data packed per block. The default blocksize is ‘4KB’.
rocksdb.block.cache-size
128 mbMemorySizeThe amount of the cache for data blocks in RocksDB.
rocksdb.block.metadata-blocksize
4 kbMemorySizeApproximate size of partitioned metadata packed per block. Currently applied to indexes block when partitioned index/filters option is enabled. The default blocksize is ‘4KB’.
rocksdb.bloom-filter.bits-per-key
10.0DoubleBits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0.
rocksdb.bloom-filter.block-based-mode
falseBooleanIf true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is ‘false’.
rocksdb.compaction.level.max-size-level-base
256 mbMemorySizeThe upper-bound of the total size of level base files in bytes. The default value is ‘256MB’.
rocksdb.compaction.level.target-file-size-base
64 mbMemorySizeThe target file size for compaction, which determines a level-1 file size. The default value is ‘64MB’.
rocksdb.compaction.level.use-dynamic-size
falseBooleanIf true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. The default value is ‘false’. For more information, please refer to RocksDB’s doc.
rocksdb.compaction.style
LEVEL

Enum

The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO, UNIVERSAL or NONE, and Flink chooses ‘LEVEL’ as default style.

Possible values:
  • “LEVEL”
  • “UNIVERSAL”
  • “FIFO”
  • “NONE”
rocksdb.compression.type
LZ4_COMPRESSION

Enum

The compression type.

Possible values:
  • “NO_COMPRESSION”
  • “SNAPPY_COMPRESSION”
  • “ZLIB_COMPRESSION”
  • “BZLIB2_COMPRESSION”
  • “LZ4_COMPRESSION”
  • “LZ4HC_COMPRESSION”
  • “XPRESS_COMPRESSION”
  • “ZSTD_COMPRESSION”
  • “DISABLE_COMPRESSION_OPTION”
rocksdb.files.open
-1IntegerThe maximum number of open files (per stateful operator) that can be used by the DB, ‘-1’ means no limit. The default value is ‘-1’.
rocksdb.thread.num
2IntegerThe maximum number of concurrent background flush and compaction jobs (per stateful operator). The default value is ‘2’.
rocksdb.use-bloom-filter
falseBooleanIf true, every newly created SST file will contain a Bloom filter. It is disabled by default.
rocksdb.writebuffer.count
2IntegerThe maximum number of write buffers that are built up in memory. The default value is ‘2’.
rocksdb.writebuffer.number-to-merge
1IntegerThe minimum number of write buffers that will be merged together before writing to storage. The default value is ‘1’.
rocksdb.writebuffer.size
64 mbMemorySizeThe amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. The default writebuffer size is ‘64MB’.