INSERT 语句

INSERT 语句用来向表中添加行。

执行 INSERT 语句

Java

单条 INSERT 语句,可以使用 TableEnvironment 中的 executeSql() 方法执行。executeSql() 方法执行 INSERT 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。 多条 INSERT 语句,使用 TableEnvironment 中的 createStatementSet 创建一个 StatementSet 对象,然后使用 StatementSet 中的 addInsertSql() 方法添加多条 INSERT 语句,最后通过 StatementSet 中的 execute() 方法来执行。

以下的例子展示了如何在 TableEnvironment 中执行一条 INSERT 语句,或者通过 StatementSet 执行多条 INSERT 语句。

Scala

单条 INSERT 语句,可以使用 TableEnvironment 中的 executeSql() 方法执行。executeSql() 方法执行 INSERT 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。 多条 INSERT 语句,使用 TableEnvironment 中的 createStatementSet 创建一个 StatementSet 对象,然后使用 StatementSet 中的 addInsertSql() 方法添加多条 INSERT 语句,最后通过 StatementSet 中的 execute() 方法来执行。

以下的例子展示了如何在 TableEnvironment 中执行一条 INSERT 语句,或者通过 StatementSet 执行多条 INSERT 语句。

Python

单条 INSERT 语句,可以使用 TableEnvironment 中的 execute_sql() 方法执行。execute_sql() 方法执行 INSERT 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。 多条 INSERT 语句,使用 TableEnvironment 中的 create_statement_set 创建一个 StatementSet 对象,然后使用 StatementSet 中的 add_insert_sql() 方法添加多条 INSERT 语句,最后通过 StatementSet 中的 execute() 方法来执行。

以下的例子展示了如何在 TableEnvironment 中执行一条 INSERT 语句,或者通过 StatementSet 执行多条 INSERT 语句。

SQL CLI

可以在 SQL CLI 中执行 INSERT 语句

以下的例子展示了如何在 SQL CLI 中执行一条 INSERT 语句。

