有了 Delete 操作为什么还要引入基于导入的批量删除?

Delete 操作的局限性

使用 Delete 语句的方式删除时,每执行一次 Delete 都会生成一个空的 rowset 来记录删除条件,并产生一个新的数据版本。每次读取都要对删除条件进行过滤,如果频繁删除或者删除条件过多时,都会严重影响查询性能。

Insert 数据和 Delete 数据穿插出现

对于类似于从事务数据库中,通过 CDC 进行数据导入的场景,数据中 Insert 和 Delete 一般是穿插出现的,面对这种场景当前 Delete 操作也是无法实现。

基于数据导入的方式,数据有三种合并方式:

  1. APPEND: 数据全部追加到现有数据中。

  2. DELETE: 删除所有与导入数据 key 列值相同的行 (当表存在sequence列时,需要同时满足主键相同以及 sequence 列的大小逻辑才能正确删除,详见下边用例 4)。

  3. MERGE: 根据 DELETE ON 的决定 APPEND 还是 DELETE。

批量删除只工作在 Unique 模型上。

基本原理

通过在 Unique 表上增加一个隐藏列DORIS_DELETE_SIGN来实现。

FE 解析查询时,遇到 * 等扩展时去掉DORIS_DELETE_SIGN,并且默认加上 DORIS_DELETE_SIGN != true 的条件,BE 读取时都会加上一列进行判断,通过条件确定是否删除。

  • 导入

    导入时在 FE 解析时将隐藏列的值设置成 DELETE ON 表达式的值。

  • 读取

    读取时在所有存在隐藏列的上增加DORIS_DELETE_SIGN != true 的条件,be 不感知这一过程,正常执行。

  • Cumulative Compaction

    Cumulative Compaction 时将隐藏列看作正常的列处理,Compaction 逻辑没有变化。

  • Base Compaction

    Base Compaction 时要将标记为删除的行的删掉,以减少数据占用的空间。

语法说明

导入的语法设计方面主要是增加一个指定删除标记列的字段的 column 映射,并且需要在导入的数据中增加一列,各种导入方式设置的语法如下

Stream Load

Stream Load 的写法在 header 中的 columns 字段增加一个设置删除标记列的字段,示例 -H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"

Broker Load

