Procedures

This section introduces all available Spark procedures for Paimon.

Each procedure below is listed by name, followed by an explanation of its arguments and example calls.
compact

To compact files. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: partition filter, where "," means "AND" and ";" means "OR". For example, to compact the single partition with date=01 and day=01, write 'date=01,day=01'. Leave empty to compact all partitions. Cannot be used together with "where".
  • where: partition predicate. Leave empty to compact all partitions. Cannot be used together with "partitions".
  • order_strategy: 'order', 'zorder', 'hilbert' or 'none'. Defaults to 'none' if left empty.
  • order_by: the columns to sort by. Leave empty if order_strategy is 'none'.
  • partition_idle_time: performs a full compaction only on partitions that have not received any new data for partition_idle_time; other partitions are not compacted. Cannot be used together with order compaction.

Example:

    SET spark.sql.shuffle.partitions=10; -- set the compact parallelism

    CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')

    CALL sys.compact(table => 'T', partition_idle_time => '60s')

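The partitions syntax is easiest to see side by side. The calls below are a sketch that assumes a hypothetical table T partitioned by date and day; they use only the arguments documented above.

```sql
-- Hypothetical table T, assumed to be partitioned by (date, day).

-- ',' joins conditions with AND: compacts only the partition date=01 AND day=01.
CALL sys.compact(table => 'T', partitions => 'date=01,day=01')

-- ';' separates alternatives with OR:
-- compacts partition (date=01 AND day=01) OR partition (date=01 AND day=02).
CALL sys.compact(table => 'T', partitions => 'date=01,day=01;date=01,day=02')
```
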
expire_snapshots

To expire snapshots. Arguments:
  • table: the target table identifier. Cannot be empty.
  • retain_max: the maximum number of completed snapshots to retain.
  • retain_min: the minimum number of completed snapshots to retain.
  • older_than: timestamp before which snapshots will be removed.
  • max_deletes: the maximum number of snapshots that can be deleted at once.

Example:

    CALL sys.expire_snapshots(table => 'default.T', retain_max => 10)

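The arguments can also be combined to expire by age while keeping a safety margin. This is a sketch that assumes older_than accepts the same 'yyyy-MM-dd HH:mm:ss' string format used in the remove_orphan_files examples; the date and counts are illustrative.

```sql
-- Remove snapshots committed before the given time (format assumed, see above),
-- but keep at least 5 completed snapshots and delete at most 100 in this call.
CALL sys.expire_snapshots(table => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 5, max_deletes => 100)
```
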
expire_partitions

To expire partitions. Arguments:
  • table: the target table identifier. Cannot be empty.
  • expiration_time: the expiration interval of a partition. A partition expires once its lifetime exceeds this value. Partition time is extracted from the partition value.
  • timestamp_formatter: the formatter used to parse a timestamp from a string.
  • timestamp_pattern: the pattern used to build a timestamp string from the partition values.
  • expire_strategy: the strategy for partition expiration. Possible values: 'values-time' or 'update-time'. Defaults to 'values-time'.

Example:

    CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time')

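When the partition time is spread over several fields, timestamp_pattern can assemble them into one string before timestamp_formatter parses it. The sketch below assumes a hypothetical table partitioned by year, month and day. It also assumes that the formatter and pattern are not needed under 'update-time', which judges a partition by when it was last updated rather than by its values.

```sql
-- Hypothetical table partitioned by (year, month, day):
-- '$year-$month-$day' builds a string such as '2024-07-01', parsed with 'yyyy-MM-dd'.
CALL sys.expire_partitions(table => 'default.T', expiration_time => '7 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$year-$month-$day', expire_strategy => 'values-time')

-- Expire partitions that have not been updated for 7 days (assumed 'update-time' behavior).
CALL sys.expire_partitions(table => 'default.T', expiration_time => '7 d', expire_strategy => 'update-time')
```
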
create_tag

To create a tag based on a given snapshot. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag. Cannot be empty.
  • snapshot (Long): id of the snapshot the new tag is based on. If not set, the latest snapshot is used.
  • time_retained: the maximum time to retain the newly created tag.

Example:

    -- based on snapshot 10, retained for 1 day
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')

    -- based on the latest snapshot
    CALL sys.create_tag(table => 'default.T', tag => 'my_tag')

create_tag_from_timestamp

To create a tag based on a given timestamp. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the new tag.
  • timestamp (Long): the tag is based on the first snapshot whose commit time is greater than this timestamp.
  • time_retained: the maximum time to retain the newly created tag.

Example:

    CALL sys.create_tag_from_timestamp(table => 'default.T', tag => 'my_tag', timestamp => 1724404318750, time_retained => '1 d')

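The example value 1724404318750 looks like epoch milliseconds. Assuming that is the expected unit, Spark's built-in unix_millis function can convert a readable time into that form. The TIMESTAMP literal is interpreted in the session time zone (spark.sql.session.timeZone), so the returned number depends on that setting.

```sql
-- Convert a readable time to epoch milliseconds (interpreted in the session time zone).
SELECT unix_millis(TIMESTAMP '2024-08-23 09:00:00')

-- Pass the returned number as the timestamp argument;
-- the value from the example above is reused here only as a placeholder.
CALL sys.create_tag_from_timestamp(table => 'default.T', tag => 'my_tag', timestamp => 1724404318750)
```
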
delete_tag

To delete a tag. Arguments:
  • table: the target table identifier. Cannot be empty.
  • tag: name of the tag to be deleted. To delete multiple tags, separate them with ','.

Example:

    CALL sys.delete_tag(table => 'default.T', tag => 'my_tag')

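The ',' delimiter lets one call remove several tags. A minimal sketch; the tag names are hypothetical.

```sql
-- Delete two tags in one call; tag names are separated by ','.
CALL sys.delete_tag(table => 'default.T', tag => 'tag_a,tag_b')
```
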
rollback

To roll back the target table to a specific version. Arguments:
  • table: the target table identifier. Cannot be empty.
  • version: the snapshot id or tag name to roll back to.

Example:

    CALL sys.rollback(table => 'default.T', version => 'my_tag')

    CALL sys.rollback(table => 'default.T', version => 10)

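create_tag, rollback and delete_tag combine into a simple safety net: tag the current state before a risky change, then roll back to that tag if the change goes wrong. A sketch using only the arguments documented here; the tag name before_backfill is hypothetical.

```sql
-- 1. Tag the latest snapshot before the risky write.
CALL sys.create_tag(table => 'default.T', tag => 'before_backfill')

-- 2. ... run the risky write against default.T ...

-- 3. If the result is wrong, roll back to the tag.
CALL sys.rollback(table => 'default.T', version => 'before_backfill')

-- 4. Drop the tag once it is no longer needed.
CALL sys.delete_tag(table => 'default.T', tag => 'before_backfill')
```
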
migrate_table

To migrate a Hive table to a Paimon table. Arguments:
  • source_type: the type of the origin table to be migrated, such as hive. Cannot be empty.
  • table: name of the origin table to be migrated. Cannot be empty.
  • options: the table options of the target Paimon table.
  • target_table: name of the target Paimon table. If not set, the origin table name is kept.
  • delete_origin: if target_table is set, decides whether to delete the origin table metadata from HMS after migration. Default is true.
  • options_map: a map of additional key-value table options.

Example:

    CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'))

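When target_table is set, delete_origin controls whether the Hive table survives the migration. A sketch that keeps the origin table's metadata in HMS; the target name default.T_paimon is hypothetical.

```sql
-- Migrate default.T into a new Paimon table and keep the origin table's metadata in HMS.
CALL sys.migrate_table(source_type => 'hive', table => 'default.T', target_table => 'default.T_paimon', delete_origin => false)
```
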
migrate_file

To migrate files from a Hive table to a Paimon table. Arguments:
  • source_type: the type of the origin table to be migrated, such as hive. Cannot be empty.
  • source_table: name of the origin table to be migrated. Cannot be empty.
  • target_table: name of the target table. Cannot be empty.
  • delete_origin: whether to delete the origin table metadata from HMS after migration. Default is true.

Example:

    CALL sys.migrate_file(source_type => 'hive', source_table => 'default.T', target_table => 'default.T2', delete_origin => true)

remove_orphan_files

To remove orphan data files and metadata files. Arguments:
  • table: the target table identifier. Cannot be empty. Use database_name.* to clean the whole database.
  • older_than: only orphan files older than this time are deleted, to avoid deleting newly written files. Defaults to 1 day ago.
  • dry_run: when true, only lists the orphan files without removing them. Default is false.

Example:

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')

    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)

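dry_run makes it possible to review what would be removed before deleting anything. A sketch of that two-step workflow over a whole database, relying on the documented 1-day default for older_than.

```sql
-- Preview orphan files across the whole database without deleting anything
-- (older_than is omitted, so the 1-day default applies).
CALL sys.remove_orphan_files(table => 'default.*', dry_run => true)

-- After reviewing the output, run the same call without dry_run to delete them.
CALL sys.remove_orphan_files(table => 'default.*')
```
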
repair

To synchronize information from the file system to the Metastore. Arguments:
  • database_or_table: empty, a target database name, or a target table identifier. To specify multiple databases or tables, separate them with ','.

Example:

    CALL sys.repair('test_db.T')

    CALL sys.repair('test_db.T,test_db01,test_db.T2')

create_branch

To create a branch, either based on a given tag or as an empty branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the new branch.
  • tag: name of the tag the new branch is based on. If not set, an empty branch is created.

Example:

    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')

    CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')

delete_branch

To delete a branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to be deleted. To delete multiple branches, separate them with ','.

Example:

    CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')

fast_forward

To fast-forward the main branch to the given branch. Arguments:
  • table: the target table identifier. Cannot be empty.
  • branch: name of the branch to be merged into the main branch.

Example:

    CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch')

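The branch procedures fit together into a typical lifecycle: branch off a tag, work on the branch, fast-forward main to it, then clean up. A sketch using only the calls documented above; how data is written to the branch is outside this section and is left as a placeholder comment.

```sql
-- 1. Create a branch from an existing tag.
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')

-- 2. ... write to and validate the branch here ...

-- 3. Fast-forward the main branch to test_branch.
CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch')

-- 4. Remove the branch when it is no longer needed.
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
```
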
reset_consumer

To reset or delete a consumer. Arguments:
  • table: the target table identifier. Cannot be empty.
  • consumerId: the consumer to be reset or deleted.
  • nextSnapshotId (Long): the new next snapshot id of the consumer. If not set, the consumer is deleted.

Example:

    -- reset the next snapshot id of the consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid', nextSnapshotId => 10)

    -- delete the consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')

mark_partition_done

To mark partitions as done. Arguments:
  • table: the target table identifier. Cannot be empty.
  • partitions: the partitions to mark as done. To specify multiple partitions, separate them with ';'.

Example:

    -- mark a single partition done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')

    -- mark multiple partitions done
    CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02')