Usage Scenarios

During the import process, Doris supports some transformations on the source data, including mapping, conversion, preceding filtering, and post-filtering.

  • Mapping: Import column A from the source data into column B in the target table.

  • Conversion: Calculate the values in the target column based on the columns in the source data using an expression. Custom functions are supported in the expression.

  • Preceding Filtering: Filter rows in the source data and only import rows that meet the filtering conditions.

  • Post-Filtering: Filter rows in the result and only import rows that meet the filtering conditions.

Quick Start

BROKER LOAD

```sql
LOAD LABEL example_db.label1
(
    DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    (k1, k2, tmpk3)
    PRECEDING FILTER k1 = 1
    SET (
        k3 = tmpk3 + 1
    )
    WHERE k1 > k2
)
WITH BROKER bos
(
    ...
);
```

STREAM LOAD

```shell
curl --location-trusted -u user:passwd \
    -H "columns: k1, k2, tmpk3, k3 = tmpk3 + 1" \
    -H "where: k1 > k2" \
    -T file.txt \
    http://host:port/api/testDb/testTbl/_stream_load
```

ROUTINE LOAD

```sql
CREATE ROUTINE LOAD example_db.label1 ON my_table
COLUMNS(k1, k2, tmpk3, k3 = tmpk3 + 1),
PRECEDING FILTER k1 = 1,
WHERE k1 > k2
...
```

Reference Manual

Loading Syntax

Stream Load

Specify the `columns` and `where` parameters in the HTTP headers:

  • `columns`: specifies the column mapping and value transformation.

  • `where`: specifies the post-filtering condition.

Stream Load does not support preceding filtering.

Example:

```shell
curl --location-trusted -u user:passwd \
    -H "columns: k1, k2, tmpk3, k3 = tmpk3 + 1" \
    -H "where: k1 > k2" \
    -T file.txt \
    http://host:port/api/testDb/testTbl/_stream_load
```

Broker Load

Define data transformation in the SQL statement, including:

  • (k1, k2, tmpk3) specifies column mapping.

  • PRECEDING FILTER specifies preceding filtering.

  • SET specifies column transformation.

  • WHERE specifies post-filtering.

```sql
LOAD LABEL example_db.label1
(
    DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    (k1, k2, tmpk3)
    PRECEDING FILTER k1 = 1
    SET (
        k3 = tmpk3 + 1
    )
    WHERE k1 > k2
)
WITH BROKER bos
(
    ...
);
```

Routine Load

Define data transformation in the SQL statement, including:

  • COLUMNS specifies column mapping and column transformation.

  • PRECEDING FILTER specifies preceding filtering.

  • WHERE specifies post-filtering.

```sql
CREATE ROUTINE LOAD example_db.label1 ON my_table
COLUMNS(k1, k2, tmpk3, k3 = tmpk3 + 1),
PRECEDING FILTER k1 = 1,
WHERE k1 > k2
...
```

Insert Into

Insert Into performs data transformation directly in the SELECT statement and filters data with a WHERE clause.
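As a minimal sketch (the source table `src_table` and its columns are hypothetical, chosen only to illustrate the pattern):

```sql
-- Transformation happens in the SELECT list, filtering in the WHERE clause.
-- src_table is a hypothetical source table with columns k1, k2, k3.
INSERT INTO my_table (k1, k2, k3)
SELECT k1, k2, k3 * 100   -- column transformation
FROM src_table
WHERE k1 > 0;             -- post-filtering
```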

Column Mapping

Column mapping describes each column in the load file, which is equivalent to defining names for the columns in the source data. With the mapping relationship defined, source files whose column order or column count differs from the target table can still be loaded into Doris. Let's illustrate this through examples:

Suppose the source file has 4 columns with the following contents (the column names in the table header are for illustration purposes only and are not actually present in the file):

| Column 1 | Column 2 | Column 3  | Column 4 |
|----------|----------|-----------|----------|
| 1        | 100      | beijing   | 1.1      |
| 2        | 200      | shanghai  | 1.2      |
| 3        | 300      | guangzhou | 1.3      |
| 4        | \N       | chongqing | 1.4      |


Note: \N represents null in the source file.

1. Adjusting the mapping order

   Suppose the table has 4 columns: `k1, k2, k3, k4`. The desired load mapping is:

   ```
   Column 1 -> k1
   Column 2 -> k3
   Column 3 -> k2
   Column 4 -> k4
   ```

   The column mapping should then be written as:

   ```
   (k1, k3, k2, k4)
   ```
2. The source file has more columns than the table

   Suppose the table has 3 columns: `k1, k2, k3`. The desired load mapping is:

   ```
   Column 1 -> k1
   Column 2 -> k3
   Column 3 -> k2
   ```

   The column mapping should then be written as:

   ```
   (k1, k3, k2, tmpk4)
   ```

   Here, `tmpk4` is a custom column name that does not exist in the table; Doris ignores columns mapped to such non-existent names.

3. The source file has fewer columns than the table; missing columns are filled with default values

   Suppose the table has 5 columns: `k1, k2, k3, k4, k5`. The desired load mapping is:

   ```
   Column 1 -> k1
   Column 2 -> k3
   Column 3 -> k2
   ```

   Only the first 3 columns of the source file are used, while `k4` and `k5` are expected to be filled with default values. The column mapping should then be written as:

   ```
   (k1, k3, k2)
   ```

   If `k4` and `k5` have default values, those are used. Otherwise, if the columns are nullable, they are filled with null. If neither is the case, the load job reports an error.
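Any of the mapping lists above can be dropped into a load statement. As a sketch of the last case in Broker Load form (reusing the placeholder file, table, and broker name from the Quick Start):

```sql
LOAD LABEL example_db.label_mapping_example
(
    DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    -- only the first 3 source columns are mapped; k4 and k5 take default values
    (k1, k3, k2)
)
WITH BROKER bos
(
    ...
);
```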

Pre-filtering

Pre-filtering filters the raw data as it is read, before any column mapping or transformation. Currently, it is supported only by Broker Load and Routine Load.

Pre-filtering can be applied in the following scenarios:

  1. Filtering before transformation: It allows filtering of data before performing column mapping and transformation. This way, unnecessary data can be filtered out in advance.

  2. Filtering on columns not present in the table: a column that exists in the source data purely as a filter flag can be referenced in the pre-filter condition even though it is not loaded into the table.

  3. Handling data from multiple tables: For example, if the source data contains data from multiple tables (or data from multiple tables is written to the same Kafka message queue), each row may include a column name that identifies which table the data belongs to. Users can use pre-filtering conditions to select and load the corresponding table data.
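The multi-table scenario might be sketched in Routine Load form as follows. Here, `table_name` is a hypothetical leading column in the source data that serves only as a filter flag; since it does not exist in the target table, Doris ignores it after the pre-filter is applied:

```sql
-- Sketch only: table_name is a hypothetical flag column identifying which
-- table each row belongs to; it is used in the pre-filter and not loaded.
CREATE ROUTINE LOAD example_db.label_tbl1 ON my_table
COLUMNS(table_name, k1, k2, k3),
PRECEDING FILTER table_name = "tbl1",
...
```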

Column Transformation

Column transformation enables users to modify the values of columns in the source files. Currently, Doris supports the use of built-in functions and user-defined functions for transformation.


Note: User-defined functions belong to a specific database, and when using custom functions for transformation, users need to have read permissions on that database.

Transformation operations are typically defined in conjunction with column mapping. In the following example, we illustrate the process:

Assume that the source file has 4 columns with the following content (the column names in the header are for descriptive purposes only and are not actually present in the file):

| Column 1 | Column 2 | Column 3  | Column 4 |
|----------|----------|-----------|----------|
| 1        | 100      | beijing   | 1.1      |
| 2        | 200      | shanghai  | 1.2      |
| 3        | 300      | guangzhou | 1.3      |
| \N       | 400      | chongqing | 1.4      |
1. Loading transformed column values into the table

   Suppose the table has 4 columns: `k1, k2, k3, k4`, and we want the following mapping and transformation:

   ```
   Column 1 -> k1
   Column 2 * 100 -> k3
   Column 3 -> k2
   Column 4 -> k4
   ```

   The column mapping should then be written as:

   ```
   (k1, tmpk3, k2, k4, k3 = tmpk3 * 100)
   ```

   Here the second column in the source file is named `tmpk3`, and the value of `k3` in the table is specified as `tmpk3 * 100`. The final data in the table is:

   | k1   | k2        | k3    | k4  |
   |------|-----------|-------|-----|
   | 1    | beijing   | 10000 | 1.1 |
   | 2    | shanghai  | 20000 | 1.2 |
   | 3    | guangzhou | 30000 | 1.3 |
   | null | chongqing | 40000 | 1.4 |
2. Conditional transformation with `case when`

   Suppose the table has 4 columns: `k1, k2, k3, k4`, and we want to transform the values `beijing, shanghai, guangzhou, chongqing` in the source data into the corresponding region IDs before loading:

   ```
   Column 1 -> k1
   Column 2 -> k2
   Column 3 (as region ID) -> k3
   Column 4 -> k4
   ```

   The column mapping should then be written as:

   ```
   (k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
   ```

   The final data in the table is:

   | k1   | k2  | k3 | k4  |
   |------|-----|----|-----|
   | 1    | 100 | 1  | 1.1 |
   | 2    | 200 | 2  | 1.2 |
   | 3    | 300 | 3  | 1.3 |
   | null | 400 | 4  | 1.4 |
3. Converting null values to 0 while also applying the region ID transformation from example 2

   Suppose the table has 4 columns: `k1, k2, k3, k4`. While converting region IDs, we also want to convert null values in the first source column to 0 during load:

   ```
   Column 1 (0 if null) -> k1
   Column 2 -> k2
   Column 3 (as region ID) -> k3
   Column 4 -> k4
   ```

   The column mapping should then be written as:

   ```
   (tmpk1, k2, tmpk3, k4, k1 = ifnull(tmpk1, 0), k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
   ```

   The final data in the table is:

   | k1 | k2  | k3 | k4  |
   |----|-----|----|-----|
   | 1  | 100 | 1  | 1.1 |
   | 2  | 200 | 2  | 1.2 |
   | 3  | 300 | 3  | 1.3 |
   | 0  | 400 | 4  | 1.4 |
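The same combined transformation can be expressed in a Stream Load `columns` header. A sketch, reusing the placeholder host and credentials from the Quick Start (the inner double quotes are escaped for the shell):

```shell
curl --location-trusted -u user:passwd \
    -H "columns: tmpk1, k2, tmpk3, k4, k1 = ifnull(tmpk1, 0), k3 = case tmpk3 when \"beijing\" then 1 when \"shanghai\" then 2 when \"guangzhou\" then 3 when \"chongqing\" then 4 else null end" \
    -T file.txt \
    http://host:port/api/testDb/testTbl/_stream_load
```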

Post-Filtering

After column mapping and transformation, we can filter out data that we don’t want to load into Doris using filtering conditions. Let’s illustrate this with an example:

Assume that the source file has 4 columns with the following content (the column names in the table header are for descriptive purposes only and are not actually present):

| Column 1 | Column 2 | Column 3  | Column 4 |
|----------|----------|-----------|----------|
| 1        | 100      | beijing   | 1.1      |
| 2        | 200      | shanghai  | 1.2      |
| 3        | 300      | guangzhou | 1.3      |
| null     | 400      | chongqing | 1.4      |
1. Filtering with default column mapping and transformation

   Suppose the table has 4 columns: `k1, k2, k3, k4`. We can define a filter condition directly, without any column mapping or transformation. For example, to load only rows whose 4th column is greater than 1.2:

   ```
   where k4 > 1.2
   ```

   The final data in the table is:

   | k1   | k2  | k3        | k4  |
   |------|-----|-----------|-----|
   | 3    | 300 | guangzhou | 1.3 |
   | null | 400 | chongqing | 1.4 |

   By default, Doris maps columns in sequential order, so the 4th column in the source file is automatically mapped to `k4`.

2. Filtering transformed data

   Suppose the table has 4 columns: `k1, k2, k3, k4`. In the column transformation example we converted province names to IDs. Now we want to filter out rows whose ID is 3. The transformation and filter conditions are:

   ```
   (k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
   where k3 != 3
   ```

   The final data in the table is:

   | k1   | k2  | k3 | k4  |
   |------|-----|----|-----|
   | 1    | 100 | 1  | 1.1 |
   | 2    | 200 | 2  | 1.2 |
   | null | 400 | 4  | 1.4 |

   Note that the filter sees the final transformed column values, not the original source data.

3. Filtering with multiple conditions

   Suppose the table has 4 columns: `k1, k2, k3, k4`. We want to filter out rows where `k1` is null or `k4` is less than 1.2, i.e. keep only rows satisfying:

   ```
   where k1 is not null and k4 >= 1.2
   ```

   The final data in the table is:

   | k1 | k2  | k3        | k4  |
   |----|-----|-----------|-----|
   | 2  | 200 | shanghai  | 1.2 |
   | 3  | 300 | guangzhou | 1.3 |

Best Practices

Data Quality Issues and Filtering Threshold

The rows of data processed in the load job can be classified into the following three categories:

  • Filtered Rows: Data rows that are filtered out due to data quality issues. Data quality issues can include type errors, precision errors, strings exceeding length limits, mismatched file column counts, and data rows filtered out due to missing corresponding partitions.

  • Unselected Rows: data rows filtered out by the preceding filter or the `where` filter conditions.

  • Loaded Rows: Data rows that are successfully loaded.

Doris’s load task allows users to set a maximum error rate (max_filter_ratio). If the error rate of the loaded data is below the threshold, the error rows will be ignored, and the other correct data will be loaded.

The error rate is calculated as:

```
#Filtered Rows / (#Filtered Rows + #Loaded Rows)
```

This means that Unselected Rows are not included in the error rate calculation.
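For example, with `max_filter_ratio` set to 0.1, a job with 5 Filtered Rows and 95 Loaded Rows has an error rate of 5 / (5 + 95) = 0.05 and therefore succeeds, regardless of how many Unselected Rows were dropped by the filters. In Stream Load, the threshold is passed as another HTTP header; a sketch with the same placeholder host and credentials as the earlier examples:

```shell
curl --location-trusted -u user:passwd \
    -H "max_filter_ratio: 0.1" \
    -T file.txt \
    http://host:port/api/testDb/testTbl/_stream_load
```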