Overview

Doris Streamloader is a client tool for loading data into Apache Doris. Compared to single-threaded loading via curl, it reduces the load latency of large datasets through concurrent loading. It comes with the following features:

  • Parallel loading: multi-threaded loading for the Stream Load method. You can set the parallelism level using the workers parameter.
  • Multi-file load: load multiple files and directories in one shot. It supports recursive file fetching and allows you to specify file names with wildcard characters.
  • Path traversal support: traverses paths when the source files are in directories.
  • Resilience and continuity: in case of partial load failures, it can resume data loading from the point of failure.
  • Automatic retry mechanism: in case of loading failures, it automatically retries a default number of times. If the loading remains unsuccessful, it prints the command for manual retry.

Installation

Version 1.0

Source Code: https://github.com/apache/doris-streamloader/

| Version | Date | Architecture | Link |
|---------|----------|--------------|------|
| v1.0 | 20240131 | x64 | https://apache-doris-releases.oss-accelerate.aliyuncs.com/apache-doris-streamloader-1.0.1-bin-x64.tar.xz |
| v1.0 | 20240131 | arm64 | https://apache-doris-releases.oss-accelerate.aliyuncs.com/apache-doris-streamloader-1.0.1-bin-arm64.tar.xz |

Note

The downloaded artifact is the executable binary itself.
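
For example, a minimal sketch of fetching and unpacking the x64 build (the exact archive layout is an assumption; pick the arm64 link on ARM machines):

```shell
# Download the x64 tarball and unpack it; the result is the
# doris-streamloader binary (no compilation needed)
wget https://apache-doris-releases.oss-accelerate.aliyuncs.com/apache-doris-streamloader-1.0.1-bin-x64.tar.xz
tar -xJf apache-doris-streamloader-1.0.1-bin-x64.tar.xz
```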

How to use

```shell
doris-streamloader --source_file={FILE_LIST} --url={FE_OR_BE_SERVER_URL}:{PORT} --header={STREAMLOAD_HEADER} --db={TARGET_DATABASE} --table={TARGET_TABLE}
```

1. FILE_LIST supports:

  • Single file

    E.g. Load a single file

    ```shell
    doris-streamloader --source_file="demo.csv" --url="http://localhost:8330" --header="column_separator:|?columns:col1,col2" --db="testdb" --table="testtbl"
    ```
  • Single directory

    E.g. Load a single directory

    ```shell
    doris-streamloader --source_file="dir" --url="http://localhost:8330" --header="column_separator:|?columns:col1,col2" --db="testdb" --table="testtbl"
    ```
  • File names with wildcard characters (enclosed in quotes)

    E.g. Load file0.csv, file1.csv, file2.csv

    ```shell
    doris-streamloader --source_file="file*" --url="http://localhost:8330" --header="column_separator:|?columns:col1,col2" --db="testdb" --table="testtbl"
    ```
  • A list of files separated by commas

    E.g. Load file0.csv, file1.csv, file2.csv

    ```shell
    doris-streamloader --source_file="file0.csv,file1.csv,file2.csv" --url="http://localhost:8330" --header="column_separator:|?columns:col1,col2" --db="testdb" --table="testtbl"
    ```
  • A list of directories separated by commas

    E.g. Load dir1, dir2, dir3

    ```shell
    doris-streamloader --source_file="dir1,dir2,dir3" --url="http://localhost:8330" --header="column_separator:|?columns:col1,col2" --db="testdb" --table="testtbl"
    ```

2. STREAMLOAD_HEADER supports all Stream Load headers, separated with '?' if there is more than one.

Example:

```shell
doris-streamloader --source_file="data.csv" --url="http://localhost:8330" --header="column_separator:|?columns:col1,col2" --db="testdb" --table="testtbl"
```

The parameters above are required, and the following parameters are optional:

| Parameter | Description | Default Value | Suggestions |
|-----------|-------------|---------------|-------------|
| --u | Username of the database | root | |
| --p | Password | empty string | |
| --compress | Whether to compress data upon HTTP transmission | false | Remain as default. Compression and decompression can increase pressure on the Doris Streamloader side and the CPU resources on the Doris BE side, so it is advised to enable this only when network bandwidth is constrained. |
| --timeout | Timeout of the HTTP request sent to Doris (seconds) | `60*60*10` (36000) | Remain as default |
| --batch | Granularity (rows) of batch reading and sending of files | 4096 | Remain as default |
| --batch_byte | Granularity (bytes) of batch reading and sending of files | 943718400 (900MB) | Remain as default |
| --workers | Concurrency level of data loading | 0 | "0" means the auto mode, in which the Stream Load speed is based on the data size and disk throughput. You can dial this up for a high-performance cluster, but it is advised to keep it below 10. If you observe excessive memory usage (via the memtracker in the log), you can dial this down. |
| --disk_throughput | Disk throughput (MB/s) | 800 | Usually remain as default. This parameter is a basis of the automatic inference of workers. You can adjust it based on your needs to get a more appropriate value of workers. |
| --streamload_throughput | Stream Load throughput (MB/s) | 100 | Usually remain as default. The default value is derived from the Stream Load throughput and predicted performance provided by the daily performance testing environment. To get a more appropriate value of workers, you can set this based on your measured Stream Load throughput: `(LoadBytes * 1000) / (LoadTimeMs * 1024 * 1024)` (MB/s) |
| --max_byte_per_task | Maximum data size for each load task. For a dataset exceeding this size, the remaining part will be split into a new load task. | 107374182400 (100GB) | This is recommended to be large in order to reduce the number of load versions. However, if you encounter a "body exceed max size" error and want to avoid adjusting the streaming_load_max_mb parameter (which requires restarting the backend), or if you encounter a "-238 TOO MANY SEGMENT" error, you can temporarily dial this down. |
| --check_utf8 | Whether to check the encoding of the loaded data: 1) false: load the raw data without checking; 2) true: replace non-UTF-8 characters with � | true | Remain as default |
| --debug | Print the debug log | false | Remain as default |
| --auto_retry | The list of failed workers and tasks for auto retry | empty string | This is only used when there is a load failure. The serial numbers of the failed workers and tasks will be shown, and all you need to do is copy and execute the entire command. For example, --auto_retry="1,1;2,1" means the failed tasks include the first task in the first worker and the first task in the second worker. |
| --auto_retry_times | Times of auto retries | 3 | Remain as default. If you don't need retries, you can set this to 0. |
| --auto_retry_interval | Interval of auto retries (seconds) | 60 | Remain as default. If the load failure is caused by a Doris downtime, it is recommended to set this based on the restart interval of Doris. |
| --log_filename | Path for log storage | "" | Logs are printed to the console by default. To print them to a log file, set the path, such as --log_filename="/var/log". |
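
For instance, using LoadBytes and LoadTimeMs from the successful load result shown in the next section, the measured Stream Load throughput works out as follows (an arithmetic sketch of the formula above, not a tool feature):

```shell
# throughput (MB/s) = (LoadBytes * 1000) / (LoadTimeMs * 1024 * 1024)
# with LoadBytes=40632 and LoadTimeMs=971:
echo "scale=4; (40632 * 1000) / (971 * 1024 * 1024)" | bc
# -> .0399 MB/s; measure with a realistically large load before tuning
```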

Result description

A result will be returned whether the data loading succeeds or fails.

| Parameter | Description |
|-----------|-------------|
| Status | Whether the loading succeeded or failed |
| TotalRows | Total number of rows |
| FailLoadRows | Number of rows that failed to load |
| LoadedRows | Number of rows loaded |
| FilteredRows | Number of rows filtered out |
| UnselectedRows | Number of rows unselected |
| LoadBytes | Number of bytes loaded |
| LoadTimeMs | Actual loading time (ms) |
| LoadFiles | List of loaded files |

Examples:

  • If the loading succeeds, you will see a result like:

    ```
    Load Result: {
        "Status": "Success",
        "TotalRows": 120,
        "FailLoadRows": 0,
        "LoadedRows": 120,
        "FilteredRows": 0,
        "UnselectedRows": 0,
        "LoadBytes": 40632,
        "LoadTimeMs": 971,
        "LoadFiles": [
            "basic.csv",
            "basic_data1.csv",
            "basic_data2.csv",
            "dir1/basic_data.csv",
            "dir1/basic_data.csv.1",
            "dir1/basic_data1.csv"
        ]
    }
    ```
  • If the loading fails (or partially fails), you will see a retry message:

    ```
    load has some error and auto retry failed, you can retry by :
    ./doris-streamloader --source_file /mnt/disk1/laihui/doris/tools/tpch-tools/bin/tpch-data/lineitem.tbl.1 --url="http://127.0.0.1:8239" --header="column_separator:|?columns: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,temp" --db="db" --table="lineitem1" -u root -p "" --compress=false --timeout=36000 --workers=3 --batch=4096 --batch_byte=943718400 --max_byte_per_task=1073741824 --check_utf8=true --report_duration=1 --auto_retry="2,1;1,1;0,1" --auto_retry_times=0 --auto_retry_interval=60
    ```

You can copy and execute the command. The failure message will also be provided:

```
Load Result: {
    "Status": "Failed",
    "TotalRows": 1,
    "FailLoadRows": 1,
    "LoadedRows": 0,
    "FilteredRows": 0,
    "UnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 104,
    "LoadFiles": [
        "/mnt/disk1/laihui/doris/tools/tpch-tools/bin/tpch-data/lineitem.tbl.1"
    ]
}
```

Best practice

Parameter suggestions

  1. Required parameters:

     ```shell
     --source_file=FILE_LIST --url=FE_OR_BE_SERVER_URL_WITH_PORT --header=STREAMLOAD_HEADER --db=TARGET_DATABASE --table=TARGET_TABLE
     ```

     If you need to load multiple files, configure all of them at once in source_file.

  2. The default value of workers is the number of CPU cores. When the core count is large, for example, 96 cores, the value of workers should be dialed down. The recommended value for most cases is 8.

  3. max_byte_per_task is recommended to be large in order to reduce the number of load versions. However, if you encounter a "body exceed max size" error and want to avoid adjusting the streaming_load_max_mb parameter (which requires restarting the backend), or if you encounter a "-238 TOO MANY SEGMENT" error, you can temporarily dial this down. For most cases, this can remain as default.

Two parameters that impact the number of versions:

  • workers: The more workers, the higher the concurrency level, and thus the more versions. The recommended value for most cases is 8.
  • max_byte_per_task: The larger max_byte_per_task, the larger the data size in one single version, and thus the fewer versions. However, if this is excessively high, it could easily cause a "-238 TOO MANY SEGMENT" error. For most cases, this can remain as default (see the rough estimate below).
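
As a rough estimate (an illustration only, since the actual splitting also depends on file boundaries), the number of load tasks, and hence versions, is about the total data size divided by max_byte_per_task:

```shell
# ~number of load tasks for a 500 GiB dataset with the default
# max_byte_per_task of 107374182400 bytes (100 GiB)
echo $(( (500 * 1024**3 + 107374182400 - 1) / 107374182400 ))   # -> 5
```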

In most cases, you only need to set the required parameters and workers.

```shell
./doris-streamloader --source_file="demo.csv,demoFile*.csv,demoDir" --url="http://127.0.0.1:8030" --header="column_separator:," --db="demo" --table="test_load" --u="root" --workers=8
```

FAQ

  • Before resumable loading was available, fixing a partial load failure required deleting the current table and starting over. Doris Streamloader instead retries automatically in this case; if the retry fails, it prints a retry command so you can copy and execute it.

  • The default maximum data loading size for Doris Streamloader is limited by the BE config streaming_load_max_mb (default: 100GB). If you don't want to restart the BE, you can also dial down max_byte_per_task.

    To show the current streaming_load_max_mb (a filtered variant follows this list):

    ```shell
    curl "http://127.0.0.1:8040/api/show_config"
    ```
  • If you encounter a "-238 TOO MANY SEGMENT" error, you can dial down max_byte_per_task.
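
To check only the relevant entry, you can filter the config output; a minimal sketch of the query referenced above, assuming the BE webserver listens on port 8040:

```shell
# show_config returns all BE config entries; keep only streaming_load_max_mb
curl -s "http://127.0.0.1:8040/api/show_config" | grep streaming_load_max_mb
```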