ROUTINE LOAD

description

The Routine Load feature allows users to submit a resident load job that continuously reads data from a specified data source and loads it into Doris. Currently, data in CSV or JSON format can be loaded from Kafka, either without authentication or with SSL authentication.

Syntax:

  1. CREATE ROUTINE LOAD [db.]job_name ON tbl_name
  2. [merge_type]
  3. [load_properties]
  4. [job_properties]
  5. FROM data_source
  6. [data_source_properties]
  1. [db.]job_name

    The name of the load job. Within the same database, only one job with the same name can be running at a time.

  2. tbl_name

    Specifies the name of the table that needs to be loaded.

  3. merge_type

    The data merge type. Three types are supported: APPEND, DELETE, and MERGE. APPEND is the default, meaning this batch of data is appended to the existing data. DELETE means all rows with the same keys as this batch of data are deleted. MERGE semantics must be used together with a delete condition: rows that satisfy the DELETE ON condition are processed with DELETE semantics, and the remaining rows are processed with APPEND semantics.
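
    For example, a minimal sketch of how merge_type combines with a delete condition (the column names are illustrative; a complete MERGE example appears in the example section below):

    WITH MERGE
    COLUMNS(k1, k2, k3, v1, v2, v3),
    DELETE ON v3 > 100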

  4. load_properties

    Used to describe the loaded data. Syntax:

    1. [column_separator],
    2. [columns_mapping],
    3. [where_predicates],
    4. [delete_on_predicates]
    5. [partitions],
    6. [preceding_predicates]
    1. column_separator:

      Specify column separators, such as:

      COLUMNS TERMINATED BY ","

      The default is: \t

    2. columns_mapping:

      Specifies the mapping of columns in the source data and defines how the derived columns are generated.

      1. Map column:

        Specify in order, which columns in the source data correspond to which columns in the destination table. For columns that you want to skip, you can specify a column name that does not exist.

        Suppose the destination table has three columns k1, k2, v1. The source data has 4 columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1, respectively. Write as follows:

        COLUMNS (k2, k1, xxx, v1)

        Where xxx is a column that does not exist and is used to skip the third column in the source data.

      2. Derived columns:

        A column represented in the form of col_name = expr, which we call a derived column. That is, the value of the corresponding column in the destination table is calculated by expr.

        Derived columns are usually arranged after the mapped columns. Although this is not mandatory, Doris always parses the mapped columns first and then the derived columns.

        Continuing the example above, assume the destination table also has a fourth column, v2, which is generated as the sum of k1 and k2. It can be written as follows:

        COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

    3. where_predicates

      Used to specify filter conditions to filter out unwanted rows. The filter columns can be either mapped columns or derived columns.

      For example, if we only want to load rows where k1 is greater than 100 and k2 equals 1000, we would write as follows:

      WHERE k1 > 100 and k2 = 1000

    4. partitions

      Specifies which partitions of the destination table to load into. If not specified, the data is automatically loaded into the corresponding partitions.

      Example:

      PARTITION(p1, p2, p3)

    5. delete_on_predicates:

      Only used when the merge type is MERGE. It specifies the condition under which loaded rows are treated as deletes, for example DELETE ON v3 > 100.

    6. preceding_predicates

      Used to filter the original data. The original data is the data before column mapping and transformation. The user can filter it before conversion, keep only the desired rows, and then perform the conversion.
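
      For example (a sketch; the column name is illustrative), keeping only source rows with k1 greater than 2 before any transformation is applied:

      PRECEDING FILTER k1 > 2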

  5. job_properties

    General parameters that apply to the routine load job.

    syntax:

    1. PROPERTIES (
    2. "key1" = "val1",
    3. "key2" = "val2"
    4. )

    Currently we support the following parameters:

    1. desired_concurrent_number

      The degree of concurrency desired. A routine load job is split into multiple subtasks. This parameter specifies how many tasks can be executed simultaneously in a job. Must be greater than 0. The default is 3.

      This concurrency is not the actual concurrency. The actual concurrency is determined based on the number of nodes in the cluster, the cluster load, and the data source.

      example:

      "desired_concurrent_number" = "3"

    2. max_batch_interval/max_batch_rows/max_batch_size

      These three parameters represent:

      1. The maximum execution time of each subtask, in seconds. The range is 5 to 60. The default is 10.

      2. The maximum number of rows read per subtask. Must be greater than or equal to 200,000. The default is 200000.

      3. The maximum number of bytes read per subtask. The unit is byte and the range is 100MB to 1GB. The default is 100MB.

      These three parameters are used to control the execution time and throughput of a subtask. When either one reaches the threshold, the task ends.

      example:

      1. "max_batch_interval" = "20",
      2. "max_batch_rows" = "300000",
      3. "max_batch_size" = "209715200"
    3. max_error_number

      The maximum number of error lines allowed in the sampling window. Must be greater than or equal to 0. The default is 0, which means that no error lines are allowed.

      The sampling window is max_batch_rows * 10. That is, if the number of error lines is greater than max_error_number in the sampling window, the routine job will be suspended, and manual intervention is required to check the data quality problem.

      Lines that are filtered by the where condition are not counted as error lines.
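
      Example (the threshold value is illustrative):

      "max_error_number" = "1000"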

    4. strict_mode

      Whether to enable strict mode. The default is disabled. If enabled, non-null source values that become NULL after column type conversion are filtered out. Specify it as "strict_mode" = "true".

    5. timezone

      Specifies the time zone used by the load job. The default is the session variable's time zone. This parameter affects the results of all time-zone-related functions involved in the load.
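
      Example (the time zone value is illustrative):

      "timezone" = "Africa/Abidjan"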

    6. format

      Specifies the format of the loaded data. csv and json are supported; the default is csv.

    7. jsonpaths

      There are two modes for loading JSON: simple mode and matched mode. If jsonpaths is set, the matched mode is used; otherwise the simple mode is used. Please refer to the examples for details.
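
      Example (the JSON field names are illustrative; note that the double quotes inside the value must be escaped):

      "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]"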

    8. strip_outer_array

      Boolean type. If true, the JSON data is treated as starting with an array, and each object in the array is flattened into a row. The default value is false.

    9. json_root

      json_root is a valid JSONPATH string that specifies the root node of the JSON document. The default value is "".
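
      Example (the root node name is illustrative):

      "json_root" = "$.RECORDS"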

    10. send_batch_parallelism

      Integer type. Used to set the parallelism for sending batches. If the value exceeds max_send_batch_parallelism_per_job in the BE configuration, the coordinator BE uses max_send_batch_parallelism_per_job instead.
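
      Example (the value is illustrative; values larger than max_send_batch_parallelism_per_job are capped to that limit):

      "send_batch_parallelism" = "5"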

  6. data_source

    The type of data source. Currently supported:

    KAFKA

  7. data_source_properties

    Specify information about the data source.

    syntax:

    1. (
    2. "key1" = "val1",
    3. "key2" = "val2"
    4. )
    1. KAFKA data source

      kafka_broker_list

      Kafka broker connection information, in the format host:port. Multiple brokers are separated by commas.

      Example:

      "kafka_broker_list" = "broker1:9092,broker2:9092"

    2. kafka_topic

      Specify the topic of Kafka to subscribe to.

      Example:

      "kafka_topic" = "my_topic"

    3. kafka_partitions/kafka_offsets

      Specify the Kafka partitions to subscribe to, and the corresponding start offset for each partition.

      The offset can be a specific value greater than or equal to 0, or one of:

      1. OFFSET_BEGINNING: Subscribe from the location where the data is available.

      2. OFFSET_END: Subscribe from the end.

      3. Timestamp, in a format such as "2021-05-11 10:00:00". The system will automatically locate the offset of the first message whose timestamp is greater than or equal to it. Note that timestamp-format offsets cannot be mixed with numeric offsets; only one of the two can be used.

      If not specified, all partitions under the topic are subscribed by default, starting from OFFSET_END.

      Example:

      1. "kafka_partitions" = "0,1,2,3",
      2. "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
      3. "kafka_partitions" = "0,1",
      4. "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
    4. property

      Specify custom kafka parameters.

      Functionally equivalent to the "--property" parameter of the Kafka shell tools.

      When the value of the parameter is a file, you need to add the keyword: “FILE” before the value.

      For information on how to create a file, see “HELP CREATE FILE;”

      For more supported custom parameters, see the client-side configuration items in the official CONFIGURATION documentation for librdkafka.

      Example:

      1. "property.client.id" = "12345",
      2. "property.ssl.ca.location" = "FILE:ca.pem"
      1. When connecting to Kafka using SSL, you need to specify the following parameters:

        1. "property.security.protocol" = "ssl",
        2. "property.ssl.ca.location" = "FILE:ca.pem",
        3. "property.ssl.certificate.location" = "FILE:client.pem",
        4. "property.ssl.key.location" = "FILE:client.key",
        5. "property.ssl.key.password" = "abcdefg"

        among them:

        “property.security.protocol” and “property.ssl.ca.location” are required to indicate the connection method is SSL and the location of the CA certificate.

        If the client authentication is enabled on the Kafka server, you also need to set:

        1. "property.ssl.certificate.location"
        2. "property.ssl.key.location"
        3. "property.ssl.key.password"

        These specify the client's public key certificate, the private key, and the password of the private key, respectively.

      2. Specify the default starting offset for Kafka partitions

        If kafka_partitions/kafka_offsets is not specified, all partitions are subscribed by default, and kafka_default_offsets can be used to specify the start offset. The default is OFFSET_END, i.e., subscription starts from the end.

        Values:

        1. OFFSET_BEGINNING: Subscribe from the location where the data is available.

        2. OFFSET_END: Subscribe from the end.

        3. Timestamp, the format is the same as kafka_offsets

        Example:

        "property.kafka_default_offsets" = "OFFSET_BEGINNING" "property.kafka_default_offsets" = "2021-05-11 10:00:00"

  8. load data format sample

    Integer class (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234

    Floating point class (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356

    Date class (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03.

    String class (CHAR/VARCHAR) (without quotes): I am a student, a

    NULL value: \N
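
    For example, a hypothetical CSV row combining these value formats (column order and types are illustrative), where the last column is NULL:

    1000,1.1,2017-10-03,I am a student,\N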

example

  1. Create a Kafka routine load task named test1 for the example_tbl of example_db. Specify group.id and client.id, and automatically consume all partitions by default, with subscriptions starting at the end (OFFSET_END)

    1. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
    3. PROPERTIES
    4. (
    5. "desired_concurrent_number"="3",
    6. "max_batch_interval" = "20",
    7. "max_batch_rows" = "300000",
    8. "max_batch_size" = "209715200",
    9. "strict_mode" = "false"
    10. )
    11. FROM KAFKA
    12. (
    13. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    14. "kafka_topic" = "my_topic",
    15. "property.group.id" = "xxx",
    16. "property.client.id" = "xxx"
    17. );
  2. Create a Kafka routine load task named test1 for the example_tbl of example_db, consuming from the specified partitions and offsets and filtering the loaded rows with a WHERE clause.

    1. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
    3. WHERE k1 > 100 and k2 like "%doris%"
    4. PROPERTIES
    5. (
    6. "desired_concurrent_number"="3",
    7. "max_batch_interval" = "20",
    8. "max_batch_rows" = "300000",
    9. "max_batch_size" = "209715200",
    10. "strict_mode" = "false"
    11. )
    12. FROM KAFKA
    13. (
    14. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    15. "kafka_topic" = "my_topic",
    16. "kafka_partitions" = "0,1,2,3",
    17. "kafka_offsets" = "101,0,0,200"
    18. );
  3. Load data from a Kafka cluster via SSL authentication, and set the client.id parameter. The load task is in non-strict mode and the time zone is Africa/Abidjan.

    1. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    2. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
    3. WHERE k1 > 100 and k2 like "%doris%"
    4. PROPERTIES
    5. (
    6. "desired_concurrent_number"="3",
    7. "max_batch_interval" = "20",
    8. "max_batch_rows" = "300000",
    9. "max_batch_size" = "209715200",
    10. "strict_mode" = "false",
    11. "timezone" = "Africa/Abidjan"
    12. )
    13. FROM KAFKA
    14. (
    15. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    16. "kafka_topic" = "my_topic",
    17. "property.security.protocol" = "ssl",
    18. "property.ssl.ca.location" = "FILE:ca.pem",
    19. "property.ssl.certificate.location" = "FILE:client.pem",
    20. "property.ssl.key.location" = "FILE:client.key",
    21. "property.ssl.key.password" = "abcdefg",
    22. "property.client.id" = "my_client_id"
    23. );
  4. Create a Kafka routine load task named test_json_label_1 for table1 of example_db. The loaded data is in simple JSON format.

    1. CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    2. COLUMNS(category,price,author)
    3. PROPERTIES
    4. (
    5. "desired_concurrent_number"="3",
    6. "max_batch_interval" = "20",
    7. "max_batch_rows" = "300000",
    8. "max_batch_size" = "209715200",
    9. "strict_mode" = "false",
    10. "format" = "json"
    11. )
    12. FROM KAFKA
    13. (
    14. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    15. "kafka_topic" = "my_topic",
    16. "kafka_partitions" = "0,1,2",
    17. "kafka_offsets" = "0,0,0"
    18. );

    It supports two kinds of data style:

    1) {"category":"a9jadhx","author":"test","price":895}

    2) [{"category":"a9jadhx","author":"test","price":895},{"category":"axdfa1","author":"EvelynWaugh","price":1299}]

  5. Load JSON in matched mode using jsonpaths.

    1. CREATE TABLE `example_tbl` (
    2. `category` varchar(24) NULL COMMENT "",
    3. `author` varchar(24) NULL COMMENT "",
    4. `timestamp` bigint(20) NULL COMMENT "",
    5. `dt` int(11) NULL COMMENT "",
    6. `price` double REPLACE
    7. ) ENGINE=OLAP
    8. AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    9. COMMENT "OLAP"
    10. PARTITION BY RANGE(`dt`)
    11. (PARTITION p0 VALUES [("-2147483648"), ("20200509")),
    12. PARTITION p20200509 VALUES [("20200509"), ("20200510")),
    13. PARTITION p20200510 VALUES [("20200510"), ("20200511")),
    14. PARTITION p20200511 VALUES [("20200511"), ("20200512")))
    15. DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
    16. PROPERTIES (
    17. "replication_num" = "1"
    18. );
    19. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    20. COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    21. PROPERTIES
    22. (
    23. "desired_concurrent_number"="3",
    24. "max_batch_interval" = "20",
    25. "max_batch_rows" = "300000",
    26. "max_batch_size" = "209715200",
    27. "strict_mode" = "false",
    28. "format" = "json",
    29. "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    30. "strip_outer_array" = "true"
    31. )
    32. FROM KAFKA
    33. (
    34. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    35. "kafka_topic" = "my_topic",
    36. "kafka_partitions" = "0,1,2",
    37. "kafka_offsets" = "0,0,0"
    38. );

    For example json data:

    [
      {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
      {"category":"22","author":"2avc","price":895,"timestamp":1589191487},
      {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
    ]

    Tips:

    1) If the JSON data starts with an array and each object in the array is a record, you need to set strip_outer_array to true to flatten the array.

    2) If the JSON data starts with an array and each object in the array is a record, then when setting jsonpaths the root node is actually an object in the array.

  6. User specifies the json_root node.

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    "strip_outer_array" = "true",
    "json_root" = "$.RECORDS"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
    );

    For example json data:

    {
      "RECORDS":[
        {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
        {"category":"22","author":"2avc","price":895,"timestamp":1589191487},
        {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
      ]
    }

  7. Create a Kafka routine load task named test1 for the example_tbl of example_db, using MERGE semantics: loaded rows with v3 > 100 delete the existing rows whose key columns match, and the remaining rows are appended.

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    WITH MERGE
    COLUMNS(k1, k2, k3, v1, v2, v3),
    WHERE k1 > 100 and k2 like "%doris%",
    DELETE ON v3 > 100
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
    );

  8. Filter original data.

    CREATE ROUTINE LOAD example_db.test_job ON example_tbl
    COLUMNS TERMINATED BY ",",
    COLUMNS(k1,k2,source_sequence,v1,v2),
    PRECEDING FILTER k1 > 2
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
    );

  9. Start consumption from a specified point in time.

    CREATE ROUTINE LOAD example_db.test_job ON example_tbl
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "2021-10-10 11:00:00"
    );

keyword

  1. CREATE, ROUTINE, LOAD