Procedures

Flink 1.18 and later versions support Call Statements, which make it easier to manipulate data and metadata of Paimon table by writing SQLs instead of submitting Flink jobs.

In 1.18, the procedure only supports passing arguments by position. You must pass all arguments in order, and if you don’t want to pass some arguments, you must use '' as placeholder. For example, if you want to compact table default.t with parallelism 4, but you don’t want to specify partitions and sort strategy, the call statement should be
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4').

In higher versions, the procedure supports passing arguments by name. You can pass arguments in any order and any optional argument can be omitted. For the above example, the call statement is
CALL sys.compact(`table` => 'default.t', options => 'sink.parallelism=4').

Specify partitions: we use string to represent partition filter. “,” means “AND” and “;” means “OR”. For example, if you want to specify two partitions date=01 and date=02, you need to write ‘date=01;date=02’; If you want to specify one partition with date=01 and day=01, you need to write ‘date=01,day=01’.

Table options syntax: we use string to represent table options. The format is ‘key1=value1,key2=value2…’.

All available procedures are listed below.

Procedure NameUsageExplanationExample
compactTo compact a table. Arguments:
  • table(required): the target table identifier.
  • partitions(optional): partition filter.
  • orderstrategy(optional): ‘order’ or ‘zorder’ or ‘hilbert’ or ‘none’.
  • order_by(optional): the columns need to be sort. Left empty if ‘order_strategy’ is ‘none’.
  • options(optional): additional dynamic options of the table.
  • CALL sys.compact(table => ‘default.T’, partitions => ‘p=0’, order_strategy => ‘zorder’, order_by => ‘a,b’, options => ‘sink.parallelism=4’)
    compact_databaseCALL [catalog.]sys.compact_database()

    CALL [catalog.]sys.compact_database(‘includingDatabases’)

    CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’)

    CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’)

    CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’)

    CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’, ‘tableOptions’)
    To compact databases. Arguments:
  • includingDatabases: to specify databases. You can use regular expression.
  • mode: compact mode. “divided”: start a sink for each table, detecting the new table requires restarting the job; “combined” (default): start a single combined sink for all tables, the new table will be automatically detected.
  • includingTables: to specify tables. You can use regular expression.
  • excludingTables: to specify tables that are not compacted. You can use regular expression.
  • tableOptions: additional dynamic options of the table.
  • CALL sys.compact_database(‘db1|db2’, ‘combined’, ‘table.‘, ‘ignore’, ‘sink.parallelism=4’)
    create_tag— based on the specified snapshot
    CALL [catalog.]sys.create_tag(‘identifier’, ‘tagName’, snapshotId)
    — based on the latest snapshot
    CALL [catalog.]sys.create_tag(‘identifier’, ‘tagName’)
    To create a tag based on given snapshot. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the new tag.
  • snapshotId (Long): id of the snapshot which the new tag is based on.
  • time_retained: The maximum time retained for newly created tags.
  • CALL sys.create_tag(‘default.T’, ‘my_tag’, 10, ‘1 d’)
    delete_tagCALL [catalog.]sys.delete_tag(‘identifier’, ‘tagName’)To delete a tag. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • tagName: name of the tag to be deleted.
  • CALL sys.delete_tag(‘default.T’, ‘my_tag’)
    merge_into— when matched then upsert
    CALL [catalog.]sys.merge_into(‘identifier’,’targetAlias’,
    ‘sourceSqls’,’sourceTable’,’mergeCondition’,
    ‘matchedUpsertCondition’,’matchedUpsertSetting’)

    — when matched then upsert; when not matched then insert
    CALL [catalog.]sys.merge_into(‘identifier’,’targetAlias’,
    ‘sourceSqls’,’sourceTable’,’mergeCondition’,
    ‘matchedUpsertCondition’,’matchedUpsertSetting’,
    ‘notMatchedInsertCondition’,’notMatchedInsertValues’)

    — when matched then delete
    CALL [catalog].sys.merge_into(‘identifier’,’targetAlias’,
    ‘sourceSqls’,’sourceTable’,’mergeCondition’,
    ‘matchedDeleteCondition’)

    — when matched then upsert + delete;
    — when not matched then insert
    CALL [catalog].sys.merge_into(‘identifier’,’targetAlias’,
    ‘sourceSqls’,’sourceTable’,’mergeCondition’,
    ‘matchedUpsertCondition’,’matchedUpsertSetting’,
    ‘notMatchedInsertCondition’,’notMatchedInsertValues’,
    ‘matchedDeleteCondition’)

    To perform “MERGE INTO” syntax. See merge_into action for details of arguments.— for matched order rows,
    — increase the price,
    — and if there is no match,
    — insert the order from
    — the source table
    CALL sys.merge_into(‘default.T’, ‘’, ‘’, ‘default.S’, ‘T.id=S.order_id’, ‘’, ‘price=T.price+20’, ‘’, ‘‘)
    remove_orphan_filesCALL [catalog.]sys.remove_orphan_files(‘identifier’)

    CALL [catalog.]sys.remove_orphan_files(‘identifier’, ‘olderThan’)
    To remove the orphan data files and metadata files. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • olderThan: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval.
  • CALL remove_orphan_files(‘default.T’, ‘2023-10-31 12:00:00’)
    reset_consumer— reset the new next snapshot id in the consumer
    CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’, nextSnapshotId)

    — delete consumer
    CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’)
    To reset or delete consumer. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • consumerId: consumer to be reset or deleted.
  • nextSnapshotId (Long): the new next snapshot id of the consumer.
  • CALL sys.reset_consumer(‘default.T’, ‘myid’, 10)
    rollback_to— rollback to a snapshot
    CALL sys.rollback_to(‘identifier’, snapshotId)

    — rollback to a tag
    CALL sys.rollback_to(‘identifier’, ‘tagName’)
    To rollback to a specific version of target table. Argument:
  • identifier: the target table identifier. Cannot be empty.
  • snapshotId (Long): id of the snapshot that will roll back to.
  • tagName: name of the tag that will roll back to.
  • CALL sys.rollback_to(‘default.T’, 10)
    expire_snapshots— expires snapshot
    CALL sys.expire_snapshots(‘identifier’, retainMax)

    To expire snapshots. Argument:
  • identifier: the target table identifier. Cannot be empty.
  • retainMax: the maximum number of completed snapshots to retain.
  • CALL sys.expire_snapshots(‘default.T’, 2)