INSERT Statement

INSERT statements are used to add rows to a table.

Run an INSERT statement

Java

A single INSERT statement can be executed through the executeSql() method of the TableEnvironment. For an INSERT statement, executeSql() submits a Flink job immediately and returns a TableResult instance associated with the submitted job. Multiple INSERT statements can be executed through the addInsertSql() method of a StatementSet, which is created by the TableEnvironment.createStatementSet() method. The addInsertSql() method is lazy: the added statements are executed only when StatementSet.execute() is invoked.

The following examples show how to run a single INSERT statement in a TableEnvironment and multiple INSERT statements in a StatementSet.

Scala

A single INSERT statement can be executed through the executeSql() method of the TableEnvironment. For an INSERT statement, executeSql() submits a Flink job immediately and returns a TableResult instance associated with the submitted job. Multiple INSERT statements can be executed through the addInsertSql() method of a StatementSet, which is created by the TableEnvironment.createStatementSet() method. The addInsertSql() method is lazy: the added statements are executed only when StatementSet.execute() is invoked.

The following examples show how to run a single INSERT statement in a TableEnvironment and multiple INSERT statements in a StatementSet.

Python

A single INSERT statement can be executed through the execute_sql() method of the TableEnvironment. For an INSERT statement, execute_sql() submits a Flink job immediately and returns a TableResult instance associated with the submitted job. Multiple INSERT statements can be executed through the add_insert_sql() method of a StatementSet, which is created by the TableEnvironment.create_statement_set() method. The add_insert_sql() method is lazy: the added statements are executed only when StatementSet.execute() is invoked.

The following examples show how to run a single INSERT statement in a TableEnvironment and multiple INSERT statements in a StatementSet.

SQL CLI

A single INSERT statement can be executed in the SQL CLI.

The following example shows how to run a single INSERT statement in the SQL CLI.

Java

  import org.apache.flink.table.api.StatementSet;
  import org.apache.flink.table.api.TableEnvironment;
  import org.apache.flink.table.api.TableResult;

  TableEnvironment tEnv = TableEnvironment.create(...);

  // register a source table named "Orders" and a sink table named "RubberOrders"
  tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
  tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");

  // run a single INSERT query on the registered source table and emit the result to the registered sink table
  TableResult tableResult1 = tEnv.executeSql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  // get the job status through the TableResult
  System.out.println(tableResult1.getJobClient().get().getJobStatus());

  //----------------------------------------------------------------------------
  // register another sink table named "GlassOrders" for multiple INSERT queries
  tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");

  // run multiple INSERT queries on the registered source table and emit the results to the registered sink tables
  StatementSet stmtSet = tEnv.createStatementSet();
  // the `addInsertSql` method accepts only a single INSERT query per call
  stmtSet.addInsertSql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  stmtSet.addInsertSql(
      "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
  // execute all statements together
  TableResult tableResult2 = stmtSet.execute();
  // get the job status through the TableResult
  System.out.println(tableResult2.getJobClient().get().getJobStatus());

Scala

  import org.apache.flink.table.api.TableEnvironment

  val tEnv = TableEnvironment.create(...)

  // register a source table named "Orders" and a sink table named "RubberOrders"
  tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")

  // run a single INSERT query on the registered source table and emit the result to the registered sink table
  val tableResult1 = tEnv.executeSql(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  // get the job status through the TableResult
  println(tableResult1.getJobClient().get().getJobStatus())

  //----------------------------------------------------------------------------
  // register another sink table named "GlassOrders" for multiple INSERT queries
  tEnv.executeSql("CREATE TABLE GlassOrders(product STRING, amount INT) WITH (...)")

  // run multiple INSERT queries on the registered source table and emit the results to the registered sink tables
  val stmtSet = tEnv.createStatementSet()
  // the `addInsertSql` method accepts only a single INSERT query per call
  stmtSet.addInsertSql(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  stmtSet.addInsertSql(
    "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
  // execute all statements together
  val tableResult2 = stmtSet.execute()
  // get the job status through the TableResult
  println(tableResult2.getJobClient().get().getJobStatus())

Python

  from pyflink.table import TableEnvironment

  table_env = TableEnvironment.create(...)

  # register a source table named "Orders" and a sink table named "RubberOrders"
  table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")

  # run a single INSERT query on the registered source table and emit the result to the registered sink table
  table_result1 = table_env.execute_sql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  # get the job status through the TableResult
  print(table_result1.get_job_client().get_job_status())

  #----------------------------------------------------------------------------
  # register another sink table named "GlassOrders" for multiple INSERT queries
  table_env.execute_sql("CREATE TABLE GlassOrders(product STRING, amount INT) WITH (...)")

  # run multiple INSERT queries on the registered source table and emit the results to the registered sink tables
  stmt_set = table_env.create_statement_set()
  # the `add_insert_sql` method accepts only a single INSERT query per call
  stmt_set.add_insert_sql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  stmt_set.add_insert_sql(
      "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
  # execute all statements together
  table_result2 = stmt_set.execute()
  # get the job status through the TableResult
  print(table_result2.get_job_client().get_job_status())

SQL CLI

  Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
  [INFO] Table has been created.

  Flink SQL> CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...);

  Flink SQL> SHOW TABLES;
  Orders
  RubberOrders

  Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] Table update statement has been successfully submitted to the cluster:

Insert from select queries

Query results can be inserted into a table by using the INSERT clause.

Syntax

  [EXECUTE] INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] [column_list] select_statement

  part_spec:
    (part_col_name1=val1 [, part_col_name2=val2, ...])

  column_list:
    (col_name1 [, col_name2, ...])

OVERWRITE

INSERT OVERWRITE will overwrite any existing data in the table or partition. Otherwise, new data is appended.

PARTITION

The PARTITION clause should contain the static partition columns of the insert.

COLUMN LIST

Given a table T(a INT, b INT, c INT), Flink supports INSERT INTO T(c, b) SELECT x, y FROM S. The expectation is that ‘x’ is written to column ‘c’ and ‘y’ is written to column ‘b’ and ‘a’ is set to NULL (assuming column ‘a’ is nullable).
Connector developers who want to avoid overwriting non-target columns with NULL values when processing partial column updates can obtain the target columns specified in the user’s INSERT statement from DynamicTableSink.Context.getTargetColumns() and decide how to handle the partial update.
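The column-list rule can be pictured with a small toy model. This is only a sketch of the semantics described above, not Flink code: the function name `expand_row` is hypothetical, and `None` stands in for SQL NULL.

```python
from typing import Any, Dict, Sequence


def expand_row(table_columns: Sequence[str],
               target_columns: Sequence[str],
               values: Sequence[Any]) -> Dict[str, Any]:
    """Map values onto the listed target columns; unlisted columns become None (NULL)."""
    if len(target_columns) != len(values):
        raise ValueError("column list and query output must have the same number of fields")
    unknown = [c for c in target_columns if c not in table_columns]
    if unknown:
        raise ValueError(f"unknown target columns: {unknown}")
    by_name = dict(zip(target_columns, values))
    # Emit the row in table order; columns missing from the column list are NULL.
    return {col: by_name.get(col) for col in table_columns}


# T(a INT, b INT, c INT) and INSERT INTO T(c, b) SELECT x, y FROM S,
# evaluated for a single source row with x = 1 and y = 2.
row = expand_row(["a", "b", "c"], ["c", "b"], [1, 2])
print(row)  # {'a': None, 'b': 2, 'c': 1}
```

A sink that supports partial updates would use the target column list to leave column `a` untouched instead of writing the NULL.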

Examples

  -- Creates a partitioned table
  -- (`user` and `date` are reserved keywords and are therefore quoted with backticks)
  CREATE TABLE country_page_view (`user` STRING, cnt INT, `date` STRING, country STRING)
  PARTITIONED BY (`date`, country)
  WITH (...);

  -- Appends rows into the static partition (date='2019-8-30', country='China')
  INSERT INTO country_page_view PARTITION (`date`='2019-8-30', country='China')
    SELECT `user`, cnt FROM page_view_source;

  -- The keyword EXECUTE can be added at the beginning of the INSERT to indicate explicitly that the statement is to be executed;
  -- it is equivalent to the statement without the keyword.
  EXECUTE INSERT INTO country_page_view PARTITION (`date`='2019-8-30', country='China')
    SELECT `user`, cnt FROM page_view_source;

  -- Appends rows into partition (date, country), where date is a static partition with value '2019-8-30' and
  -- country is a dynamic partition whose value is determined dynamically by each row.
  INSERT INTO country_page_view PARTITION (`date`='2019-8-30')
    SELECT `user`, cnt, country FROM page_view_source;

  -- Overwrites rows in the static partition (date='2019-8-30', country='China')
  INSERT OVERWRITE country_page_view PARTITION (`date`='2019-8-30', country='China')
    SELECT `user`, cnt FROM page_view_source;

  -- Overwrites rows in partition (date, country), where date is a static partition with value '2019-8-30' and
  -- country is a dynamic partition whose value is determined dynamically by each row.
  INSERT OVERWRITE country_page_view PARTITION (`date`='2019-8-30')
    SELECT `user`, cnt, country FROM page_view_source;

  -- Appends rows into the static partition (date='2019-8-30', country='China');
  -- the column cnt is set to NULL
  INSERT INTO country_page_view PARTITION (`date`='2019-8-30', country='China') (`user`)
    SELECT `user` FROM page_view_source;
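The difference between static and dynamic partitions in the examples above can be sketched with a toy model. The function `complete_row` is hypothetical and not part of Flink; it assumes that the fields produced by the query are matched by position to the table columns that are not fixed in the PARTITION clause.

```python
from typing import Any, Dict, Sequence


def complete_row(table_columns: Sequence[str],
                 static_spec: Dict[str, Any],
                 selected: Sequence[Any]) -> Dict[str, Any]:
    """Combine static partition values with the query output into one full row."""
    unknown = [c for c in static_spec if c not in table_columns]
    if unknown:
        raise ValueError(f"unknown partition columns: {unknown}")
    # Columns not fixed by the PARTITION clause must be supplied by the query.
    remaining = [c for c in table_columns if c not in static_spec]
    if len(selected) != len(remaining):
        raise ValueError(f"query must produce {len(remaining)} fields: {remaining}")
    values = dict(zip(remaining, selected))
    values.update(static_spec)
    return {c: values[c] for c in table_columns}


columns = ["user", "cnt", "date", "country"]

# PARTITION (date='2019-8-30', country='China') with SELECT user, cnt
fully_static = complete_row(columns, {"date": "2019-8-30", "country": "China"}, ["alice", 3])

# PARTITION (date='2019-8-30') with SELECT user, cnt, country:
# country is dynamic and is taken from each row.
partly_dynamic = complete_row(columns, {"date": "2019-8-30"}, ["bob", 5, "Japan"])

print(fully_static)
print(partly_dynamic)
```

In the second call each row can land in a different country partition, which is what makes that partition column dynamic.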

Insert values into tables

The INSERT…VALUES statement can be used to insert data into tables directly from SQL.

Syntax

  [EXECUTE] INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]

  values_row:
    (val1 [, val2, ...])

OVERWRITE

INSERT OVERWRITE will overwrite any existing data in the table. Otherwise, new data is appended.

Examples

  CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);

  INSERT INTO students
    VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
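When the rows come from application code, the statement text has to be assembled before it is submitted. The following is a minimal sketch using only the Python standard library. The helpers `sql_literal` and `insert_values` are hypothetical, cover only a few value types, do not quote the table name, and are meant for trusted input only.

```python
from decimal import Decimal
from typing import Any, Sequence


def sql_literal(value: Any) -> str:
    """Render a Python value as a SQL literal (small, illustrative subset of types)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # checked before int, since bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, Decimal)):
        return str(value)
    if isinstance(value, str):
        # A single quote inside a SQL string literal is escaped by doubling it.
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def insert_values(table: str, rows: Sequence[Sequence[Any]]) -> str:
    """Build an INSERT INTO ... VALUES statement for the given rows."""
    if not rows:
        raise ValueError("at least one row is required")
    rendered = ", ".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} VALUES {rendered}"


stmt = insert_values("students", [
    ("fred flintstone", 35, Decimal("1.28")),
    ("barney rubble", 32, Decimal("2.32")),
])
print(stmt)
# INSERT INTO students VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32)
```

The resulting string could then be passed to execute_sql() in the same way as the INSERT statements shown earlier.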

Insert into multiple tables

A STATEMENT SET can be used to insert data into multiple tables in a single statement.

Syntax

  EXECUTE STATEMENT SET
  BEGIN
    insert_statement;
    ...
    insert_statement;
  END;

  insert_statement:
    <insert_from_select>|<insert_from_values>

Examples

  CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);

  EXECUTE STATEMENT SET
  BEGIN
    INSERT INTO students
      VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
    INSERT INTO students
      VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
  END;
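If the individual INSERT statements are generated by a program, the surrounding block can be assembled mechanically. The sketch below follows the syntax given above. The helper name `statement_set` is hypothetical, and the sink and source table names in the usage example are placeholders.

```python
from typing import Sequence


def statement_set(insert_statements: Sequence[str]) -> str:
    """Wrap INSERT statements in an EXECUTE STATEMENT SET ... BEGIN ... END; block."""
    # Normalize each statement: trim whitespace and drop a trailing semicolon.
    cleaned = [s.strip().rstrip(";").strip() for s in insert_statements]
    if not cleaned:
        raise ValueError("at least one INSERT statement is required")
    for s in cleaned:
        if not s or s.split()[0].upper() != "INSERT":
            raise ValueError(f"not an INSERT statement: {s!r}")
    body = "\n".join(f"  {s};" for s in cleaned)
    return f"EXECUTE STATEMENT SET\nBEGIN\n{body}\nEND;"


print(statement_set([
    "INSERT INTO sink_a SELECT * FROM src;",
    "INSERT INTO sink_b SELECT * FROM src",
]))
```

The helper only produces text; whether the block is run from the SQL CLI or submitted programmatically is left to the caller.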