Broker Load 的写法在 PROPERTIES 处设置删除标记列的字段,语法如下:

  1. LOAD LABEL db1.label1
  2. (
  3. [MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
  4. INTO TABLE tbl1
  5. COLUMNS TERMINATED BY ","
  6. (tmp_c1,tmp_c2, label_c3)
  7. SET
  8. (
  9. id=tmp_c2,
  10. name=tmp_c1,
  11. )
  12. [DELETE ON label_c3=true]
  13. )
  14. WITH BROKER 'broker'
  15. (
  16. "username"="user",
  17. "password"="pass"
  18. )
  19. PROPERTIES
  20. (
  21. "timeout" = "3600"
  22. );

Routine Load

Routine Load的写法在 columns字段增加映射,映射方式同上,语法如下:

  1. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  2. [WITH MERGE|APPEND|DELETE]
  3. COLUMNS(k1, k2, k3, v1, v2, label),
  4. WHERE k1 100 and k2 like "%doris%"
  5. [DELETE ON label=true]
  6. PROPERTIES
  7. (
  8. "desired_concurrent_number"="3",
  9. "max_batch_interval" = "20",
  10. "max_batch_rows" = "300000",
  11. "max_batch_size" = "209715200",
  12. "strict_mode" = "false"
  13. )
  14. FROM KAFKA
  15. (
  16. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  17. "kafka_topic" = "my_topic",
  18. "kafka_partitions" = "0,1,2,3",
  19. "kafka_offsets" = "101,0,0,200"
  20. );

注意事项

  1. 由于除Stream Load 外的导入操作在 doris 内部有可能乱序执行,因此在使用MERGE 方式导入时如果不是Stream Load,需要与 load sequence 一起使用,具体的语法可以参照sequence列 相关的文档;

  2. DELETE ON 条件只能与 MERGE 一起使用。

如果在执行导入作业前按上文所述开启了SET show_hidden_columns = true的 session variable 来查看表是否支持批量删除,按示例完成 DELETE/MERGE 的导入作业后,如果在同一个 session 中执行select count(*) from xxx等语句时,需要执行SET show_hidden_columns = false或者开启新的 session, 避免查询结果中包含那些被批量删除的记录,导致结果与预期不符。

使用示例

查看是否启用批量删除支持

  1. mysql SET show_hidden_columns=true;
  2. Query OK, 0 rows affected (0.00 sec)
  3. mysql DESC test;
  4. +-----------------------+--------------+------+-------+---------+---------+
  5. | Field | Type | Null | Key | Default | Extra |
  6. +-----------------------+--------------+------+-------+---------+---------+
  7. | name | VARCHAR(100) | No | true | NULL | |
  8. | gender | VARCHAR(10) | Yes | false | NULL | REPLACE |
  9. | age | INT | Yes | false | NULL | REPLACE |
  10. | DORIS_DELETE_SIGN | TINYINT | No | false | 0 | REPLACE |
  11. +-----------------------+--------------+------+-------+---------+---------+
  12. 4 rows in set (0.00 sec)

Stream Load 使用示例

1. 正常导入数据:

  1. curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: APPEND" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

其中的 APPEND 条件可以省略,与下面的语句效果相同:

  1. curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

2. 将与导入数据 Key 相同的数据全部删除

  1. curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: DELETE" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

假设导入表中原有数据为:

  1. +--------+----------+----------+------+
  2. | siteid | citycode | username | pv |
  3. +--------+----------+----------+------+
  4. | 3 | 2 | tom | 2 |
  5. | 4 | 3 | bush | 3 |
  6. | 5 | 3 | helen | 3 |
  7. +--------+----------+----------+------+

导入数据为:

  1. 3,2,tom,0

导入后数据变成:

  1. +--------+----------+----------+------+
  2. | siteid | citycode | username | pv |
  3. +--------+----------+----------+------+
  4. | 4 | 3 | bush | 3 |
  5. | 5 | 3 | helen | 3 |
  6. +--------+----------+----------+------+

3. 将导入数据中与site_id=1 的行的 Key 列相同的行

  1. curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: MERGE" -H "delete: siteid=1" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

假设导入前数据为:

  1. +--------+----------+----------+------+
  2. | siteid | citycode | username | pv |
  3. +--------+----------+----------+------+
  4. | 4 | 3 | bush | 3 |
  5. | 5 | 3 | helen | 3 |
  6. | 1 | 1 | jim | 2 |
  7. +--------+----------+----------+------+

导入数据为:

  1. 2,1,grace,2
  2. 3,2,tom,2
  3. 1,1,jim,2

导入后为:

  1. +--------+----------+----------+------+
  2. | siteid | citycode | username | pv |
  3. +--------+----------+----------+------+
  4. | 4 | 3 | bush | 3 |
  5. | 2 | 1 | grace | 2 |
  6. | 3 | 2 | tom | 2 |
  7. | 5 | 3 | helen | 3 |
  8. +--------+----------+----------+------+

4. 当存在 sequence 列时,将与导入数据 Key 相同的数据全部删除

  1. curl --location-trusted -u root: -H "column_separator:," -H "columns: name, gender, age" -H "function_column.sequence_col: age" -H "merge_type: DELETE" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load

当 Unique 表设置了 Sequence 列时,在相同 Key 列下,Sequence 列的值会作为 REPLACE 聚合函数替换顺序的依据,较大值可以替换较小值。当对这种表基于DORIS_DELETE_SIGN进行删除标记时,需要保证 Key 相同和 Sequence 列值要大于等于当前值。

假设有表,结构如下

  1. mysql SET show_hidden_columns=true;
  2. Query OK, 0 rows affected (0.00 sec)
  3. mysql DESC table1;
  4. +------------------------+--------------+------+-------+---------+---------+
  5. | Field | Type | Null | Key | Default | Extra |
  6. +------------------------+--------------+------+-------+---------+---------+
  7. | name | VARCHAR(100) | No | true | NULL | |
  8. | gender | VARCHAR(10) | Yes | false | NULL | REPLACE |
  9. | age | INT | Yes | false | NULL | REPLACE |
  10. | DORIS_DELETE_SIGN | TINYINT | No | false | 0 | REPLACE |
  11. | DORIS_SEQUENCE_COL | INT | Yes | false | NULL | REPLACE |
  12. +------------------------+--------------+------+-------+---------+---------+
  13. 4 rows in set (0.00 sec)

假设导入表中原有数据为:

  1. +-------+--------+------+
  2. | name | gender | age |
  3. +-------+--------+------+
  4. | li | male | 10 |
  5. | wang | male | 14 |
  6. | zhang | male | 12 |
  7. +-------+--------+------+

当导入数据为:

  1. li,male,10

导入后数据后会变成:

  1. +-------+--------+------+
  2. | name | gender | age |
  3. +-------+--------+------+
  4. | wang | male | 14 |
  5. | zhang | male | 12 |
  6. +-------+--------+------+

会发现数据

  1. li,male,10

被删除成功。

但是假如导入数据为:

  1. li,male,9

导入后数据会变成:

  1. +-------+--------+------+
  2. | name | gender | age |
  3. +-------+--------+------+
  4. | li | male | 10 |
  5. | wang | male | 14 |
  6. | zhang | male | 12 |
  7. +-------+--------+------+

会看到数据

  1. li,male,10

并没有被删除,这是因为在底层的依赖关系上,会先判断 key 相同的情况,对外展示 sequence 列的值大的行数据,然后在看该行的DORIS_DELETE_SIGN值是否为 1,如果为 1 则不会对外展示,如果为 0,则仍会读出来。

当导入数据中同时存在数据写入和删除时(例如 CDC 场景中),使用 Sequence 列可以有效的保证当数据乱序到达时的一致性,避免后到达的一个旧版本的删除操作,误删掉了先到达的新版本的数据。