ROUTINE LOAD
description
The Routine Load function lets users submit a resident load job that continuously reads data from a specified data source and loads it into Doris. Currently, only Kafka is supported as a data source, with either no authentication or SSL authentication; the data format can be CSV (the default) or JSON.
Syntax:
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
[db.]job_name
The name of the load job. Within a database, only one job with a given name can be running at a time.
tbl_name
Specifies the name of the table that needs to be loaded.
merge_type
Specifies how the data is merged. Three types are supported: APPEND, DELETE, and MERGE. APPEND is the default and means that every row in the batch is appended to the existing data. DELETE means that all rows with the same key as the rows in the batch are deleted. MERGE must be used together with a delete condition (DELETE ON): rows that satisfy the condition are processed with DELETE semantics, and the rest are processed with APPEND semantics.
load_properties
Describes the data to be loaded. Syntax:
[column_separator],
[columns_mapping],
[where_predicates],
[delete_on_predicates],
[partitions],
[preceding_predicates]
column_separator:
Specify column separators, such as:
COLUMNS TERMINATED BY ","
The default is:
\t
columns_mapping:
Specifies the mapping of columns in the source data and defines how the derived columns are generated.
Map column:
Specify in order, which columns in the source data correspond to which columns in the destination table. For columns that you want to skip, you can specify a column name that does not exist.
Suppose the destination table has three columns k1, k2, v1. The source data has 4 columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1, respectively. Write as follows:
COLUMNS (k2, k1, xxx, v1)
Where xxx is a column that does not exist and is used to skip the third column in the source data.
Derived columns:
A column represented in the form of col_name = expr, which we call a derived column. That is, the value of the corresponding column in the destination table is calculated by expr.
Derived columns are usually arranged after the mapped column. Although this is not mandatory, Doris always parses the mapped columns first and then parses the derived columns.
Continuing the example, assume that the destination table also has a fourth column, v2, which is the sum of k1 and k2. Write as follows:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2)
where_predicates
Specifies filter conditions that remove unwanted rows. The columns used in the filter can be either mapped columns or derived columns.
For example, to load only the rows where k1 is greater than 100 and k2 equals 1000, write as follows:
WHERE k1 > 100 and k2 = 1000
partitions
Specifies which partitions of the destination table to load into. If not specified, the data is automatically loaded into the corresponding partitions.
Example:
PARTITION(p1, p2, p3)
delete_on_predicates:
Used only when merge_type is MERGE. Specifies the condition under which rows are processed with DELETE semantics.
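As a minimal sketch of how MERGE and the delete condition combine, the statement below treats rows with v3 > 100 as deletes and appends the rest. The job, table, column, broker, and topic names are placeholders in the style of the examples at the end of this page, not values from a real deployment.

```sql
-- Placeholder names throughout.
-- Rows matching DELETE ON are handled with DELETE semantics;
-- all other rows are handled with APPEND semantics.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
WITH MERGE
COLUMNS(k1, k2, k3, v1, v2, v3),
DELETE ON v3 > 100
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092",
"kafka_topic" = "my_topic"
);
```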
preceding_predicates
Used to filter the original data, that is, the data before column mapping and transformation. This lets you filter the raw data first, keep only the rows you need, and then perform the conversion.
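The difference between the two filters can be sketched as follows, assuming a comma-separated source with four columns; all names and thresholds are illustrative, and the clause order simply follows the syntax summary above.

```sql
-- Placeholder names and thresholds.
-- PRECEDING FILTER is applied to the source data before mapping and conversion.
-- WHERE is applied afterwards, so it can reference the derived column v3.
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, v1, v2, v3 = k1 * 100),
WHERE v3 > 1000,
PRECEDING FILTER k1 > 2
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092",
"kafka_topic" = "my_topic"
);
```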
job_properties
Generic parameters of the routine load job.
syntax:
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
Currently we support the following parameters:
desired_concurrent_number
The desired degree of concurrency. A routine load job is split into multiple subtasks, and this parameter specifies how many of them may run simultaneously. Must be greater than 0. The default is 3.
This is not necessarily the actual concurrency, which is determined by taking into account the number of nodes in the cluster, the load, and the data source.
example:
"desired_concurrent_number" = "3"
max_batch_interval/max_batch_rows/max_batch_size
These three parameters represent, respectively:
max_batch_interval: The maximum execution time of each subtask, in seconds. The range is 5 to 60. The default is 10.
max_batch_rows: The maximum number of rows read per subtask. Must be greater than or equal to 200000. The default is 200000.
max_batch_size: The maximum number of bytes read per subtask, in bytes. The range is 100MB to 1GB. The default is 100MB.
Together they control the execution time and throughput of a subtask. When any one of them reaches its threshold, the subtask ends.
example:
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
max_error_number
The maximum number of error lines allowed in the sampling window. Must be greater than or equal to 0. The default is 0, which means that no error lines are allowed.
The sampling window is max_batch_rows * 10. That is, if the number of error lines is greater than max_error_number in the sampling window, the routine job will be suspended, and manual intervention is required to check the data quality problem.
Lines that are filtered by the where condition are not counted as error lines.
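A short worked example of the sampling window, using assumed values chosen only for illustration:

```sql
-- Assumed values, for illustration only.
-- Sampling window = max_batch_rows * 10 = 300000 * 10 = 3000000 rows.
-- The job is suspended once more than 1000 error lines occur within that window.
"max_batch_rows" = "300000",
"max_error_number" = "1000"
```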
strict_mode
Whether to enable strict mode. Disabled by default. When enabled, a row is filtered out if a non-null source value becomes NULL after column type conversion. Specify as "strict_mode" = "true".
timezone
Specifies the time zone used by the load job. Defaults to the time zone of the session variable. This parameter affects the results of all time-zone-related functions involved in the load.
format
Specifies the format of the loaded data. Supports csv and json; the default is csv.
jsonpaths
There are two ways to load json: simple mode and matched mode. If jsonpaths is set, the load uses matched mode; otherwise it uses simple mode. See the examples for details.
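The two modes differ only in whether jsonpaths is present. The fragments below are a sketch; the field names are taken from the examples later in this page, and the comment about key matching in simple mode is an assumption drawn from the simple json example rather than a statement of the exact lookup rules.

```sql
-- Simple mode: no jsonpaths. The JSON keys are assumed to match
-- the names given in the COLUMNS list.
"format" = "json"

-- Matched mode: each path extracts one field, in the order of the COLUMNS list.
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]"
```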
strip_outer_array
Boolean type. true indicates that the json data starts with an array and that the objects in the array are flattened into individual records. The default is false.
json_root
json_root is a valid JSONPATH string that specifies the root node of the JSON document. The default value is "".
send_batch_parallelism
Integer. Sets the parallelism for sending batches. If the value exceeds max_send_batch_parallelism_per_job in the BE config, the coordinator BE uses the value of max_send_batch_parallelism_per_job instead.
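For illustration, the property is set like any other job property; the value 2 below is an arbitrary example, not a recommended setting.

```sql
-- Arbitrary example value. If it exceeds the BE config
-- max_send_batch_parallelism_per_job, the BE config value is used instead.
"send_batch_parallelism" = "2"
```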
data_source
The type of data source. Currently supported:
KAFKA
data_source_properties
Specify information about the data source.
syntax:
(
"key1" = "val1",
"key2" = "val2"
)
KAFKA data source
kafka_broker_list
Kafka's broker connection information. The format is ip:port. Multiple brokers are separated by commas.
Example:
"kafka_broker_list" = "broker1:9092,broker2:9092"
kafka_topic
Specify the topic of Kafka to subscribe to.
Example:
"kafka_topic" = "my_topic"
kafka_partitions/kafka_offsets
Specify the kafka partitions to subscribe to and the corresponding start offset for each partition.
An offset can be a specific number greater than or equal to 0, or one of:
OFFSET_BEGINNING: Subscribe from the earliest position where data is available.
OFFSET_END: Subscribe from the end.
Timestamp: the format must be like "2021-05-11 10:00:00". The system automatically locates the offset of the first message whose timestamp is greater than or equal to the given one. Note that timestamp offsets cannot be mixed with numeric offsets; only one form can be used.
If not specified, all partitions under the topic are subscribed by default, starting from OFFSET_END.
Example:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1",
"kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
property
Specify custom kafka parameters.
The function is equivalent to the "--property" parameter in the kafka shell.
When the value of a parameter is a file, add the keyword "FILE:" before the value.
For information on how to create a file, see "HELP CREATE FILE;"
For more supported custom parameters, see the client-side configuration items in the official CONFIGURATION documentation for librdkafka.
Example:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"
When connecting to Kafka using SSL, you need to specify the following parameters:
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
Among them:
"property.security.protocol" and "property.ssl.ca.location" are required. They indicate that the connection method is SSL and give the location of the CA certificate.
If the client authentication is enabled on the Kafka server, you also need to set:
"property.ssl.certificate.location"
"property.ssl.key.location"
"property.ssl.key.password"
These specify the client's public key, the private key, and the password of the private key, respectively.
Specify the default starting offset for kafka partition
If kafka_partitions/kafka_offsets is not specified, all partitions are consumed by default, and you can use kafka_default_offsets to specify the start offset. The default is OFFSET_END, which means subscribing from the end.
Values:
OFFSET_BEGINNING: Subscribe from the earliest position where data is available.
OFFSET_END: Subscribe from the end.
Timestamp, the format is the same as kafka_offsets
Example:
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
"property.kafka_default_offsets" = "2021-05-11 10:00:00"
load data format sample
Integer class (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
Floating point class (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
Date class (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
String class (CHAR/VARCHAR) (without quotes): I am a student, a
NULL value: \N
example
Create a Kafka routine load task named test1 for the example_tbl of example_db. Specify group.id and client.id, and automatically consume all partitions by default, with subscriptions starting at the end (OFFSET_END)
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx"
);
Create a Kafka routine load task named test1 for the example_tbl of example_db. The load task is in strict mode.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
Load data from a Kafka cluster via SSL authentication, and also set the client.id parameter. The load task is in non-strict mode and the time zone is Africa/Abidjan.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"timezone" = "Africa/Abidjan"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
"property.client.id" = "my_client_id"
);
Create a Kafka routine load task named test_json_label_1 for table1 of example_db. The load data is simple json.
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
Two json data layouts are supported:
1) {"category":"a9jadhx","author":"test","price":895}
2) [
{"category":"a9jadhx","author":"test","price":895},
{"category":"axdfa1","author":"EvelynWaugh","price":1299}
]
Load json in matched mode by using jsonpaths.
CREATE TABLE `example_tbl` (
`category` varchar(24) NULL COMMENT "",
`author` varchar(24) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p0 VALUES [("-2147483648"), ("20200509")),
PARTITION p20200509 VALUES [("20200509"), ("20200510")),
PARTITION p20200510 VALUES [("20200510"), ("20200511")),
PARTITION p20200511 VALUES [("20200511"), ("20200512")))
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
PROPERTIES (
"replication_num" = "1"
);
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
For example json data:
[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
Tips:
1) If the json data starts with an array and each object in the array is a record, set strip_outer_array to true to flatten the array.
2) If the json data starts with an array and each object in the array is a record, the ROOT node used when setting jsonpaths is an object in the array, not the array itself.
User specifies the json_root node.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true",
"json_root" = "$.RECORDS"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
For example json data:
{
"RECORDS":[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
}
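Create a Kafka routine load task named test1 for the example_tbl of example_db, using MERGE. Rows with v3 > 100 are processed with DELETE semantics, which deletes the existing rows with matching key columns; the remaining rows are appended.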
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
WITH MERGE
COLUMNS(k1, k2, k3, v1, v2, v3),
WHERE k1 > 100 and k2 like "%doris%",
DELETE ON v3 > 100
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
Filter original data
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1,k2,source_sequence,v1,v2),
PRECEDING FILTER k1 > 2
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "30",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
Start consumption from the specified point in time
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "30",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.kafka_default_offsets" = "2021-10-10 11:00:00"
);
keyword
CREATE, ROUTINE, LOAD