DELETE 语句
DELETE
语句可以用于根据条件来删除表中的数据
注意 目前, DELETE
语句仅支持批模式, 并且要求目标表实现了 SupportsRowLevelDelete 接口。 如果在一个没有实现该接口的表上执行 DELETE
,则会抛异常。目前 Flink 内置的连接器还没有实现该接口。
执行删除语句
Java
DELETE 语句,可以使用 TableEnvironment
中的 executeSql()
方法执行。executeSql()
方法执行 DELETE 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。
以下的例子展示了如何在 TableEnvironment
中执行一条 DELETE 语句。
Scala
DELETE 语句,可以使用 TableEnvironment
中的 executeSql()
方法执行。executeSql()
方法执行 DELETE 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。
以下的例子展示了如何在 TableEnvironment
中执行一条 DELETE 语句。
Python
DELETE 语句,可以使用 TableEnvironment
中的 execute_sql()
方法执行。execute_sql()
方法执行 DELETE 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便的操作提交的作业。
以下的例子展示了如何在 TableEnvironment
中执行一条 DELETE 语句。
SQL CLI
可以在 SQL CLI 中执行 DELETE 语句
以下的例子展示了如何在 SQL CLI 中执行一条 DELETE 语句。
Java
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 注册一个 "Orders" 表
tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
// 插入原始数据
tEnv.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 2), ('Mr.White', 'Chicken', 3)").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Apple | 1 |
// | Jessica | Banana | 2 |
// | Mr.White | Chicken | 3 |
// +--------------------------------+--------------------------------+-------------+
// 3 rows in set
// 根据where条件删除
tEnv.executeSql("DELETE FROM Orders WHERE `user` = 'Lili'").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Jessica | Banana | 2 |
// | Mr.White | Chicken | 3 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
// 全表删除
tEnv.executeSql("DELETE FROM Orders").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// Empty set
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)
// 注册一个 "Orders" 表
tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
// 插入原始数据
tEnv.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 2), ('Mr.White', 'Chicken', 3)").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Lili | Apple | 1 |
// | Jessica | Banana | 2 |
// | Mr.White | Chicken | 3 |
// +--------------------------------+--------------------------------+-------------+
// 3 rows in set
// 根据where条件删除
tEnv.executeSql("DELETE FROM Orders WHERE `user` = 'Lili'").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// +--------------------------------+--------------------------------+-------------+
// | user | product | amount |
// +--------------------------------+--------------------------------+-------------+
// | Jessica | Banana | 2 |
// | Mr.White | Chicken | 3 |
// +--------------------------------+--------------------------------+-------------+
// 2 rows in set
// 全表删除
tEnv.executeSql("DELETE FROM Orders").await();
tEnv.executeSql("SELECT * FROM Orders").print();
// Empty set
Python
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
# 注册一个 "Orders" 表
table_env.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
# 插入原始数据
table_env.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 2), ('Mr.White', 'Chicken', 3)").wait();
table_env.executeSql("SELECT * FROM Orders").print();
# +--------------------------------+--------------------------------+-------------+
# | user | product | amount |
# +--------------------------------+--------------------------------+-------------+
# | Lili | Apple | 1 |
# | Jessica | Banana | 2 |
# | Mr.White | Chicken | 3 |
# +--------------------------------+--------------------------------+-------------+
# 3 rows in set
# 根据where条件删除
table_env.executeSql("DELETE FROM Orders WHERE `user` = 'Lili'").wait();
table_env.executeSql("SELECT * FROM Orders").print();
# +--------------------------------+--------------------------------+-------------+
# | user | product | amount |
# +--------------------------------+--------------------------------+-------------+
# | Jessica | Banana | 2 |
# | Mr.White | Chicken | 3 |
# +--------------------------------+--------------------------------+-------------+
# 2 rows in set
# 全表删除
table_env.executeSql("DELETE FROM Orders").wait();
table_env.executeSql("SELECT * FROM Orders").print();
# Empty set
SQL CLI
Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.
Flink SQL> CREATE TABLE Orders (`user` STRING, product STRING, amount INT) with (...);
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1), ('Mr.White', 'Chicken', 3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: bd2c46a7b2769d5c559abd73ecde82e9
Flink SQL> SELECT * FROM Orders;
user product amount
Lili Apple 1
Jessica Banana 2
Mr.White Chicken 3
Flink SQL> DELETE FROM Orders WHERE `user` = 'Lili';
user product amount
Jessica Banana 2
Mr.White Chicken 3
语法
DELETE FROM [catalog_name.][db_name.]table_name [ WHERE condition ]