SQL Write

Syntax

  INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
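
The statements below are a rough sketch of how the syntax pieces fit together. The table `my_table (k0 INT, k1 INT, v STRING)` partitioned by `k0` and the table `source_table` are hypothetical. Taken together, the statements show a `VALUES` expression (value_expr), a query, a static partition spec (part_spec) and an explicit column list (column_list).

```sql
-- Hypothetical table, for illustration only
CREATE TABLE my_table (
    k0 INT,
    k1 INT,
    v STRING
) PARTITIONED BY (k0);

-- value_expr: insert literal rows with VALUES
INSERT INTO my_table VALUES (1, 10, 'a'), (2, 20, 'b');

-- query: insert the result of a SELECT from a hypothetical source_table
INSERT INTO my_table SELECT k0, k1, v FROM source_table;

-- part_spec: k0 is fixed by PARTITION, so the query supplies only k1 and v
INSERT INTO my_table PARTITION (k0 = 1) SELECT k1, v FROM source_table;

-- column_list: name the target columns explicitly
INSERT INTO my_table (k0, k1, v) SELECT k0, k1, v FROM source_table;
```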

For more information, please check the syntax document:

Flink INSERT Statement

INSERT INTO

Use INSERT INTO to apply records and changes to tables.

  INSERT INTO my_table SELECT ...

INSERT INTO supports both batch and streaming modes. In streaming mode, the Flink sink by default also performs compaction and snapshot expiration. If partition expiration is configured, the sink performs that as well.
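
Here is a minimal sketch of a streaming write to a table with partition expiration configured. It assumes the Paimon options `partition.expiration-time`, `partition.expiration-check-interval` and `partition.timestamp-formatter` are available in your version. The values shown are examples, not recommendations. The table layout and `source_table` are hypothetical.

```sql
-- Run the job in streaming mode (Flink SQL client syntax)
SET 'execution.runtime-mode' = 'streaming';

-- Hypothetical table; dt values are assumed to look like '20240101'
CREATE TABLE my_table (
    id BIGINT,
    v STRING,
    dt STRING,
    PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'partition.expiration-time' = '7 d',
    'partition.expiration-check-interval' = '1 d',
    'partition.timestamp-formatter' = 'yyyyMMdd'
);

-- The streaming sink writes records and, by default, also compacts files and
-- expires snapshots; with the options above it also expires partitions older than 7 days
INSERT INTO my_table SELECT id, v, dt FROM source_table;
```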

If multiple jobs write to the same table, refer to the dedicated compaction job documentation for more information.
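
One possible setup for multiple writers: each writing job skips compaction and snapshot expiration, and a single dedicated compaction job does that work instead. This sketch assumes the Paimon `write-only` table option is available in your version. `source_a` and `source_b` are hypothetical sources.

```sql
-- Job 1: write only; this job skips compaction and snapshot expiration
INSERT INTO my_table /*+ OPTIONS('write-only' = 'true') */ SELECT * FROM source_a;

-- Job 2: a second writer to the same table, also write-only
INSERT INTO my_table /*+ OPTIONS('write-only' = 'true') */ SELECT * FROM source_b;

-- A separate dedicated compaction job (not shown here) then compacts my_table
```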

Overwriting the Whole Table

For unpartitioned tables, Paimon supports overwriting the whole table. The same applies to partitioned tables that disable the dynamic-partition-overwrite option.

Use INSERT OVERWRITE to overwrite the whole unpartitioned table.

  INSERT OVERWRITE my_table SELECT ...
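
A partitioned table can also be overwritten as a whole when dynamic partition overwrite is disabled. The sketch below assumes `dynamic-partition-overwrite` can be set as a table option in `WITH`. The table layout and `staging_table` are hypothetical.

```sql
-- INSERT OVERWRITE is run as a batch job
SET 'execution.runtime-mode' = 'batch';

-- Hypothetical partitioned table with dynamic partition overwrite disabled
CREATE TABLE my_table (
    k INT,
    v STRING,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'dynamic-partition-overwrite' = 'false'
);

-- Replaces all existing data in my_table, in every partition,
-- with the query result
INSERT OVERWRITE my_table SELECT k, v, dt FROM staging_table;
```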

Overwriting a Partition

For partitioned tables, Paimon supports overwriting a partition.

Use INSERT OVERWRITE to overwrite a partition.

  INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
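
Here is a concrete sketch, assuming a hypothetical table `my_table (k INT, v STRING, dt STRING)` partitioned by `dt` and a hypothetical `staging_table`. The static partition value fixes `dt`, so the query returns only the non-partition columns.

```sql
-- INSERT OVERWRITE is run as a batch job
SET 'execution.runtime-mode' = 'batch';

-- Replace only partition dt = '2024-01-01'; other partitions are untouched
INSERT OVERWRITE my_table PARTITION (dt = '2024-01-01')
SELECT k, v FROM staging_table;
```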

Dynamic Overwrite

Flink’s default overwrite mode is dynamic partition overwrite. In this mode, Paimon deletes only the partitions that appear in the overwritten data. To switch to static overwrite, set dynamic-partition-overwrite to false.

  -- my_table is a partitioned table
  -- Dynamic overwrite
  INSERT OVERWRITE my_table SELECT ...
  -- Static overwrite (overwrite the whole table)
  INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...
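
The difference between the two modes shows up most clearly with concrete data. This sketch assumes a hypothetical table `my_table (k INT, v STRING, dt STRING)` partitioned by `dt`, which initially holds rows in partitions `dt = '2024-01-01'` and `dt = '2024-01-02'`.

```sql
-- INSERT OVERWRITE is run as a batch job
SET 'execution.runtime-mode' = 'batch';

-- Dynamic overwrite (default): only dt = '2024-01-01' appears in the new data,
-- so only that partition is replaced; dt = '2024-01-02' keeps its rows
INSERT OVERWRITE my_table VALUES (1, 'x', '2024-01-01');

-- Static overwrite: the whole table is replaced, so afterwards my_table
-- contains only the row below and partition dt = '2024-01-02' is gone
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */
VALUES (2, 'y', '2024-01-01');
```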

Truncate tables

Flink 1.17-

You can use INSERT OVERWRITE to purge a table by inserting an empty result into it.

  INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM my_table WHERE false;

Flink 1.18+

  TRUNCATE TABLE my_table;

Purging Partitions

Currently, Paimon supports two ways to purge partitions.

  1. As with purging tables, you can use INSERT OVERWRITE to purge the data of a partition by inserting an empty result into it.

  2. Method #1 does not support dropping multiple partitions. If you need to drop multiple partitions, you can submit the drop_partition job through flink run.

  -- Syntax
  INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
  PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM my_table WHERE false;
  -- The following SQL is an example:
  -- table definition
  CREATE TABLE my_table (
      k0 INT,
      k1 INT,
      v STRING
  ) PARTITIONED BY (k0, k1);
  -- you can use
  INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
  PARTITION (k0 = 0) SELECT k1, v FROM my_table WHERE false;
  -- or
  INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */
  PARTITION (k0 = 0, k1 = 0) SELECT v FROM my_table WHERE false;

Updating tables

Important table properties:

  1. Only primary key tables support this feature.
  2. The merge engine must be deduplicate or partial-update.
  3. Updating primary keys is not supported.

Currently, Paimon supports updating records with the UPDATE statement in Flink 1.17 and later versions. Run UPDATE in Flink’s batch mode.

  -- Syntax
  UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
  -- The following SQL is an example:
  -- table definition
  CREATE TABLE my_table (
      a STRING,
      b INT,
      c INT,
      PRIMARY KEY (a) NOT ENFORCED
  ) WITH (
      'merge-engine' = 'deduplicate'
  );
  -- you can use
  UPDATE my_table SET b = 1, c = 2 WHERE a = 'myTable';
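
The same statement works with the partial-update merge engine. Here is a sketch of a full session that switches to batch mode first. `my_partial_table` is a hypothetical table name.

```sql
-- UPDATE is run in batch mode (Flink SQL client syntax)
SET 'execution.runtime-mode' = 'batch';

-- Hypothetical table using the partial-update merge engine
CREATE TABLE my_partial_table (
    a STRING,
    b INT,
    c INT,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update'
);

-- Update non-key columns of matching rows; the primary key a itself cannot be updated
UPDATE my_partial_table SET b = 1, c = 2 WHERE a = 'myTable';
```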

Deleting from table

Flink 1.17+

Important table properties:

  1. Only primary key tables support this feature.
  2. If the table has primary keys, the merge engine must be deduplicate.
  3. Deleting from a table in streaming mode is not supported.

  -- Syntax
  DELETE FROM table_identifier WHERE conditions;
  -- The following SQL is an example:
  -- table definition
  CREATE TABLE my_table (
      id BIGINT NOT NULL,
      currency STRING,
      rate BIGINT,
      dt STRING,
      PRIMARY KEY (id, dt) NOT ENFORCED
  ) PARTITIONED BY (dt) WITH (
      'merge-engine' = 'deduplicate'
  );
  -- you can use
  DELETE FROM my_table WHERE currency = 'UNKNOWN';
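
Deleting from a table is not supported in streaming mode, so a session typically switches to batch mode first. This sketch reuses the example table above and narrows the delete to a single partition. The date value is hypothetical.

```sql
-- DELETE is not supported in streaming mode, so switch to batch mode first
SET 'execution.runtime-mode' = 'batch';

-- Delete matching rows within one partition only
DELETE FROM my_table WHERE dt = '2024-01-01' AND currency = 'UNKNOWN';
```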