UPDATE Statements

The UPDATE statement performs row-level updates on the target table. If a filter is provided, only the rows that match it are updated; otherwise every row is updated.

Attention: Currently, the UPDATE statement is only supported in batch mode, and it requires the connector of the target table to implement the SupportsRowLevelUpdate interface. An exception is thrown if you try to UPDATE a table whose connector does not implement this interface. Currently, no connector maintained by Flink supports UPDATE.
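Because an UPDATE against an unsupported table fails with an exception, a caller may want to handle that failure at the call site. The following is a minimal sketch of one way to do so from PyFlink. The helper name try_update is hypothetical and not part of Flink, table_env is assumed to be a batch-mode TableEnvironment, and a broad Exception is caught because this page does not name the exception type.

```python
def try_update(table_env, statement):
    """Run one UPDATE statement and report failure instead of raising.

    Hypothetical helper: `table_env` is assumed to be a batch-mode
    TableEnvironment; any object with a compatible execute_sql() works.
    Returns (True, None) on success and (False, message) on failure.
    """
    try:
        # execute_sql() submits the job; wait() blocks until it finishes.
        table_env.execute_sql(statement).wait()
        return True, None
    except Exception as exc:  # the exact exception type is not assumed here
        return False, str(exc)
```

A call such as try_update(table_env, "UPDATE Orders SET amount = amount * 2") then returns a success flag and, on failure, the error message, so the caller can decide whether to fall back to another write path.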

Run an UPDATE statement

Java

UPDATE statements can be executed with the executeSql() method of the TableEnvironment. executeSql() submits a Flink job immediately and returns a TableResult instance that is associated with the submitted job.

The following examples show how to run a single UPDATE statement in TableEnvironment.

Scala

UPDATE statements can be executed with the executeSql() method of the TableEnvironment. executeSql() submits a Flink job immediately and returns a TableResult instance that is associated with the submitted job.

The following examples show how to run a single UPDATE statement in TableEnvironment.

Python

UPDATE statements can be executed with the execute_sql() method of the TableEnvironment. execute_sql() submits a Flink job immediately and returns a TableResult instance that is associated with the submitted job.

The following examples show how to run a single UPDATE statement in TableEnvironment.
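Since execute_sql() returns as soon as the job is submitted, a later statement may run before an earlier one has finished unless the caller waits on the returned TableResult. The sketch below shows one way to run statements strictly in order. The function name run_statements_in_order is hypothetical, and the sketch assumes every statement is an INSERT or UPDATE that submits a job.

```python
def run_statements_in_order(table_env, statements):
    """Execute statements one at a time, waiting for each job to finish.

    Hypothetical helper: `table_env` is assumed to be a TableEnvironment
    and each statement is assumed to submit a job (INSERT or UPDATE).
    """
    results = []
    for statement in statements:
        result = table_env.execute_sql(statement)  # submits the job immediately
        result.wait()  # block until the submitted job completes
        results.append(result)
    return results
```

Waiting after each statement trades throughput for a predictable order, which is usually what a sequence of dependent updates needs.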

SQL CLI

UPDATE statements can be executed in the SQL CLI.

The following example shows how to run an UPDATE statement in the SQL CLI.

Java

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  // UPDATE is only supported in batch mode
  EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
  TableEnvironment tEnv = TableEnvironment.create(settings);
  // register a table named "Orders"
  tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
  // insert values and wait for the job to finish
  tEnv.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").await();
  tEnv.executeSql("SELECT * FROM Orders").print();
  // +--------------------------------+--------------------------------+-------------+
  // |                           user |                        product |      amount |
  // +--------------------------------+--------------------------------+-------------+
  // |                           Lili |                          Apple |           1 |
  // |                        Jessica |                         Banana |           1 |
  // +--------------------------------+--------------------------------+-------------+
  // 2 rows in set

  // update the amount of every row
  tEnv.executeSql("UPDATE Orders SET `amount` = `amount` * 2").await();
  tEnv.executeSql("SELECT * FROM Orders").print();
  // +--------------------------------+--------------------------------+-------------+
  // |                           user |                        product |      amount |
  // +--------------------------------+--------------------------------+-------------+
  // |                           Lili |                          Apple |           2 |
  // |                        Jessica |                         Banana |           2 |
  // +--------------------------------+--------------------------------+-------------+
  // 2 rows in set

  // update only the rows that match a filter
  tEnv.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").await();
  tEnv.executeSql("SELECT * FROM Orders").print();
  // +--------------------------------+--------------------------------+-------------+
  // |                           user |                        product |      amount |
  // +--------------------------------+--------------------------------+-------------+
  // |                           Lili |                         Orange |           2 |
  // |                        Jessica |                         Banana |           2 |
  // +--------------------------------+--------------------------------+-------------+
  // 2 rows in set

Scala

  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  import org.apache.flink.table.api.EnvironmentSettings
  import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // UPDATE is only supported in batch mode
  val settings = EnvironmentSettings.newInstance().inBatchMode().build()
  val tEnv = StreamTableEnvironment.create(env, settings)
  // register a table named "Orders"
  tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)")
  // insert values and wait for the job to finish
  tEnv.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").await()
  tEnv.executeSql("SELECT * FROM Orders").print()
  // +--------------------------------+--------------------------------+-------------+
  // |                           user |                        product |      amount |
  // +--------------------------------+--------------------------------+-------------+
  // |                           Lili |                          Apple |           1 |
  // |                        Jessica |                         Banana |           1 |
  // +--------------------------------+--------------------------------+-------------+
  // 2 rows in set

  // update the amount of every row
  tEnv.executeSql("UPDATE Orders SET `amount` = `amount` * 2").await()
  tEnv.executeSql("SELECT * FROM Orders").print()
  // +--------------------------------+--------------------------------+-------------+
  // |                           user |                        product |      amount |
  // +--------------------------------+--------------------------------+-------------+
  // |                           Lili |                          Apple |           2 |
  // |                        Jessica |                         Banana |           2 |
  // +--------------------------------+--------------------------------+-------------+
  // 2 rows in set

  // update only the rows that match a filter
  tEnv.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").await()
  tEnv.executeSql("SELECT * FROM Orders").print()
  // +--------------------------------+--------------------------------+-------------+
  // |                           user |                        product |      amount |
  // +--------------------------------+--------------------------------+-------------+
  // |                           Lili |                         Orange |           2 |
  // |                        Jessica |                         Banana |           2 |
  // +--------------------------------+--------------------------------+-------------+
  // 2 rows in set

Python

  from pyflink.table import EnvironmentSettings, TableEnvironment

  # UPDATE is only supported in batch mode
  env_settings = EnvironmentSettings.in_batch_mode()
  table_env = TableEnvironment.create(env_settings)
  # register a table named "Orders"
  table_env.execute_sql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)")
  # insert values and wait for the job to finish
  table_env.execute_sql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").wait()
  table_env.execute_sql("SELECT * FROM Orders").print()
  # +--------------------------------+--------------------------------+-------------+
  # |                           user |                        product |      amount |
  # +--------------------------------+--------------------------------+-------------+
  # |                           Lili |                          Apple |           1 |
  # |                        Jessica |                         Banana |           1 |
  # +--------------------------------+--------------------------------+-------------+
  # 2 rows in set

  # update the amount of every row
  table_env.execute_sql("UPDATE Orders SET `amount` = `amount` * 2").wait()
  table_env.execute_sql("SELECT * FROM Orders").print()
  # +--------------------------------+--------------------------------+-------------+
  # |                           user |                        product |      amount |
  # +--------------------------------+--------------------------------+-------------+
  # |                           Lili |                          Apple |           2 |
  # |                        Jessica |                         Banana |           2 |
  # +--------------------------------+--------------------------------+-------------+
  # 2 rows in set

  # update only the rows that match a filter
  table_env.execute_sql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").wait()
  table_env.execute_sql("SELECT * FROM Orders").print()
  # +--------------------------------+--------------------------------+-------------+
  # |                           user |                        product |      amount |
  # +--------------------------------+--------------------------------+-------------+
  # |                           Lili |                         Orange |           2 |
  # |                        Jessica |                         Banana |           2 |
  # +--------------------------------+--------------------------------+-------------+
  # 2 rows in set

SQL CLI

  Flink SQL> SET 'execution.runtime-mode' = 'batch';
  [INFO] Session property has been set.

  Flink SQL> CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...);
  [INFO] Execute statement succeed.

  Flink SQL> INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1);
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: bd2c46a7b2769d5c559abd73ecde82e9

  Flink SQL> SELECT * FROM Orders;
     user  product  amount
     Lili    Apple       1
  Jessica   Banana       1

  Flink SQL> UPDATE Orders SET amount = 2;
     user  product  amount
     Lili    Apple       2
  Jessica   Banana       2

UPDATE ROWS

  UPDATE [catalog_name.][db_name.]table_name
  SET column_name1 = expression1 [, column_name2 = expression2, ...]
  [ WHERE condition ]
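To make the grammar above concrete, here is a small sketch that assembles an UPDATE statement from its parts. The helper build_update and all of its parameters are hypothetical and not part of Flink. Expressions and the condition are inserted verbatim with no escaping, so this is only suitable for trusted input. The sketch also assumes a catalog name is only meaningful together with a database name.

```python
def build_update(table_name, assignments, where=None, catalog_name=None, db_name=None):
    """Assemble an UPDATE statement following the grammar above.

    Hypothetical helper for illustration. `assignments` maps column names
    to SQL expression strings, which are inserted verbatim (no escaping).
    """
    if not assignments:
        raise ValueError("at least one column assignment is required")
    if catalog_name and not db_name:
        # Assumption: a catalog qualifier is only used with a database name.
        raise ValueError("catalog_name requires db_name")
    # Optional catalog and database qualifiers precede the table name.
    qualified = ".".join(p for p in (catalog_name, db_name, table_name) if p)
    # Column names are wrapped in backticks, as the examples do for `user`.
    set_clause = ", ".join(f"`{col}` = {expr}" for col, expr in assignments.items())
    statement = f"UPDATE {qualified} SET {set_clause}"
    if where:
        # The WHERE clause is optional; without it every row is updated.
        statement += f" WHERE {where}"
    return statement
```

For instance, build_update("Orders", {"product": "'Orange'"}, where="`user` = 'Lili'") yields the filtered statement used in the examples, and the resulting string can be passed to execute_sql().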