Java

  1. TableEnvironment tEnv = TableEnvironment.create(...);
  2. // 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
  3. tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
  4. tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
  5. // 运行一条 INSERT 语句,将源表的数据输出到结果表中
  6. TableResult tableResult1 = tEnv.executeSql(
  7. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  8. // 通过 TableResult 来获取作业状态
  9. System.out.println(tableResult1.getJobClient().get().getJobStatus());
  10. //----------------------------------------------------------------------------
  11. // 注册一个 "GlassOrders" 结果表用于运行多 INSERT 语句
  12. tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");
  13. // 运行多条 INSERT 语句,将原表数据输出到多个结果表中
  14. StatementSet stmtSet = tEnv.createStatementSet();
  15. // `addInsertSql` 方法每次只接收单条 INSERT 语句
  16. stmtSet.addInsertSql(
  17. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  18. stmtSet.addInsertSql(
  19. "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
  20. // 执行刚刚添加的所有 INSERT 语句
  21. TableResult tableResult2 = stmtSet.execute();
  22. // 通过 TableResult 来获取作业状态
  23. System.out.println(tableResult1.getJobClient().get().getJobStatus());

Scala

  1. val tEnv = TableEnvironment.create(...)
  2. // 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
  3. tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  4. tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
  5. // 运行一个 INSERT 语句,将源表的数据输出到结果表中
  6. val tableResult1 = tEnv.executeSql(
  7. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  8. // 通过 TableResult 来获取作业状态
  9. println(tableResult1.getJobClient().get().getJobStatus())
  10. //----------------------------------------------------------------------------
  11. // 注册一个 "GlassOrders" 结果表用于运行多 INSERT 语句
  12. tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");
  13. // 运行多个 INSERT 语句,将原表数据输出到多个结果表中
  14. val stmtSet = tEnv.createStatementSet()
  15. // `addInsertSql` 方法每次只接收单条 INSERT 语句
  16. stmtSet.addInsertSql(
  17. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  18. stmtSet.addInsertSql(
  19. "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
  20. // 执行刚刚添加的所有 INSERT 语句
  21. val tableResult2 = stmtSet.execute()
  22. // 通过 TableResult 来获取作业状态
  23. println(tableResult1.getJobClient().get().getJobStatus())

Python

  1. table_env = TableEnvironment.create(...)
  2. # 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
  3. table_env.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  4. table_env.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
  5. # 运行一条 INSERT 语句,将源表的数据输出到结果表中
  6. table_result1 = table_env \
  7. .executeSql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  8. # 通过 TableResult 来获取作业状态
  9. print(table_result1.get_job_client().get_job_status())
  10. #----------------------------------------------------------------------------
  11. # 注册一个 "GlassOrders" 结果表用于运行多 INSERT 语句
  12. table_env.execute_sql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
  13. # 运行多条 INSERT 语句,将原表数据输出到多个结果表中
  14. stmt_set = table_env.create_statement_set()
  15. # `add_insert_sql` 方法每次只接收单条 INSERT 语句
  16. stmt_set \
  17. .add_insert_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  18. stmt_set \
  19. .add_insert_sql("INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
  20. # 执行刚刚添加的所有 INSERT 语句
  21. table_result2 = stmt_set.execute()
  22. # 通过 TableResult 来获取作业状态
  23. print(table_result2.get_job_client().get_job_status())

SQL CLI

  1. Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
  2. [INFO] Table has been created.
  3. Flink SQL> CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...);
  4. Flink SQL> SHOW TABLES;
  5. Orders
  6. RubberOrders
  7. Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
  8. [INFO] Submitting SQL update statement to the cluster...
  9. [INFO] Table update statement has been successfully submitted to the cluster:

将 SELECT 查询数据插入表中

通过 INSERT 语句,可以将查询的结果插入到表中,

语法

  1. [EXECUTE] INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
  2. part_spec:
  3. (part_col_name1=val1 [, part_col_name2=val2, ...])

OVERWRITE

INSERT OVERWRITE 将会覆盖表中或分区中的任何已存在的数据。否则,新数据会追加到表中或分区中。

PARTITION

PARTITION 语句应该包含需要插入的静态分区列与值。

COLUMN LIST

给定一个表 T(a INT, b INT, c INT),Flink 支持 INSERT INTO T(c, b) SELECT x, y FROM S。 预期行为是 “x” 被写入 “c” 列,“y” 被写入 “b” 列,而 “a” 被设置为空值(假设 “a” 列可为空)。
连接器开发人员在处理部分列更新时,如果希望避免用空值覆盖非目标列,可以从 DynamicTableSink$Context.getTargetColumns() 中获取用户插入语句指定的目标列信息,然后决定如何处理部分更新。

示例

  1. -- 创建一个分区表
  2. CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING)
  3. PARTITIONED BY (date, country)
  4. WITH (...)
  5. -- 追加行到该静态分区中 (date='2019-8-30', country='China')
  6. INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  7. SELECT user, cnt FROM page_view_source;
  8. -- Insert语句的开头可以额外增加EXECUTE关键字,带EXECUTE关键字和不带是等价的
  9. EXECUTE INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  10. SELECT user, cnt FROM page_view_source;
  11. -- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30'country 是动态分区,其值由每一行动态决定
  12. INSERT INTO country_page_view PARTITION (date='2019-8-30')
  13. SELECT user, cnt, country FROM page_view_source;
  14. -- 覆盖行到静态分区 (date='2019-8-30', country='China')
  15. INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  16. SELECT user, cnt FROM page_view_source;
  17. -- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30'country 是动态分区,其值由每一行动态决定
  18. INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  19. SELECT user, cnt, country FROM page_view_source;

将值插入表中

通过 INSERT 语句,也可以直接将值插入到表中,

语法

  1. [EXECUTE] INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
  2. values_row:
  3. : (val1 [, val2, ...])

OVERWRITE

INSERT OVERWRITE 将会覆盖表中的任何已存在的数据。否则,新数据会追加到表中。

示例

  1. CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
  2. EXECUTE INSERT INTO students
  3. VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);

插入数据到多张表

STATEMENT SET 可以实现通过一个语句插入数据到多个表。

语法

  1. EXECUTE STATEMENT SET
  2. BEGIN
  3. insert_statement;
  4. ...
  5. insert_statement;
  6. END;
  7. insert_statement:
  8. <insert_from_select>|<insert_from_values>

示例

  1. CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
  2. EXECUTE STATEMENT SET
  3. BEGIN
  4. INSERT INTO students
  5. VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
  6. INSERT INTO students
  7. VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
  8. END;