Broker Load

Broker Load is an asynchronous import method. The data sources it supports depend on those supported by the Broker process.

Because the data in a Doris table is ordered, Broker Load uses Doris cluster resources to sort the data during import. Compared with Spark Load, completing a massive historical data migration this way uses considerably more Doris cluster resources, so this method is meant for users who do not have Spark computing resources. If Spark computing resources are available, Spark Load is recommended.

Users create a Broker Load import through the MySQL protocol and check the import result with the import viewing command.

Applicable scenarios

  • The source data is in a storage system that the broker can access, such as HDFS.
  • The amount of data is at the level of tens to hundreds of GB.

Basic principles

After the user submits the import task, the FE generates the corresponding Plan and, based on the current number of BEs and the file size, distributes the Plan to multiple BEs for execution. Each BE imports a part of the data.

During execution, each BE pulls data from the Broker, transforms it, and imports it into the system. After all BEs have finished importing, the FE makes the final decision on whether the import succeeded.

                     +
                     | 1. user create broker load
                     v
                +----+----+
                |         |
                |   FE    |
                |         |
                +----+----+
                     |
                     | 2. BE etl and load the data
        +--------------------------+
        |            |             |
    +---v---+     +--v----+    +---v---+
    |       |     |       |    |       |
    |  BE   |     |  BE   |    |   BE  |
    |       |     |       |    |       |
    +---+-^-+     +---+-^-+    +--+-^--+
        | |           | |         | |
        | |           | |         | | 3. pull data from broker
    +---v-+-+     +---v-+-+    +--v-+--+
    |       |     |       |    |       |
    |Broker |     |Broker |    |Broker |
    |       |     |       |    |       |
    +---+-^-+     +---+-^-+    +---+-^-+
        | |           | |          | |
    +---v-+-----------v-+----------v-+-+
    |       HDFS/BOS/AFS cluster       |
    |                                  |
    +----------------------------------+

Start import

Let's look at how Broker Load is used through several practical examples.

Data import of Hive partition table

  1. Create a Hive table

      -- Data format: default; partition field: day
      CREATE TABLE `ods_demo_detail`(
        `id` string,
        `store_id` string,
        `company_id` string,
        `tower_id` string,
        `commodity_id` string,
        `commodity_name` string,
        `commodity_price` double,
        `member_price` double,
        `cost_price` double,
        `unit` string,
        `quantity` double,
        `actual_price` double
      )
      PARTITIONED BY (day string)
      row format delimited fields terminated by ','
      lines terminated by '\n'

Then use Hive's LOAD command to import your data into the Hive table:

      load data local inpath '/opt/custorm' into table ods_demo_detail;

  2. Create a Doris table. For the table creation syntax, see CREATE TABLE.

      CREATE TABLE `doris_ods_test_detail` (
        `rq` date NULL,
        `id` varchar(32) NOT NULL,
        `store_id` varchar(32) NULL,
        `company_id` varchar(32) NULL,
        `tower_id` varchar(32) NULL,
        `commodity_id` varchar(32) NULL,
        `commodity_name` varchar(500) NULL,
        `commodity_price` decimal(10, 2) NULL,
        `member_price` decimal(10, 2) NULL,
        `cost_price` decimal(10, 2) NULL,
        `unit` varchar(50) NULL,
        `quantity` int(11) NULL,
        `actual_price` decimal(10, 2) NULL
      ) ENGINE=OLAP
      UNIQUE KEY(`rq`, `id`, `store_id`)
      PARTITION BY RANGE(`rq`)
      (
      PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
      DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
      PROPERTIES (
        "replication_allocation" = "tag.location.default: 3",
        "dynamic_partition.enable" = "true",
        "dynamic_partition.time_unit" = "MONTH",
        "dynamic_partition.start" = "-2147483648",
        "dynamic_partition.end" = "2",
        "dynamic_partition.prefix" = "P_",
        "dynamic_partition.buckets" = "1",
        "in_memory" = "false",
        "storage_format" = "V2"
      );
  3. Start importing data

    For the specific syntax, see Broker Load.

      LOAD LABEL broker_load_2022_03_23
      (
        DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
        (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
        SET
        (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
      )
      WITH BROKER "broker_name_1"
      (
        "username" = "hdfs",
        "password" = ""
      )
      PROPERTIES
      (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
      );

Hive partition table import (ORC format)

  1. Create a Hive partition table in ORC format

      -- Data format: ORC; partition field: day
      CREATE TABLE `ods_demo_orc_detail`(
        `id` string,
        `store_id` string,
        `company_id` string,
        `tower_id` string,
        `commodity_id` string,
        `commodity_name` string,
        `commodity_price` double,
        `member_price` double,
        `cost_price` double,
        `unit` string,
        `quantity` double,
        `actual_price` double
      )
      PARTITIONED BY (day string)
      row format delimited fields terminated by ','
      lines terminated by '\n'
      STORED AS ORC
  2. Create a Doris table. The table creation statement is the same as the Doris table creation statement above.

  3. Import data using Broker Load

      LOAD LABEL dish_2022_03_23
      (
        DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
        FORMAT AS "orc"
        (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
        SET
        (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
      )
      WITH BROKER "broker_name_1"
      (
        "username" = "hdfs",
        "password" = ""
      )
      PROPERTIES
      (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
      );

    Note:

    • FORMAT AS "orc": specifies the format of the data to import.
    • SET: defines the field mapping between the Hive table and the Doris table, along with any field conversion operations.
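
As a rough illustration of what COLUMNS FROM PATH AS (`day`) combined with the SET clause achieves, the Python sketch below pulls a Hive-style key=value partition segment out of a file path and converts it to a date. The example file path (including the `day=2022-04-01` directory and the file name) and the helper names `partition_value` and `to_rq` are hypothetical; Doris does this internally and its actual implementation may differ.

```python
from datetime import date, datetime


def partition_value(path: str, key: str) -> str:
    """Return the value of a Hive-style `key=value` directory segment in `path`."""
    for segment in path.split("/"):
        name, sep, value = segment.partition("=")
        if sep and name == key:
            return value
    raise KeyError(f"no '{key}=' segment in {path!r}")


def to_rq(day: str) -> date:
    """Mimic rq = str_to_date(`day`, '%Y-%m-%d') from the SET clause."""
    return datetime.strptime(day, "%Y-%m-%d").date()


# Hypothetical file matched by the wildcard .../ods_demo_orc_detail/*/*
example_path = (
    "hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/"
    "ods_demo_orc_detail/day=2022-04-01/000000_0"
)
day = partition_value(example_path, "day")
print(day, to_rq(day))
```

The point of the sketch is that the partition value never appears inside the data file itself; it only exists in the directory name, which is why it has to be taken from the path and then converted to the type of the target column.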

HDFS file system data import

Continuing with the Doris table created above, this example demonstrates importing data from HDFS through Broker Load.

The import statement is as follows:

      LOAD LABEL demo.label_20220402
      (
        DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")
        INTO TABLE `ods_dish_detail_test`
        COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
      )
      WITH HDFS (
        "fs.defaultFS"="hdfs://10.220.147.151:8020",
        "hadoop.username"="root"
      )
      PROPERTIES
      (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
      );

For the specific parameters, see the Broker and [Broker Load](../../../sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD) documentation.

View import status

The status of the import job above can be viewed with the following command. For the specific syntax, see SHOW LOAD.

      mysql> show load order by createtime desc limit 1\G;
      *************************** 1. row ***************************
      JobId: 41326624
      Label: broker_load_2022_03_23
      State: FINISHED
      Progress: ETL: 100%; LOAD: 100%
      Type: BROKER
      EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
      CreateTime: 2022-04-01 18:59:06
      EtlStartTime: 2022-04-01 18:59:11
      EtlFinishTime: 2022-04-01 18:59:11
      LoadStartTime: 2022-04-01 18:59:11
      LoadFinishTime: 2022-04-01 18:59:11
      URL: NULL
      JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
      1 row in set (0.01 sec)
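
The EtlInfo field can be compared against max_filter_ratio programmatically. The sketch below parses the EtlInfo string from the output above and computes the share of filtered rows. It assumes the ratio is dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL); that formula and the helper names `parse_etl_info` and `filter_ratio` are assumptions made for illustration and are not stated on this page.

```python
def parse_etl_info(etl_info: str) -> dict:
    """Turn 'k1=v1; k2=v2' into {'k1': int(v1), 'k2': int(v2)}."""
    counters = {}
    for item in etl_info.split(";"):
        key, _, value = item.strip().partition("=")
        if key:
            counters[key] = int(value)
    return counters


def filter_ratio(counters: dict) -> float:
    """Share of rows rejected for data-quality reasons (assumed formula)."""
    abnormal = counters.get("dpp.abnorm.ALL", 0)
    normal = counters.get("dpp.norm.ALL", 0)
    total = abnormal + normal
    return abnormal / total if total else 0.0


# EtlInfo copied from the SHOW LOAD output of this job
etl_info = "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27"
counters = parse_etl_info(etl_info)
ratio = filter_ratio(counters)
# 0.1 is the max_filter_ratio the job was submitted with
print(counters, ratio, ratio <= 0.1)
```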

Cancel import

A Broker Load job can be canceled manually by the user as long as its status is not CANCELLED or FINISHED. When canceling, specify the Label of the import job to cancel. For the syntax of the cancel command, see CANCEL LOAD.

For example, cancel the import job with the label broker_load_2022_03_23 in the database demo:

      CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

Relevant system configuration

Broker parameters

Broker Load uses the Broker process to access remote storage. Different Brokers require different parameters. For details, see the Broker documentation.

FE configuration

The following configurations belong to the system-level configuration of Broker load, that is, the configurations that apply to all Broker load import tasks. The configuration values are adjusted mainly by modifying fe.conf.

  • min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency

    The first two configurations limit the minimum and maximum amount of data processed by a single BE. The third limits the maximum number of concurrent imports for a job. The minimum amount of data processed, the maximum concurrency, the size of the source file, and the number of BEs in the current cluster together determine the number of concurrent imports.

        Number of concurrent imports = Math.min(source file size / minimum processing amount, maximum concurrency, current number of BE nodes)
        Amount processed by a single BE = source file size / number of concurrent imports

    Usually the maximum amount of data supported by an import job is max_bytes_per_broker_scanner * number of BE nodes. To import a larger amount of data, increase the max_bytes_per_broker_scanner parameter accordingly.

    Default configuration:

        min_bytes_per_broker_scanner: default 64MB, unit is bytes.
        max_broker_concurrency: default 10.
        max_bytes_per_broker_scanner: default 500G, unit is bytes.
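
The two formulas above can be sketched in Python as follows. This is only an illustration under stated assumptions: the function name `plan_concurrency` is hypothetical, integer division is used for the size-based term, and the result is clamped to at least 1 so that a file smaller than min_bytes_per_broker_scanner still gets one scanner. The FE's real logic may differ in these details.

```python
MB = 1024 ** 2
GB = 1024 ** 3


def plan_concurrency(file_size, be_count,
                     min_bytes_per_broker_scanner=64 * MB,
                     max_broker_concurrency=10):
    """Return (number of concurrent imports, bytes handled by each BE)."""
    by_size = file_size // min_bytes_per_broker_scanner
    # Assumption: at least one scanner is used even for very small files.
    concurrency = max(1, min(by_size, max_broker_concurrency, be_count))
    per_be = file_size / concurrency
    return concurrency, per_be


# A 1 GB file on a 3-BE cluster with the default settings
concurrency, per_be = plan_concurrency(file_size=1 * GB, be_count=3)
print(concurrency, round(per_be / MB, 1))
```

With the defaults, the size-based term for a 1 GB file is 16, so the number of BEs (3) becomes the limiting factor and each BE handles roughly a third of the file.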

Best Practices

Application scenarios

Broker Load is best suited to cases where the original data resides in a file system (HDFS, BOS, AFS). In addition, since Broker Load is the only asynchronous method for a single import, users who need asynchronous access when importing large files can also consider Broker Load.

The amount of data

Only the case of a single BE is discussed here. If the cluster has multiple BEs, the data volumes in the headings below should be multiplied by the number of BEs. For example, with 3 BEs, "below 3G (inclusive)" becomes "below 9G (inclusive)".

  • Below 3G (inclusive)

    Users can directly submit Broker load to create import requests.

  • Above 3G

    Since the maximum processing capacity of a single BE in one import is 3G, importing files larger than 3G requires adjusting the Broker Load import parameters.

    1. Modify the maximum scan amount and maximum concurrency of a single BE according to the current number of BEs and the size of the original file.

          Modify the configuration in fe.conf

          max_broker_concurrency = number of BEs
          Amount of data processed by a single BE for the current import task = original file size / max_broker_concurrency
          max_bytes_per_broker_scanner >= amount of data processed by a single BE for the current import task

          For example, for a 100G file with 10 BEs in the cluster:
          max_broker_concurrency = 10
          # >= 10G = 100G / 10
          max_bytes_per_broker_scanner = 10737418240

      After modification, all BEs process the import task concurrently, each BE handling part of the original file.

      Note: The two FE configurations above are system-level configurations, so changes to them apply to all Broker Load tasks.

    2. Customize the timeout of the current import task when creating the import

          Amount of data processed by a single BE for the current import task / slowest import speed of the user's Doris cluster (MB/s) >= timeout of the current import task >= amount of data processed by a single BE for the current import task / 10M/s

          For example, for a 100G file with 10 BEs in the cluster:
          # >= 1000s = 10G / 10M/s
          timeout = 1000

    3. When the timeout calculated in step 2 exceeds the default maximum import timeout of 4 hours

      In this case, simply raising the maximum import timeout is not recommended. If a single import would take longer than the default maximum of 4 hours, it is better to split the files and import them in several batches. The main reason is that when a single import runs for more than 4 hours, retrying after a failure is very costly in time.

      The expected maximum import file data volume of the Doris cluster can be calculated with the following formula:

          Expected maximum import file data volume = 14400s * 10M/s * number of BEs

          For example, with 10 BEs in the cluster:
          Expected maximum import file data volume = 14400s * 10M/s * 10 = 1440000M ≈ 1440G

          Note: A typical user environment may not reach 10M/s, so it is recommended to split files larger than 500G before importing.
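
The sizing arithmetic in the steps above can be sketched in Python. The function names `size_import` and `max_import_volume` are hypothetical, and the sketch uses binary units (1G = 1024M), so its results differ slightly from the rounded figures in the text: the lower bound on the timeout comes out as 1024 s rather than 1000 s, and the maximum volume as about 1406G rather than 1440G.

```python
MB = 1024 ** 2
GB = 1024 ** 3
ASSUMED_SPEED = 10 * MB            # bytes/s per BE, the 10M/s figure used above
DEFAULT_MAX_TIMEOUT_S = 4 * 3600   # default maximum import timeout, 14400 s


def size_import(file_size, be_count, speed=ASSUMED_SPEED):
    """Suggest FE settings and a lower bound for the job timeout."""
    per_be = (file_size + be_count - 1) // be_count   # ceiling division
    return {
        "max_broker_concurrency": be_count,
        "max_bytes_per_broker_scanner": per_be,
        "min_timeout_s": (per_be + speed - 1) // speed,
    }


def max_import_volume(be_count, speed=ASSUMED_SPEED,
                      timeout_s=DEFAULT_MAX_TIMEOUT_S):
    """Largest file (in bytes) expected to finish within the timeout."""
    return timeout_s * speed * be_count


plan = size_import(100 * GB, be_count=10)
print(plan)
print(max_import_volume(10) / GB)
```

The per-BE import speed is the weakest input here. Measuring it on the actual cluster and passing it as `speed` gives a more realistic timeout than the 10M/s assumption.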

Job scheduling

The system limits the number of running Broker Load jobs in a cluster to prevent too many Load jobs from running at the same time.

First, the configuration parameter of FE: desired_max_waiting_jobs will limit the number of Broker Load jobs that have not started or are running (job status is PENDING or LOADING) in a cluster. Default is 100. If this threshold is exceeded, newly submitted jobs will be rejected outright.

A Broker Load job is divided into pending task and loading task phases. Among them, the pending task is responsible for obtaining the information of the imported file, and the loading task will be sent to the BE to execute the specific import task.

The FE configuration parameter async_pending_load_task_pool_size limits the number of pending tasks running at the same time, which in effect also controls the number of import jobs that are actually running. It defaults to 10. For example, if the user submits 100 Load jobs, only 10 of them enter the LOADING state and start executing at any one time, while the others wait in the PENDING state.

The FE configuration parameter async_loading_load_task_pool_size limits the number of loading tasks running at the same time. A Broker Load job has one pending task and multiple loading tasks (equal to the number of DATA INFILE clauses in the LOAD statement), so async_loading_load_task_pool_size should be greater than or equal to async_pending_load_task_pool_size.
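
The interaction of these limits can be pictured with a deliberately simplified Python model. It assumes all jobs are submitted at once and none has finished yet. The function name `schedule_snapshot` is hypothetical, and the real FE scheduler is more involved than this.

```python
def schedule_snapshot(submitted,
                      desired_max_waiting_jobs=100,
                      async_pending_load_task_pool_size=10):
    """Classify jobs right after submission under the simplified model."""
    # Jobs beyond the waiting limit are rejected outright.
    accepted = min(submitted, desired_max_waiting_jobs)
    rejected = submitted - accepted
    # Only as many jobs as the pending pool allows move on to LOADING.
    loading = min(accepted, async_pending_load_task_pool_size)
    pending = accepted - loading
    return {"rejected": rejected, "loading": loading, "pending": pending}


print(schedule_snapshot(100))
print(schedule_snapshot(120))
```

Under this model, 100 submitted jobs yield 10 in LOADING and 90 in PENDING, matching the example in the text, and submitting 120 jobs leads to 20 rejections.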

Performance Analysis

Before submitting the LOAD job, enable the session variable by executing set enable_profile=true, then submit the import job. After the job completes, its profile can be viewed in the Queries tab of the FE web page.

See the SHOW LOAD PROFILE help document for more usage information.

The profile helps analyze the running status of import jobs.

Currently, the profile can only be viewed after the job has executed successfully.

Common problems

  • Import error: Scan bytes per broker scanner exceed limit:xxx

    See the Best Practices section of this document and modify the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency.

  • Import error: failed to send batch or TabletWriter add batch with unknown id

    Modify query_timeout and streaming_load_rpc_max_alive_time_sec appropriately.

    streaming_load_rpc_max_alive_time_sec:

    During import, Doris opens a Writer for each Tablet to receive and write data. This parameter specifies the Writer's wait timeout. If the Writer receives no data within this time, it is destroyed automatically. When the system is processing slowly, the Writer may not receive the next batch of data for a long time, which results in the import error TabletWriter add batch with unknown id. In that case, this configuration can be increased appropriately. The default is 600 seconds.

  • Import error: LOAD_RUN_FAIL; msg:Invalid Column Name:xxx

    For data in PARQUET or ORC format, the column names in the file header must be consistent with the column names in the Doris table, for example:

        (tmp_c1,tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        )

    This reads the columns named (tmp_c1, tmp_c2) from the Parquet or ORC file and maps them to the (id, name) columns of the Doris table. If SET is not specified, the columns in the column list are used for the mapping.

    Note: ORC files generated directly by some Hive versions do not carry the Hive metadata column names in their header but (_col0, _col1, _col2, …) instead, which may cause the Invalid Column Name error. In that case, use SET to map the columns.

  • Import error: Failed to get S3 FileSystem for bucket is null/empty

    1. The bucket is incorrect or does not exist.
    2. The bucket format is not supported. When creating a bucket name with _ on GCS, like s3://gs_bucket/load_tbl, the S3 Client will report an error. It is recommended not to use _ on GCS.

More help

For more detailed syntax and best practices for Broker Load, see the Broker Load command manual. You can also enter HELP BROKER LOAD in the MySQL client command line for more help.