ROUTINE LOAD

description

  1. 例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。
  2. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入文本格式(CSV)的数据。

语法:

  1. CREATE ROUTINE LOAD [db.]job_name ON tbl_name
  2. [merge_type]
  3. [load_properties]
  4. [job_properties]
  5. FROM data_source
  6. [data_source_properties]
  7. 1. [db.]job_name
  8. 导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。
  9. 2. tbl_name
  10. 指定需要导入的表的名称。
  11. 3. merge_type
  12. 数据的合并类型,一共支持三种类型APPENDDELETEMERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete on条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理, 语法为[WITH MERGE|APPEND|DELETE]
  13. 4. load_properties
  14. 用于描述导入数据。语法:
  15. [column_separator],
  16. [columns_mapping],
  17. [where_predicates],
  18. [delete_on_predicates],
  19. [source_sequence],
  20. [partitions],
  21. [preceding_predicates]
  22. 1. column_separator:
  23. 指定列分隔符,如:
  24. COLUMNS TERMINATED BY ","
  25. 默认为:\t
  26. 2. columns_mapping:
  27. 指定源数据中列的映射关系,以及定义衍生列的生成方式。
  28. 1. 映射列:
  29. 按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。
  30. 假设目的表有三列 k1, k2, v1。源数据有4列,其中第124列分别对应 k2, k1, v1。则书写如下:
  31. COLUMNS (k2, k1, xxx, v1)
  32. 其中 xxx 为不存在的一列,用于跳过源数据中的第三列。
  33. 2. 衍生列:
  34. col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。
  35. 衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。
  36. 接上一个示例,假设目的表还有第4 v2v2 k1 k2 的和产生。则可以书写如下:
  37. COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
  38. 3. where_predicates
  39. 用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。
  40. 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:
  41. WHERE k1 > 100 and k2 = 1000
  42. 4. partitions
  43. 指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
  44. 示例:
  45. PARTITION(p1, p2, p3)
  46. 5. delete_on_predicates
  47. 表示删除条件,仅在 merge type MERGE 时有意义,语法与where 相同
  48. 6. source_sequence:
  49. 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。
  50. 7. preceding_predicates
  51. PRECEDING FILTER predicate
  52. 用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。
  53. 5. job_properties
  54. 用于指定例行导入作业的通用参数。
  55. 语法:
  56. PROPERTIES (
  57. "key1" = "val1",
  58. "key2" = "val2"
  59. )
  60. 目前我们支持以下参数:
  61. 1. desired_concurrent_number
  62. 期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3
  63. 这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。
  64. 例:
  65. "desired_concurrent_number" = "3"
  66. 2. max_batch_interval/max_batch_rows/max_batch_size
  67. 这三个参数分别表示:
  68. 1)每个子任务最大执行时间,单位是秒。范围为 5 60。默认为10
  69. 2)每个子任务最多读取的行数。必须大于等于200000。默认是200000
  70. 3)每个子任务最多读取的字节数。单位是字节,范围是 100MB 1GB。默认是 100MB
  71. 这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。
  72. 例:
  73. "max_batch_interval" = "20",
  74. "max_batch_rows" = "300000",
  75. "max_batch_size" = "209715200"
  76. 3. max_error_number
  77. 采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。
  78. 采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。
  79. where 条件过滤掉的行不算错误行。
  80. 4. strict_mode
  81. 是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为 "strict_mode" = "true"
  82. 5. timezone
  83. 指定导入作业所使用的时区。默认为使用 Session timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。
  84. 6. format
  85. 指定导入数据格式,默认是csv,支持json格式。
  86. 7. jsonpaths
  87. jsonpaths: 导入json方式分为:简单模式和匹配模式。如果设置了jsonpath则为匹配模式导入,否则为简单模式导入,具体可参考示例。
  88. 8. strip_outer_array
  89. 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false
  90. 9. json_root
  91. json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""
  92. 6. data_source
  93. 数据源的类型。当前支持:
  94. KAFKA
  95. 7. data_source_properties
  96. 指定数据源相关的信息。
  97. 语法:
  98. (
  99. "key1" = "val1",
  100. "key2" = "val2"
  101. )
  102. 1. KAFKA 数据源
  103. 1. kafka_broker_list
  104. Kafka broker 连接信息。格式为 ip:host。多个broker之间以逗号分隔。
  105. 示例:
  106. "kafka_broker_list" = "broker1:9092,broker2:9092"
  107. 2. kafka_topic
  108. 指定要订阅的 Kafka topic
  109. 示例:
  110. "kafka_topic" = "my_topic"
  111. 3. kafka_partitions/kafka_offsets
  112. 指定需要订阅的 kafka partition,以及对应的每个 partition 的起始 offset
  113. offset 可以指定从大于等于 0 的具体 offset,或者:
  114. 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
  115. 2) OFFSET_END: 从末尾开始订阅。
  116. 如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition
  117. 示例:
  118. "kafka_partitions" = "0,1,2,3",
  119. "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
  120. 4. property
  121. 指定自定义kafka参数。
  122. 功能等同于kafka shell "--property" 参数。
  123. 当参数的 value 为一个文件时,需要在 value 前加上关键词:"FILE:"
  124. 关于如何创建文件,请参阅 "HELP CREATE FILE;"
  125. 更多支持的自定义参数,请参阅 librdkafka 的官方 CONFIGURATION 文档中,client 端的配置项。
  126. 示例:
  127. "property.client.id" = "12345",
  128. "property.ssl.ca.location" = "FILE:ca.pem"
  129. 1.使用 SSL 连接 Kafka 时,需要指定以下参数:
  130. "property.security.protocol" = "ssl",
  131. "property.ssl.ca.location" = "FILE:ca.pem",
  132. "property.ssl.certificate.location" = "FILE:client.pem",
  133. "property.ssl.key.location" = "FILE:client.key",
  134. "property.ssl.key.password" = "abcdefg"
  135. 其中:
  136. "property.security.protocol" "property.ssl.ca.location" 为必须,用于指明连接方式为 SSL,以及 CA 证书的位置。
  137. 如果 Kafka server 端开启了 client 认证,则还需设置:
  138. "property.ssl.certificate.location"
  139. "property.ssl.key.location"
  140. "property.ssl.key.password"
  141. 分别用于指定 client public keyprivate key 以及 private key 的密码。
  142. 2.指定kafka partition的默认起始offset
  143. 如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。默认为 OFFSET_END,即从末尾开始订阅。
  144. 值为
  145. 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
  146. 2) OFFSET_END: 从末尾开始订阅。
  147. 示例:
  148. "property.kafka_default_offsets" = "OFFSET_BEGINNING"
  149. 8. 导入数据格式样例
  150. 整型类(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
  151. 浮点类(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
  152. 日期类(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03
  153. 字符串类(CHAR/VARCHAR)(无引号):I am a student, a
  154. NULL值:\N

example

  1. 1. example_db example_tbl 创建一个名为 test1 Kafka 例行导入任务。指定列分隔符和 group.id client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅
  2. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  3. COLUMNS TERMINATED BY ",",
  4. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
  5. PROPERTIES
  6. (
  7. "desired_concurrent_number"="3",
  8. "max_batch_interval" = "20",
  9. "max_batch_rows" = "300000",
  10. "max_batch_size" = "209715200",
  11. "strict_mode" = "false"
  12. )
  13. FROM KAFKA
  14. (
  15. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  16. "kafka_topic" = "my_topic",
  17. "property.group.id" = "xxx",
  18. "property.client.id" = "xxx",
  19. "property.kafka_default_offsets" = "OFFSET_BEGINNING"
  20. );
  21. 2. example_db example_tbl 创建一个名为 test1 Kafka 例行导入任务。导入任务为严格模式。
  22. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  23. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
  24. WHERE k1 > 100 and k2 like "%doris%"
  25. PROPERTIES
  26. (
  27. "desired_concurrent_number"="3",
  28. "max_batch_interval" = "20",
  29. "max_batch_rows" = "300000",
  30. "max_batch_size" = "209715200",
  31. "strict_mode" = "false"
  32. )
  33. FROM KAFKA
  34. (
  35. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  36. "kafka_topic" = "my_topic",
  37. "kafka_partitions" = "0,1,2,3",
  38. "kafka_offsets" = "101,0,0,200"
  39. );
  40. 3. 通过 SSL 认证方式,从 Kafka 集群导入数据。同时设置 client.id 参数。导入任务为非严格模式,时区为 Africa/Abidjan
  41. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  42. COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
  43. WHERE k1 > 100 and k2 like "%doris%"
  44. PROPERTIES
  45. (
  46. "desired_concurrent_number"="3",
  47. "max_batch_interval" = "20",
  48. "max_batch_rows" = "300000",
  49. "max_batch_size" = "209715200",
  50. "strict_mode" = "false",
  51. "timezone" = "Africa/Abidjan"
  52. )
  53. FROM KAFKA
  54. (
  55. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  56. "kafka_topic" = "my_topic",
  57. "property.security.protocol" = "ssl",
  58. "property.ssl.ca.location" = "FILE:ca.pem",
  59. "property.ssl.certificate.location" = "FILE:client.pem",
  60. "property.ssl.key.location" = "FILE:client.key",
  61. "property.ssl.key.password" = "abcdefg",
  62. "property.client.id" = "my_client_id"
  63. );
  64. 4. 简单模式导入json
  65. CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
  66. COLUMNS(category,price,author)
  67. PROPERTIES
  68. (
  69. "desired_concurrent_number"="3",
  70. "max_batch_interval" = "20",
  71. "max_batch_rows" = "300000",
  72. "max_batch_size" = "209715200",
  73. "strict_mode" = "false",
  74. "format" = "json"
  75. )
  76. FROM KAFKA
  77. (
  78. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  79. "kafka_topic" = "my_topic",
  80. "kafka_partitions" = "0,1,2",
  81. "kafka_offsets" = "0,0,0"
  82. );
  83. 支持两种json数据格式:
  84. 1){"category":"a9jadhx","author":"test","price":895}
  85. 2)[
  86. {"category":"a9jadhx","author":"test","price":895},
  87. {"category":"axdfa1","author":"EvelynWaugh","price":1299}
  88. ]
  89. 5. 精准导入json数据格式
  90. CREATE TABLE `example_tbl` (
  91. `category` varchar(24) NULL COMMENT "",
  92. `author` varchar(24) NULL COMMENT "",
  93. `timestamp` bigint(20) NULL COMMENT "",
  94. `dt` int(11) NULL COMMENT "",
  95. `price` double REPLACE
  96. ) ENGINE=OLAP
  97. AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
  98. COMMENT "OLAP"
  99. PARTITION BY RANGE(`dt`)
  100. (PARTITION p0 VALUES [("-2147483648"), ("20200509")),
  101. PARTITION p20200509 VALUES [("20200509"), ("20200510")),
  102. PARTITION p20200510 VALUES [("20200510"), ("20200511")),
  103. PARTITION p20200511 VALUES [("20200511"), ("20200512")))
  104. DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
  105. PROPERTIES (
  106. "storage_type" = "COLUMN",
  107. "replication_num" = "1"
  108. );
  109. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  110. COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
  111. PROPERTIES
  112. (
  113. "desired_concurrent_number"="3",
  114. "max_batch_interval" = "20",
  115. "max_batch_rows" = "300000",
  116. "max_batch_size" = "209715200",
  117. "strict_mode" = "false",
  118. "format" = "json",
  119. "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
  120. "strip_outer_array" = "true"
  121. )
  122. FROM KAFKA
  123. (
  124. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  125. "kafka_topic" = "my_topic",
  126. "kafka_partitions" = "0,1,2",
  127. "kafka_offsets" = "0,0,0"
  128. );

json数据格式: [ {“category”:”11”,”title”:”SayingsoftheCentury”,”price”:895,”timestamp”:1589191587}, {“category”:”22”,”author”:”2avc”,”price”:895,”timestamp”:1589191487}, {“category”:”33”,”author”:”3avc”,”title”:”SayingsoftheCentury”,”timestamp”:1589191387} ] 说明: 1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。 2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。

  1. 6. 用户指定根节点json_root
  2. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  3. COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
  4. PROPERTIES
  5. (
  6. "desired_concurrent_number"="3",
  7. "max_batch_interval" = "20",
  8. "max_batch_rows" = "300000",
  9. "max_batch_size" = "209715200",
  10. "strict_mode" = "false",
  11. "format" = "json",
  12. "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
  13. "strip_outer_array" = "true",
  14. "json_root" = "$.RECORDS"
  15. )
  16. FROM KAFKA
  17. (
  18. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  19. "kafka_topic" = "my_topic",
  20. "kafka_partitions" = "0,1,2",
  21. "kafka_offsets" = "0,0,0"
  22. );

json数据格式: { “RECORDS”:[ {“category”:”11”,”title”:”SayingsoftheCentury”,”price”:895,”timestamp”:1589191587}, {“category”:”22”,”author”:”2avc”,”price”:895,”timestamp”:1589191487}, {“category”:”33”,”author”:”3avc”,”title”:”SayingsoftheCentury”,”timestamp”:1589191387} ] }

  1. 7. example_db example_tbl 创建一个名为 test1 Kafka 例行导入任务。并且删除与v3 >100 行相匹配的key列的行
  2. CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  3. WITH MERGE
  4. COLUMNS(k1, k2, k3, v1, v2, v3),
  5. WHERE k1 > 100 and k2 like "%doris%",
  6. DELETE ON v3 >100
  7. PROPERTIES
  8. (
  9. "desired_concurrent_number"="3",
  10. "max_batch_interval" = "20",
  11. "max_batch_rows" = "300000",
  12. "max_batch_size" = "209715200",
  13. "strict_mode" = "false"
  14. )
  15. FROM KAFKA
  16. 8. 导入数据到含有sequence列的UNIQUE_KEYS表中
  17. CREATE ROUTINE LOAD example_db.test_job ON example_tbl
  18. COLUMNS TERMINATED BY ",",
  19. COLUMNS(k1,k2,source_sequence,v1,v2),
  20. ORDER BY source_sequence
  21. PROPERTIES
  22. (
  23. "desired_concurrent_number"="3",
  24. "max_batch_interval" = "30",
  25. "max_batch_rows" = "300000",
  26. "max_batch_size" = "209715200"
  27. ) FROM KAFKA
  28. (
  29. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  30. "kafka_topic" = "my_topic",
  31. "kafka_partitions" = "0,1,2,3",
  32. "kafka_offsets" = "101,0,0,200"
  33. );
  34. 8. 过滤原始数据
  35. CREATE ROUTINE LOAD example_db.test_job ON example_tbl
  36. COLUMNS TERMINATED BY ",",
  37. COLUMNS(k1,k2,source_sequence,v1,v2),
  38. PRECEDING FILTER k1 > 2
  39. PROPERTIES
  40. (
  41. "desired_concurrent_number"="3",
  42. "max_batch_interval" = "30",
  43. "max_batch_rows" = "300000",
  44. "max_batch_size" = "209715200"
  45. ) FROM KAFKA
  46. (
  47. "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
  48. "kafka_topic" = "my_topic",
  49. "kafka_partitions" = "0,1,2,3",
  50. "kafka_offsets" = "101,0,0,200"
  51. );

keyword

  1. CREATE,ROUTINE,LOAD