INSERT 语句

INSERT 语句用来向表中添加行。

执行 INSERT 语句

可以使用 TableEnvironment 中的 sqlUpdate() 方法执行 INSERT 语句,也可以在 SQL CLI 中执行 INSERT 语句。sqlUpdate() 方法执行 INSERT 语句时时懒执行的,只有当TableEnvironment.execute(jobName)被调用时才会被执行。

以下的例子展示了如何在 TableEnvironment 和 SQL CLI 中执行一个 INSERT 语句。

  1. EnvironmentSettings settings = EnvironmentSettings.newInstance()...
  2. TableEnvironment tEnv = TableEnvironment.create(settings);
  3. // 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
  4. tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
  5. tEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
  6. // 运行一个 INSERT 语句,将源表的数据输出到结果表中
  7. tEnv.sqlUpdate(
  8. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  1. val settings = EnvironmentSettings.newInstance()...
  2. val tEnv = TableEnvironment.create(settings)
  3. // 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
  4. tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  5. tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
  6. // 运行一个 INSERT 语句,将源表的数据输出到结果表中
  7. tEnv.sqlUpdate(
  8. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  1. settings = EnvironmentSettings.newInstance()...
  2. table_env = TableEnvironment.create(settings)
  3. # 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
  4. table_env.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  5. table_env.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
  6. # 运行一个 INSERT 语句,将源表的数据输出到结果表中
  7. table_env \
  8. .sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  1. Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
  2. [INFO] Table has been created.
  3. Flink SQL> CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...);
  4. Flink SQL> SHOW TABLES;
  5. Orders
  6. RubberOrders
  7. Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
  8. [INFO] Submitting SQL update statement to the cluster...
  9. [INFO] Table update statement has been successfully submitted to the cluster:

将 SELECT 查询数据插入表中

通过 INSERT 语句,可以将查询的结果插入到表中,

语法

  1. INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
  2. part_spec:
  3. (part_col_name1=val1 [, part_col_name2=val2, ...])

OVERWRITE

INSERT OVERWRITE 将会覆盖表中或分区中的任何已存在的数据。否则,新数据会追加到表中或分区中。

PARTITION

PARTITION 语句应该包含需要插入的静态分区列与值。

示例

  1. -- 创建一个分区表
  2. CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING)
  3. PARTITIONED BY (date, country)
  4. WITH (...)
  5. -- 追加行到该静态分区中 (date='2019-8-30', country='China')
  6. INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  7. SELECT user, cnt FROM page_view_source;
  8. -- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30'country 是动态分区,其值由每一行动态决定
  9. INSERT INTO country_page_view PARTITION (date='2019-8-30')
  10. SELECT user, cnt, country FROM page_view_source;
  11. -- 覆盖行到静态分区 (date='2019-8-30', country='China')
  12. INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  13. SELECT user, cnt FROM page_view_source;
  14. -- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30'country 是动态分区,其值由每一行动态决定
  15. INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  16. SELECT user, cnt, country FROM page_view_source;

将值插入表中

通过 INSERT 语句,也可以直接将值插入到表中,

语法

  1. INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
  2. values_row:
  3. : (val1 [, val2, ...])

OVERWRITE

INSERT OVERWRITE 将会覆盖表中的任何已存在的数据。否则,新数据会追加到表中。

示例

  1. CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
  2. INSERT INTO students
  3. VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);