使用场景

Doris 中所有导入任务都是原子性的,即一个导入作业要么全部成功,要么全部失败,不会出现仅部分数据导入成功的情况,并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。对于简单的导入任务,用户无需做额外配置或操作。对于表所附属的物化视图,也同时保证和基表的原子性和一致性。对于以下情形,Doris 为用户提供了更多的事务控制。

  1. 如果用户需要将对于同一个目标表的多个 INSERT INTO 导入组合成一个事务,可以使用 BEGINCOMMIT 命令。

  2. 如果用户需要将多个 Stream Load 导入组合成一个事务,可以使用 Stream Load 的两阶段事务提交模式。

  3. Broker Load 多表导入的原子性,

基本原理

Doris 导入任务中,BE 会提交写入成功的 Tablet ID 到 FE。FE 会根据 tablet 成功副本数判断导入是否成功,如果成功,该导入的事务被 commit,导入数据可见。如果失败,该导入的事务会被 rollback,相应的 tablet 也会被清理。

Label 机制

Doris 的导入作业都可以设置一个 Label。这个 Label 通常是用户自定义的、具有一定业务逻辑属性的字符串。

Label 的主要作用是唯一标识一个导入任务,并且能够保证相同的 Label 仅会被成功导入一次。

Label 机制可以保证导入数据的不丢不重。如果上游数据源能够保证 At-Least-Once 语义,则配合 Doris 的 Label 机制,能够保证 Exactly-Once 语义。

Label 在一个数据库下具有唯一性。Label 的保留期限默认是 3 天。即 3 天后,已完成的 Label 会被自动清理,之后 Label 可以被重复使用。

快速上手

Insert Into

1. 建表

  1. CREATE TABLE testdb.test_table(
  2. user_id BIGINT NOT NULL COMMENT "用户 ID",
  3. name VARCHAR(20) NOT NULL COMMENT "用户姓名",
  4. age INT COMMENT "用户年龄"
  5. )
  6. DUPLICATE KEY(user_id)
  7. DISTRIBUTED BY HASH(user_id) BUCKETS 10;

创建一个同样 Schema 的表用于失败的例子

  1. CREATE TABLE testdb.test_table2 LIKE testdb.test_table;

2. 导入成功的例子

  1. BEGIN;
  2. -- INSERT #1
  3. INSERT INTO testdb.test_table (user_id, name, age)
  4. VALUES (1, "Emily", 25),
  5. (2, "Benjamin", 35),
  6. (3, "Olivia", 28),
  7. (4, "Alexander", 60),
  8. (5, "Ava", 17);
  9. -- INSERT #2
  10. INSERT INTO testdb.test_table (user_id, name, age)
  11. VALUES (6, "William", 69),
  12. (7, "Sophia", 32),
  13. (8, "James", 64),
  14. (9, "Emma", 37),
  15. (10, "Liam", 64);
  16. COMMIT;

导入结果,导入任务的状态先是 PREPARE,直到 COMMIT 后才是 VISIBLE

  1. // BEGIN
  2. Query OK, 0 rows affected (0.001 sec)
  3. {'label':'txn_insert_2aeac5519bd549a1-a72fe4001c56e10c', 'status':'PREPARE', 'txnId':''}
  4. // INSERT #1
  5. Query OK, 5 rows affected (0.017 sec)
  6. {'label':'txn_insert_2aeac5519bd549a1-a72fe4001c56e10c', 'status':'PREPARE', 'txnId':'10060'}
  7. // INSERT #2
  8. Query OK, 5 rows affected (0.007 sec)
  9. {'label':'txn_insert_2aeac5519bd549a1-a72fe4001c56e10c', 'status':'PREPARE', 'txnId':'10060'}
  10. // COMMIT
  11. Query OK, 0 rows affected (1.013 sec)
  12. {'label':'txn_insert_2aeac5519bd549a1-a72fe4001c56e10c', 'status':'VISIBLE', 'txnId':'10060'}

验证数据

  1. MySQL [testdb]> SELECT * FROM testdb.test_table;
  2. +---------+-----------+------+
  3. | user_id | name | age |
  4. +---------+-----------+------+
  5. | 5 | Ava | 17 |
  6. | 10 | Liam | 64 |
  7. | 1 | Emily | 25 |
  8. | 4 | Alexander | 60 |
  9. | 7 | Sophia | 32 |
  10. | 9 | Emma | 37 |
  11. | 2 | Benjamin | 35 |
  12. | 3 | Olivia | 28 |
  13. | 6 | William | 69 |
  14. | 8 | James | 64 |
  15. +---------+-----------+------+
  16. 10 rows in set (0.110 sec)

3. 导入失败的例子

  1. BEGIN;
  2. -- INSERT #1
  3. INSERT INTO testdb.test_table2 (user_id, name, age)
  4. VALUES (1, "Emily", 25),
  5. (2, "Benjamin", 35),
  6. (3, "Olivia", 28),
  7. (4, "Alexander", 60),
  8. (5, "Ava", 17);
  9. -- INSERT #2
  10. INSERT INTO testdb.test_table2 (user_id, name, age)
  11. VALUES (6, "William", 69),
  12. (7, "Sophia", 32),
  13. (8, NULL, 64),
  14. (9, "Emma", 37),
  15. (10, "Liam", 64);
  16. COMMIT;

