ROUTINE LOAD
description
Routine Load function allows users to submit a resident load task, and continuously load data into Doris by continuously reading data from the specified data source. Currently, only text data format (CSV) data is loaded from Kakfa by means of no authentication or SSL authentication.
Syntax:
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
[db.]job_name
The name of the load job, in the same database, only one job can run with the same name.
tbl_name
Specifies the name of the table that needs to be loaded.
load_properties
Used to describe the load data. grammar:
[column_separator],
[columns_mapping],
[where_predicates],
[partitions]
column_separator:
Specify column separators, such as:
COLUMNS TERMINATED BY ","
The default is:
\t
columns_mapping:
Specifies the mapping of columns in the source data and defines how the derived columns are generated.
Map column:
Specify in order, which columns in the source data correspond to which columns in the destination table. For columns that you want to skip, you can specify a column name that does not exist.
Suppose the destination table has three columns k1, k2, v1. The source data has 4 columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1, respectively. Write as follows:
COLUMNS (k2, k1, xxx, v1)
Where xxx is a column that does not exist and is used to skip the third column in the source data.
Derived columns:
A column represented in the form of col_name = expr, which we call a derived column. That is, the value of the corresponding column in the destination table is calculated by expr.
Derived columns are usually arranged after the mapped column. Although this is not mandatory, Doris always parses the mapped columns first and then parses the derived columns.
Following an example, assume that the destination table also has column 4, v2, which is generated by the sum of k1 and k2. You can write as follows:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
where_predicates
Used to specify filter criteria to filter out unwanted columns. Filter columns can be either mapped columns or derived columns.
For example, if we only want to load a column with k1 greater than 100 and k2 equal to 1000, we would write as follows:
WHERE k1 > 100 and k2 = 1000
partitions
Specifies which partitions of the load destination table. If not specified, it will be automatically loaded into the corresponding partition.
Example:
PARTITION(p1, p2, p3)
job_properties
A generic parameter that specifies a routine load job.
syntax:
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
Currently we support the following parameters:
desired_concurrent_number
The degree of concurrency desired. A routine load job is split into multiple subtasks. This parameter specifies how many tasks can be executed simultaneously in a job. Must be greater than 0. The default is 3.
This concurrency is not the actual concurrency. The actual concurrency will be considered by the number of nodes in the cluster, the load, and the data source.
example:
"desired_concurrent_number" = "3"
max_batch_interval/max_batch_rows/max_batch_size
These three parameters represent:
The maximum execution time of each subtask, in seconds. The range is 5 to 60. The default is 10.
The maximum number of rows read per subtask. Must be greater than or equal to 200,000. The default is 200000.
The maximum number of bytes read per subtask. The unit is byte and the range is 100MB to 1GB. The default is 100MB.
These three parameters are used to control the execution time and throughput of a subtask. When either one reaches the threshold, the task ends.
example:
```
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
```
3. `max_error_number`
The maximum number of error lines allowed in the sampling window. Must be greater than or equal to 0. The default is 0, which means that no error lines are allowed.
The sampling window is max\_batch\_rows \* 10. That is, if the number of error lines is greater than max\_error\_number in the sampling window, the routine job will be suspended, and manual intervention is required to check the data quality problem.
Lines that are filtered by the where condition are not counted as error lines.
4. `strict_mode`
Whether to enable strict mode, the default is on. If turned on, the column type transformation of non-null raw data is filtered if the result is NULL. Specified as "strict\_mode" = "true"
5. timezone
Specifies the time zone in which the job will be loaded. The default by using session variable's timezone. This parameter affects all function results related to the time zone involved in the load.
data_source
The type of data source. Current support:
KAFKA
data_source_properties
Specify information about the data source.
syntax:
(
"key1" = "val1",
"key2" = "val2"
)
KAFKA data source
Kafka_broker_list
Kafka’s broker connection information. The format is ip:host. Multiple brokare separated by commas.
Example:
"kafka_broker_list" = "broker1:9092,broker2:9092"
kafka_topic
Specify the topic of Kafka to subscribe to.
Example:
"kafka_topic" = "my_topic"
kafka_partitions/kafka_offsets
Specify the kafka partition to be subscribed to, and the corresponding star offset for each partition.
Offset can specify a specific offset from 0 or greater, or:
OFFSET_BEGINNING: Subscribe from the location where the data is avaie.
OFFSET_END: Subscribe from the end.
If not specified, all partitions under topic are subscribed by default fromSET\_END.
Example:
```
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
```
4. property
Specify custom kafka parameters.
The function is equivalent to the "--property" parameter in the kafka shel
When the value of the parameter is a file, you need to add the keyword: "FILbefore the value.
For information on how to create a file, see "HELP CREATE FILE;"
For more supported custom parameters, see the configuration items on the nt side in the official CONFIGURATION documentation for librdkafka.
Example:
```
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"
```
1. When connecting to Kafka using SSL, you need to specify the follg parameters:
```
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
```
among them:
"property.security.protocol" and "property.ssl.ca.location" are requ to indicate the connection method is SSL and the location of the CA certate.
If the client authentication is enabled on the Kafka server, you alsod to set:
```
"property.ssl.certificate.location"
"property.ssl.key.location"
"property.ssl.key.password"
```
Used to specify the public key of the client, the private key, and the word of the private key.
2. Specify the default starting offset for kafka partition
If kafka\_partitions/kafka\_offsets is not specified, all partitions are umed by default, and you can specify kafka\_default\_offsets to specify the star offset. The default is OFFSET\_END, which starts at the end of the substion.
Values:
1. OFFSET\_BEGINNING: Subscribe from the location where the data is avaie.
2. OFFSET\_END: Subscribe from the end.
Example:
`"property.kafka_default_offsets" = "OFFSET_BEGINNING"`
load data format sample
Integer class (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
Floating point class (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356 Date class (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03.
String class (CHAR/VARCHAR) (without quotes): I am a student, a
NULL value: \N
example
Create a Kafka routine load task named test1 for the example_tbl of example_db. Specify group.id and client.id, and automatically consume all partitions by default, with subscriptions starting at the end (OFFSET_END)
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx"
);
Create a Kafka routine load task named test1 for the example_tbl of example_db. The load task is in strict mode.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
load data from Kafka clusters via SSL authentication. Also set the client.id parameter. The load task is in non-strict mode and the time zone is Africa/Abidjan
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"timezone" = "Africa/Abidjan"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
"property.client.id" = "my_client_id"
);
keyword
CREATE, ROUTINE, LOAD