Procedures
Flink 1.18 and later versions support Call Statements, which make it easier to manipulate the data and metadata of Paimon tables by writing SQL instead of submitting Flink jobs.
In 1.18, procedures only support passing arguments by position. You must pass all arguments in order, and if you don't want to pass some arguments, you must use '' as a placeholder. For example, to compact table default.t with parallelism 4 without specifying partitions or a sort strategy, the call statement should be CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4').
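To make the placeholder rule concrete, here is a minimal Python sketch that renders such a positional statement. The helper `positional_call` and the `COMPACT_PARAMS` list are hypothetical: the argument order is only inferred from the compact examples on this page, and the sketch treats every argument as a string literal.

```python
from typing import Dict, List

# Argument order of sys.compact as inferred from this page's examples
# (table, partitions, order_strategy, order_by, options). Assumption only.
COMPACT_PARAMS: List[str] = ["table", "partitions", "order_strategy", "order_by", "options"]


def positional_call(procedure: str, param_order: List[str], args: Dict[str, str]) -> str:
    """Render a Flink 1.18-style positional CALL statement (hypothetical helper).

    Every parameter in `param_order` gets a slot; omitted ones become the ''
    placeholder so that later arguments keep their positions.
    """
    unknown = set(args) - set(param_order)
    if unknown:
        raise ValueError(f"unknown arguments: {sorted(unknown)}")
    literals = []
    for name in param_order:
        value = args.get(name, "")
        # SQL string literal: embedded single quotes are doubled.
        literals.append("'" + value.replace("'", "''") + "'")
    return f"CALL {procedure}({', '.join(literals)})"


print(positional_call("sys.compact", COMPACT_PARAMS,
                      {"table": "default.t", "options": "sink.parallelism=4"}))
# CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')
```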
In later versions, procedures also support passing arguments by name. You can pass arguments in any order, and any optional argument can be omitted. For the above example, the call statement is CALL sys.compact(`table` => 'default.t', options => 'sink.parallelism=4').
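A minimal Python sketch of the named form follows. `named_call` and `RESERVED_NAMES` are hypothetical; the only keyword quoting it applies is the backtick around `table` seen in the example above, and it again treats every argument as a string literal.

```python
from typing import Dict

# Parameter names that collide with SQL keywords and therefore need backticks.
# Only `table` is known from the example on this page; this set is an assumption.
RESERVED_NAMES = {"table"}


def named_call(procedure: str, args: Dict[str, str]) -> str:
    """Render a named-argument CALL statement (hypothetical helper).

    Only the arguments actually supplied are emitted, in the order given;
    omitted optional arguments are simply left out.
    """
    parts = []
    for name, value in args.items():
        key = f"`{name}`" if name.lower() in RESERVED_NAMES else name
        literal = "'" + value.replace("'", "''") + "'"
        parts.append(f"{key} => {literal}")
    return f"CALL {procedure}({', '.join(parts)})"


print(named_call("sys.compact", {"table": "default.t", "options": "sink.parallelism=4"}))
# CALL sys.compact(`table` => 'default.t', options => 'sink.parallelism=4')
```

Because arguments are bound by name, swapping the two dict entries produces an equivalent statement with the arguments written in the other order.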
Specify partitions: we use a string to represent a partition filter, where ',' means AND and ';' means OR. For example, to specify the two partitions date=01 and date=02, write 'date=01;date=02'; to specify the single partition with date=01 and day=01, write 'date=01,day=01'.
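The following Python sketch models the filter semantics as a list of OR-ed alternatives, each a dict of AND-ed key=value pairs. It illustrates the string format only and is not Paimon's parser; both function names are hypothetical, and treating an empty string as "no filter" is an assumption based on '' acting as the placeholder for an unspecified argument.

```python
from typing import Dict, List


def parse_partition_filter(spec: str) -> List[Dict[str, str]]:
    """Split a partition filter string into OR-ed alternatives of AND-ed pairs.

    ';' separates alternatives (OR); ',' separates key=value pairs (AND).
    """
    alternatives = []
    for alternative in spec.split(";"):
        if not alternative.strip():
            continue  # tolerate empty segments such as a trailing ';'
        conjunction = {}
        for pair in alternative.split(","):
            key, sep, value = pair.partition("=")
            if not sep:
                raise ValueError(f"expected key=value, got {pair!r}")
            conjunction[key.strip()] = value.strip()
        alternatives.append(conjunction)
    return alternatives


def partition_selected(partition: Dict[str, str], spec: str) -> bool:
    """True if the partition satisfies every pair of at least one alternative."""
    alternatives = parse_partition_filter(spec)
    if not alternatives:
        return True  # assumption: an empty filter ('') means "all partitions"
    return any(all(partition.get(k) == v for k, v in alt.items()) for alt in alternatives)


print(parse_partition_filter("date=01;date=02"))  # two partitions, OR-ed
# [{'date': '01'}, {'date': '02'}]
print(parse_partition_filter("date=01,day=01"))   # one partition, AND-ed keys
# [{'date': '01', 'day': '01'}]
```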
Table options syntax: we use a string to represent table options. The format is 'key1=value1,key2=value2,…'.
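As a rough illustration of that format, this hypothetical Python helper turns an options string into a dict. It is a sketch under a stated simplification, not Paimon's own parsing: it splits on ',' and then on the first '=', so a value may contain '=' but cannot contain ','.

```python
from typing import Dict


def parse_table_options(spec: str) -> Dict[str, str]:
    """Parse 'key1=value1,key2=value2,...' into a dict (hypothetical helper).

    Splits on ',' first and then on the first '=' of each pair, so a value
    may contain '=' but this simple sketch cannot represent a value that
    contains ','.
    """
    options: Dict[str, str] = {}
    for pair in spec.split(","):
        if not pair.strip():
            continue  # '' (the positional placeholder) yields no options
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        options[key.strip()] = value.strip()
    return options


print(parse_table_options("sink.parallelism=4"))
# {'sink.parallelism': '4'}
print(parse_table_options("key1=value1,key2=value2"))
# {'key1': 'value1', 'key2': 'value2'}
```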
All available procedures are listed below.
Procedure Name | Usage | Explanation | Example |
---|---|---|---|
compact |  | To compact a table. | CALL sys.compact(`table` => 'default.T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4') |
compact_database | CALL [catalog.]sys.compact_database() CALL [catalog.]sys.compact_database('includingDatabases') CALL [catalog.]sys.compact_database('includingDatabases', 'mode') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') | To compact databases. | CALL sys.compact_database('db1\|db2', 'combined', 'table.*', 'ignore', 'sink.parallelism=4') |
create_tag | -- based on the specified snapshot CALL [catalog.]sys.create_tag('identifier', 'tagName', snapshotId) -- based on the latest snapshot CALL [catalog.]sys.create_tag('identifier', 'tagName') | To create a tag based on the given snapshot, or on the latest snapshot if none is given. | CALL sys.create_tag('default.T', 'my_tag', 10, '1 d') |
delete_tag | CALL [catalog.]sys.delete_tag('identifier', 'tagName') | To delete a tag. | CALL sys.delete_tag('default.T', 'my_tag') |
merge_into | -- when matched then upsert CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedUpsertCondition', 'matchedUpsertSetting') -- when matched then upsert; when not matched then insert CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedUpsertCondition', 'matchedUpsertSetting', 'notMatchedInsertCondition', 'notMatchedInsertValues') -- when matched then delete CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedDeleteCondition') -- when matched then upsert + delete; when not matched then insert CALL [catalog.]sys.merge_into('identifier', 'targetAlias', 'sourceSqls', 'sourceTable', 'mergeCondition', 'matchedUpsertCondition', 'matchedUpsertSetting', 'notMatchedInsertCondition', 'notMatchedInsertValues', 'matchedDeleteCondition') | To run a MERGE INTO operation. See the merge_into action for details of the arguments. | -- for matched order rows, increase the price, and if there is no match, insert the order from the source table CALL sys.merge_into('default.T', '', '', 'default.S', 'T.id=S.order_id', '', 'price=T.price+20', '', '*') |
remove_orphan_files | CALL [catalog.]sys.remove_orphan_files('identifier') CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan') | To remove orphan data files and metadata files. | CALL sys.remove_orphan_files('default.T', '2023-10-31 12:00:00') |
reset_consumer | -- reset the next snapshot id of the consumer CALL [catalog.]sys.reset_consumer('identifier', 'consumerId', nextSnapshotId) -- delete the consumer CALL [catalog.]sys.reset_consumer('identifier', 'consumerId') | To reset or delete a consumer. | CALL sys.reset_consumer('default.T', 'myid', 10) |
rollback_to | -- roll back to a snapshot CALL sys.rollback_to('identifier', snapshotId) -- roll back to a tag CALL sys.rollback_to('identifier', 'tagName') | To roll back the target table to a specific version (a snapshot or a tag). | CALL sys.rollback_to('default.T', 10) |
expire_snapshots | -- expire snapshots CALL sys.expire_snapshots('identifier', retainMax) | To expire snapshots. | CALL sys.expire_snapshots('default.T', 2) |
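To illustrate what the merge_into example in the table does, the Python sketch below replays it over in-memory rows. It is a hypothetical model, not Paimon's implementation: the column layouts of T (id, price) and S (order_id, price), the uniqueness of order_id in S, and the reading of '*' as "copy the source columns into the target columns in order" are all assumptions.

```python
from typing import Dict, List

Row = Dict[str, int]


def merge_into_example(target: List[Row], source: List[Row]) -> List[Row]:
    """In-memory model of the merge_into example (hypothetical, not Paimon's code).

    Assumes T has columns (id, price), S has columns (order_id, price),
    order_id is unique in S, and '*' copies S's columns into T's in order.
    """
    source_ids = {s["order_id"] for s in source}
    target_ids = {t["id"] for t in target}
    result: List[Row] = []
    for row in target:
        if row["id"] in source_ids:
            # WHEN MATCHED (T.id = S.order_id): price = T.price + 20
            result.append({**row, "price": row["price"] + 20})
        else:
            result.append(dict(row))  # target rows without a match stay as they are
    for s in source:
        if s["order_id"] not in target_ids:
            # WHEN NOT MATCHED: insert the source row ('*')
            result.append({"id": s["order_id"], "price": s["price"]})
    return result


T = [{"id": 1, "price": 100}, {"id": 2, "price": 50}]
S = [{"order_id": 2, "price": 55}, {"order_id": 3, "price": 70}]
print(merge_into_example(T, S))
# [{'id': 1, 'price': 100}, {'id': 2, 'price': 70}, {'id': 3, 'price': 70}]
```

Note that the matched branch adds 20 to the target's own price (T.price), not to the source price, which is what 'price=T.price+20' expresses.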