UPDATE Statements
UPDATE
statement is used to perform row-level updating on the target table according to the filter if provided.
Attention Currently, UPDATE
statement only supports in batch mode, and it requires the target table connector implements the SupportsRowLevelUpdate interface to support the row-level update. An exception will be thrown if trying to UPDATE
the table which has not implements the related interface. Currently, there is no existing connector maintained by flink has supported UPDATE yet.
Run a UPDATE statement
Java
UPDATE statements can be executed with the executeSql()
method of the TableEnvironment
. The executeSql()
will submit a Flink job immediately, and return a TableResult
instance which associates the submitted job.
The following examples show how to run a single UPDATE statement in TableEnvironment
.
Scala
UPDATE statements can be executed with the executeSql()
method of the TableEnvironment
. The executeSql()
will submit a Flink job immediately, and return a TableResult
instance which associates the submitted job.
The following examples show how to run a single UPDATE statement in TableEnvironment
.
Python
UPDATE statements can be executed with the execute_sql()
method of the TableEnvironment
. The executeSql()
will submit a Flink job immediately, and return a TableResult
instance which associates the submitted job.
The following examples show how to run a single UPDATE statement in TableEnvironment
.
SQL CLI
UPDATE statements can be executed in SQL CLI.
The following examples show how to run a UPDATE statement in SQL CLI.
Java
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// register a table named "Orders"
tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
// insert values
tEnv.executeSql("insert into Orders values ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Apple | 1 |
// | Jessica | Banana | 1 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
// update all the amount
tEnv.executeSql("UPDATE Orders SET `amount` = `amount` * 2").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Apple | 2 |
// | Jessica | Banana | 2 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
// update by filter
tEnv.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Orange | 2 |
// | Jessica | Banana | 2 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)
// register a table named "Orders"
tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
// insert values
tEnv.executeSql("insert into Orders values ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Apple | 1 |
// | Jessica | Banana | 1 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
// update all the amount
tEnv.executeSql("UPDATE Orders SET `amount` = `amount` * 2").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Apple | 2 |
// | Jessica | Banana | 2 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
// update by filter
tEnv.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Orange | 2 |
// | Jessica | Banana | 2 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
Python
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
# register a table named "Orders"
table_env.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
# insert values
table_env.executeSql("insert into Orders values ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1)").wait();
table_env.executeSql("SELECT * FROM Orders").print();
# +--------------------------------+--------------------------------+-------------+
# | user | product | amount |
# +--------------------------------+--------------------------------+-------------+
# | Lili | Apple | 1 |
# | Jessica | Banana | 1 |
# +--------------------------------+--------------------------------+-------------+
# 2 rows in set
# update all the amount
table_env.executeSql("UPDATE Orders SET `amount` = `amount` * 2").wait();
table_env.executeSql("SELECT * FROM Orders").print();
# +--------------------------------+--------------------------------+-------------+
# | user | product | amount |
# +--------------------------------+--------------------------------+-------------+
# | Lili | Apple | 2 |
# | Jessica | Banana | 2 |
# +--------------------------------+--------------------------------+-------------+
# 2 rows in set
# update by filter
table_env.executeSql("UPDATE Orders SET `product` = 'Orange' WHERE `user` = 'Lili'").wait();
table_env.executeSql("SELECT * FROM Orders").print();
# +--------------------------------+--------------------------------+-------------+
# | user | product | amount |
# +--------------------------------+--------------------------------+-------------+
# | Lili | Orange | 2 |
# | Jessica | Banana | 2 |
# +--------------------------------+--------------------------------+-------------+
# 2 rows in set
SQL CLI
Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.
Flink SQL> CREATE TABLE Orders (`user` STRING, product STRING, amount INT) with (...);
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: bd2c46a7b2769d5c559abd73ecde82e9
Flink SQL> SELECT * FROM Orders;
user product amount
Lili Apple 1
Jessica Banana 1
Flink SQL> UPDATE Orders SET amount = 2;
user product amount
Lili Apple 2
Jessica Banana 2
UPDATE ROWS
UPDATE [catalog_name.][db_name.]table_name SET column_name1 = expression1 [, column_name2 = expression2, ...][ WHERE condition ]