导入结果,因为第二个 INSERT INTO 存在 NULL,导致整个事务 COMMIT 失败。

  1. // BEGIN
  2. Query OK, 0 rows affected (0.001 sec)
  3. {'label':'txn_insert_f3ecb2285edf42e2-92988ee97d74fbb0', 'status':'PREPARE', 'txnId':''}
  4. // INSERT #1
  5. Query OK, 5 rows affected (0.012 sec)
  6. {'label':'txn_insert_f3ecb2285edf42e2-92988ee97d74fbb0', 'status':'PREPARE', 'txnId':'10062'}
  7. // INSERT #2
  8. {'label':'txn_insert_f3ecb2285edf42e2-92988ee97d74fbb0', 'status':'PREPARE', 'txnId':'10062'}
  9. // COMMIT
  10. ERROR 1105 (HY000): errCode = 2, detailMessage = errCode = 2, detailMessage = [DATA_QUALITY_ERROR]too many filtered rows

验证结果,没有数据被导入。

  1. MySQL [testdb]> SELECT * FROM testdb.test_table2;
  2. Empty set (0.019 sec)

Stream Load

1. 在 HTTP Header 中设置 two_phase_commit:true 启用两阶段提交。

  1. curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
  2. {
  3. "TxnId": 18036,
  4. "Label": "55c8ffc9-1c40-4d51-b75e-f2265b3602ef",
  5. "TwoPhaseCommit": "true",
  6. "Status": "Success",
  7. "Message": "OK",
  8. "NumberTotalRows": 100,
  9. "NumberLoadedRows": 100,
  10. "NumberFilteredRows": 0,
  11. "NumberUnselectedRows": 0,
  12. "LoadBytes": 1031,
  13. "LoadTimeMs": 77,
  14. "BeginTxnTimeMs": 1,
  15. "StreamLoadPutTimeMs": 1,
  16. "ReadDataTimeMs": 0,
  17. "WriteDataTimeMs": 58,
  18. "CommitAndPublishTimeMs": 0
  19. }

2. 对事务触发 commit 操作(请求发往 FE 或 BE 均可)

  • 可以使用事务 id 指定事务

    1. curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/{table}/stream_load2pc
    2. {
    3. "status": "Success",
    4. "msg": "transaction [18036] commit successfully."
    5. }
  • 也可以使用 label 指定事务

    1. curl -X PUT --location-trusted -u user:passwd -H "label:55c8ffc9-1c40-4d51-b75e-f2265b3602ef" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc
    2. {
    3. "status": "Success",
    4. "msg": "label [55c8ffc9-1c40-4d51-b75e-f2265b3602ef] commit successfully."
    5. }

3. 对事务触发 abort 操作(请求发往 FE 或 BE 均可)

  • 可以使用事务 id 指定事务

    1. curl -X PUT --location-trusted -u user:passwd -H "txn_id:18037" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/{table}/stream_load2pc
    2. {
    3. "status": "Success",
    4. "msg": "transaction [18037] abort successfully."
    5. }
  • 也可以使用 label 指定事务

    1. curl -X PUT --location-trusted -u user:passwd -H "label:55c8ffc9-1c40-4d51-b75e-f2265b3602ef" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/{table}/stream_load2pc
    2. {
    3. "status": "Success",
    4. "msg": "label [55c8ffc9-1c40-4d51-b75e-f2265b3602ef] abort successfully."
    5. }

Broker Load

所有 Broker Load 导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。还可以通过 Label 的机制来保证数据导入的不丢不重。

下面例子是从 HDFS 导入数据,使用通配符匹配两批文件,分别导入到两个表中。

  1. LOAD LABEL example_db.label2
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
  4. INTO TABLE `my_table1`
  5. PARTITION (p1)
  6. COLUMNS TERMINATED BY ","
  7. (k1, tmp_k2, tmp_k3)
  8. SET (
  9. k2 = tmp_k2 + 1,
  10. k3 = tmp_k3 + 1
  11. )
  12. DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
  13. INTO TABLE `my_table2`
  14. COLUMNS TERMINATED BY ","
  15. (k1, k2, k3)
  16. )
  17. WITH BROKER hdfs
  18. (
  19. "username"="hdfs_user",
  20. "password"="hdfs_password"
  21. );

使用通配符匹配导入两批文件 file-10*file-20*。分别导入到 my_table1my_table2 两张表中。其中 my_table1 指定导入到分区 p1 中,并且将导入源文件中第二列和第三列的值 +1 后导入。

最佳实践

Label 通常被设置为 业务逻辑+时间 的格式。如 my_business1_20220330_125000

这个 Label 通常用于表示:业务 my_business1 这个业务在 2022-03-30 12:50:00 产生的一批数据。通过这种 Label 设定,业务上可以通过 Label 查询导入任务状态,来明确的获知该时间点批次的数据是否已经导入成功。如果没有成功,则可以使用这个 Label 继续重试导入。

INSERT INTO 支持将 Doris 查询的结果导入到另一个表中。INSERT INTO 是一个同步导入方式,执行导入后返回导入结果。可以通过请求的返回判断导入是否成功。INSERT INTO 可以保证导入任务的原子性,要么全部导入成功,要么全部导入失败。