UPDATE 语句

UPDATE 语句可以用于根据条件更新表的数据。

注意 目前, UPDATE 语句仅支持批模式, 并且要求目标表实现了 SupportsRowLevelUpdate 接口。 如果在一个没有实现该接口的表上执行 UPDATE,则会抛异常。目前 Flink 内置的连接器还没有实现该接口。

执行更新语句

Java

UPDATE 语句,可以使用 TableEnvironment 中的 executeSql() 方法执行。executeSql() 方法执行 UPDATE 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。

以下的例子展示了如何在 TableEnvironment 中执行一条 UPDATE 语句。

Scala

UPDATE 语句,可以使用 TableEnvironment 中的 executeSql() 方法执行。executeSql() 方法执行 UPDATE 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。

以下的例子展示了如何在 TableEnvironment 中执行一条 UPDATE 语句。

Python

UPDATE 语句,可以使用 TableEnvironment 中的 execute_sql() 方法执行。execute_sql() 方法执行 UPDATE 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。

以下的例子展示了如何在 TableEnvironment 中执行一条 UPDATE 语句。

SQL CLI

可以在 SQL CLI 中执行 UPDATE 语句

以下的例子展示了如何在 SQL CLI 中执行一条 UPDATE 语句。

Java

  1. EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
  2. TableEnvironment tEnv = TableEnvironment.create(settings);
  3. // 注册一个 "Orders" 表
  4. tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
  5. // 插入原始数据
  6. tEnv.executeSql("insert into Orders values ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").await();
  7. tEnv.executeSql("SELECT * FROM Orders").print();
  8. // +--------------------------------+--------------------------------+-------------+
  9. // | user | product | amount |
  10. // +--------------------------------+--------------------------------+-------------+
  11. // | Lili | Apple | 1 |
  12. // | Jessica | Banana | 1 |
  13. // +--------------------------------+--------------------------------+-------------+
  14. // 2 rows in set
  15. // 更新所有的amount字段
  16. tEnv.executeSql("UPDATE Orders SET `amount` = `amount` * 2").await();
  17. tEnv.executeSql("SELECT * FROM Orders").print();
  18. // +--------------------------------+--------------------------------+-------------+
  19. // | user | product | amount |
  20. // +--------------------------------+--------------------------------+-------------+
  21. // | Lili | Apple | 2 |
  22. // | Jessica | Banana | 2 |
  23. // +--------------------------------+--------------------------------+-------------+
  24. // 2 rows in set
  25. // 根据where条件更新
  26. tEnv.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").await();
  27. tEnv.executeSql("SELECT * FROM Orders").print();
  28. // +--------------------------------+--------------------------------+-------------+
  29. // | user | product | amount |
  30. // +--------------------------------+--------------------------------+-------------+
  31. // | Lili | Orange | 2 |
  32. // | Jessica | Banana | 2 |
  33. // +--------------------------------+--------------------------------+-------------+
  34. // 2 rows in set

Scala

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val settings = EnvironmentSettings.newInstance().inBatchMode().build()
  3. val tEnv = StreamTableEnvironment.create(env, settings)
  4. // 注册一个 "Orders" 表
  5. tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
  6. // 插入原始数据
  7. tEnv.executeSql("insert into Orders values ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").await();
  8. tEnv.executeSql("SELECT * FROM Orders").print();
  9. // +--------------------------------+--------------------------------+-------------+
  10. // | user | product | amount |
  11. // +--------------------------------+--------------------------------+-------------+
  12. // | Lili | Apple | 1 |
  13. // | Jessica | Banana | 1 |
  14. // +--------------------------------+--------------------------------+-------------+
  15. // 2 rows in set
  16. // 更新所有的amount字段
  17. tEnv.executeSql("UPDATE Orders SET `amount` = `amount` * 2").await();
  18. tEnv.executeSql("SELECT * FROM Orders").print();
  19. // +--------------------------------+--------------------------------+-------------+
  20. // | user | product | amount |
  21. // +--------------------------------+--------------------------------+-------------+
  22. // | Lili | Apple | 2 |
  23. // | Jessica | Banana | 2 |
  24. // +--------------------------------+--------------------------------+-------------+
  25. // 2 rows in set
  26. // 根据where条件更新
  27. tEnv.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").await();
  28. tEnv.executeSql("SELECT * FROM Orders").print();
  29. // +--------------------------------+--------------------------------+-------------+
  30. // | user | product | amount |
  31. // +--------------------------------+--------------------------------+-------------+
  32. // | Lili | Orange | 2 |
  33. // | Jessica | Banana | 2 |
  34. // +--------------------------------+--------------------------------+-------------+
  35. // 2 rows in set

Python

  1. env_settings = EnvironmentSettings.in_batch_mode()
  2. table_env = TableEnvironment.create(env_settings)
  3. # 注册一个 "Orders" 表
  4. table_env.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
  5. # 插入原始数据
  6. table_env.executeSql("insert into Orders values ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").wait();
  7. table_env.executeSql("SELECT * FROM Orders").print();
  8. # +--------------------------------+--------------------------------+-------------+
  9. # | user | product | amount |
  10. # +--------------------------------+--------------------------------+-------------+
  11. # | Lili | Apple | 1 |
  12. # | Jessica | Banana | 1 |
  13. # +--------------------------------+--------------------------------+-------------+
  14. # 2 rows in set
  15. # 更新所有的amount字段
  16. table_env.executeSql("UPDATE Orders SET `amount` = `amount` * 2").wait();
  17. table_env.executeSql("SELECT * FROM Orders").print();
  18. # +--------------------------------+--------------------------------+-------------+
  19. # | user | product | amount |
  20. # +--------------------------------+--------------------------------+-------------+
  21. # | Lili | Apple | 2 |
  22. # | Jessica | Banana | 2 |
  23. # +--------------------------------+--------------------------------+-------------+
  24. # 2 rows in set
  25. # 根据where条件更新
  26. table_env.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").wait();
  27. table_env.executeSql("SELECT * FROM Orders").print();
  28. # +--------------------------------+--------------------------------+-------------+
  29. # | user | product | amount |
  30. # +--------------------------------+--------------------------------+-------------+
  31. # | Lili | Orange | 2 |
  32. # | Jessica | Banana | 2 |
  33. # +--------------------------------+--------------------------------+-------------+
  34. # 2 rows in set

SQL CLI

  1. Flink SQL> SET 'execution.runtime-mode' = 'batch';
  2. [INFO] Session property has been set.
  3. Flink SQL> CREATE TABLE Orders (`user` STRING, product STRING, amount INT) with (...);
  4. [INFO] Execute statement succeed.
  5. Flink SQL> INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1);
  6. [INFO] Submitting SQL update statement to the cluster...
  7. [INFO] SQL update statement has been successfully submitted to the cluster:
  8. Job ID: bd2c46a7b2769d5c559abd73ecde82e9
  9. Flink SQL> SELECT * FROM Orders;
  10. user product amount
  11. Lili Apple 1
  12. Jessica Banana 1
  13. Flink SQL> UPDATE Orders SET amount = 2;
  14. user product amount
  15. Lili Apple 2
  16. Jessica Banana 2

语法

  1. UPDATE [catalog_name.][db_name.]table_name SET column_name1 = expression1 [, column_name2 = expression2, ...][ WHERE condition ]