BROKER LOAD

description

  1. Broker load 通过随 Doris 集群一同部署的 broker 进行,访问对应数据源的数据,进行数据导入。
  2. 可以通过 show broker 命令查看已经部署的 broker
  3. 目前支持以下4种数据源:
  4. 1. Baidu HDFS:百度内部的 hdfs,仅限于百度内部使用。
  5. 2. Baidu AFS:百度内部的 afs,仅限于百度内部使用。
  6. 3. Baidu Object Storage(BOS):百度对象存储。仅限百度内部用户、公有云用户或其他可以访问 BOS 的用户使用。
  7. 4. Apache HDFS:社区版本 hdfs
  8. 5. Amazon S3Amazon对象存储。

语法:

  1. LOAD LABEL load_label
  2. (
  3. data_desc1[, data_desc2, ...]
  4. )
  5. WITH BROKER broker_name
  6. [broker_properties]
  7. [opt_properties];
  8. 1. load_label
  9. 当前导入批次的标签。在一个 database 内唯一。
  10. 语法:
  11. [database_name.]your_label
  12. 2. data_desc
  13. 用于描述一批导入数据。
  14. 语法:
  15. [MERGE|APPEND|DELETE]
  16. DATA INFILE
  17. (
  18. "file_path1"[, file_path2, ...]
  19. )
  20. [NEGATIVE]
  21. INTO TABLE `table_name`
  22. [PARTITION (p1, p2)]
  23. [COLUMNS TERMINATED BY "column_separator"]
  24. [FORMAT AS "file_type"]
  25. [(column_list)]
  26. [PRECEDING FILTER predicate]
  27. [SET (k1 = func(k2))]
  28. [WHERE predicate]
  29. [DELETE ON label=true]
  30. [ORDER BY source_sequence]
  31. 说明:
  32. file_path:
  33. 文件路径,可以指定到一个文件,也可以用 * 通配符指定某个目录下的所有文件。通配符必须匹配到文件,而不能是目录。
  34. PARTITION:
  35. 如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。
  36. 如果不指定,默认导入table的所有分区。
  37. NEGATIVE
  38. 如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。
  39. 该参数仅适用于存在 value 列,并且 value 列的聚合类型仅为 SUM 的情况。
  40. column_separator
  41. 用于指定导入文件中的列分隔符。默认为 \t
  42. 如果是不可见字符,则需要加\\x作为前缀,使用十六进制来表示分隔符。
  43. hive文件的分隔符\x01,指定为"\\x01"
  44. file_type
  45. 用于指定导入文件的类型,例如:parquetorccsv。默认值通过文件后缀名判断。
  46. column_list
  47. 用于指定导入文件中的列和 table 中的列的对应关系。
  48. 当需要跳过导入文件中的某一列时,将该列指定为 table 中不存在的列名即可。
  49. 语法:
  50. (col_name1, col_name2, ...)
  51. PRECEDING FILTER predicate:
  52. 用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。
  53. SET:
  54. 如果指定此参数,可以将源文件某一列按照函数进行转化,然后将转化后的结果导入到table中。语法为 `column_name` = expression。举几个例子帮助理解。
  55. 1: 表中有3个列“c1, c2, c3", 源文件中前两列依次对应(c1,c2),后两列之和对应c3;那么需要指定 columns (c1,c2,tmp_c3,tmp_c4) SET (c3=tmp_c3+tmp_c4);
  56. 例2: 表中有3个列“year, month, day"三个列,源文件中只有一个时间列,为”2018-06-01 01:02:03“格式。
  57. 那么可以指定 columns(tmp_time) set (year = year(tmp_time), month=month(tmp_time), day=day(tmp_time)) 完成导入。
  58. WHERE:
  59. 对做完 transform 的数据进行过滤,符合 where 条件的数据才能被导入。WHERE 语句中只可引用表中列名。
  60. merge_type:
  61. 数据的合并类型,一共支持三种类型APPENDDELETEMERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete on条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理,
  62. delete_on_predicates:
  63. 表示删除条件,仅在 merge type MERGE 时有意义,语法与where 相同
  64. ORDER BY:
  65. 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。
  66. 3. broker_name
  67. 所使用的 broker 名称,可以通过 show broker 命令查看。
  68. 4. broker_properties
  69. 用于提供通过 broker 访问数据源的信息。不同的 broker,以及不同的访问方式,需要提供的信息不同。
  70. 1. Baidu HDFS/AFS
  71. 访问百度内部的 hdfs/afs 目前仅支持简单认证,需提供:
  72. usernamehdfs 用户名
  73. passwordhdfs 密码
  74. 2. BOS
  75. 需提供:
  76. bos_endpointBOS endpoint
  77. bos_accesskey:公有云用户的 accesskey
  78. bos_secret_accesskey:公有云用户的 secret_accesskey
  79. 3. Apache HDFS
  80. 社区版本的 hdfs,支持简单认证、kerberos 认证。以及支持 HA 配置。
  81. 简单认证:
  82. hadoop.security.authentication = simple (默认)
  83. usernamehdfs 用户名
  84. passwordhdfs 密码
  85. kerberos 认证:
  86. hadoop.security.authentication = kerberos
  87. kerberos_principal:指定 kerberos principal
  88. kerberos_keytab:指定 kerberos keytab 文件路径。该文件必须为 broker 进程所在服务器上的文件。
  89. kerberos_keytab_content:指定 kerberos keytab 文件内容经过 base64 编码之后的内容。这个跟 kerberos_keytab 配置二选一就可以。
  90. namenode HA
  91. 通过配置 namenode HA,可以在 namenode 切换时,自动识别到新的 namenode
  92. dfs.nameservices: 指定 hdfs 服务的名字,自定义,如:"dfs.nameservices" = "my_ha"
  93. dfs.ha.namenodes.xxx:自定义 namenode 的名字,多个名字以逗号分隔。其中 xxx dfs.nameservices 中自定义的名字,如 "dfs.ha.namenodes.my_ha" = "my_nn"
  94. dfs.namenode.rpc-address.xxx.nn:指定 namenode rpc地址信息。其中 nn 表示 dfs.ha.namenodes.xxx 中配置的 namenode 的名字,如:"dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
  95. dfs.client.failover.proxy.provider:指定 client 连接 namenode provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
  96. 4. Amazon S3
  97. 需提供:
  98. fs.s3a.access.keyAmazonS3access key
  99. fs.s3a.secret.keyAmazonS3secret key
  100. fs.s3a.endpointAmazonS3endpoint
  101. 4. opt_properties
  102. 用于指定一些特殊参数。
  103. 语法:
  104. [PROPERTIES ("key"="value", ...)]
  105. 可以指定如下参数:
  106. timeout 指定导入操作的超时时间。默认超时为4小时。单位秒。
  107. max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。
  108. exec_mem_limit 导入内存限制。默认为 2GB。单位为字节。
  109. strict mode 是否对数据进行严格限制。默认为 false
  110. timezone: 指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 [时区] 文档。如果不指定,则使用 "Asia/Shanghai" 时区。
  111. 5. 导入数据格式样例
  112. 整型类(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
  113. 浮点类(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
  114. 日期类(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03
  115. (注:如果是其他日期格式,可以在导入命令中,使用 strftime 或者 time_format 函数进行转换)
  116. 字符串类(CHAR/VARCHAR):"I am a student", "a"
  117. NULL值:\N

example

  1. 1. HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker broker。简单认证。
  2. LOAD LABEL example_db.label1
  3. (
  4. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  5. INTO TABLE `my_table`
  6. )
  7. WITH BROKER my_hdfs_broker
  8. (
  9. "username" = "hdfs_user",
  10. "password" = "hdfs_passwd"
  11. )
  12. PROPERTIES
  13. (
  14. "timeout" = "3600",
  15. "max_filter_ratio" = "0.1"
  16. );
  17. 其中 hdfs_host namenode hosthdfs_port fs.defaultFS 端口(默认9000
  18. 2. AFS 一批数据,包含多个文件。导入不同的 table,指定分隔符,指定列对应关系。
  19. LOAD LABEL example_db.label2
  20. (
  21. DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file1")
  22. INTO TABLE `my_table_1`
  23. COLUMNS TERMINATED BY ","
  24. (k1, k3, k2, v1, v2),
  25. DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file2")
  26. INTO TABLE `my_table_2`
  27. COLUMNS TERMINATED BY "\t"
  28. (k1, k2, k3, v2, v1)
  29. )
  30. WITH BROKER my_afs_broker
  31. (
  32. "username" = "afs_user",
  33. "password" = "afs_passwd"
  34. )
  35. PROPERTIES
  36. (
  37. "timeout" = "3600",
  38. "max_filter_ratio" = "0.1"
  39. );
  40. 3. HDFS 导入一批数据,指定hive的默认分隔符\x01,并使用通配符*指定目录下的所有文件。
  41. 使用简单认证,同时配置 namenode HA
  42. LOAD LABEL example_db.label3
  43. (
  44. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
  45. INTO TABLE `my_table`
  46. COLUMNS TERMINATED BY "\\x01"
  47. )
  48. WITH BROKER my_hdfs_broker
  49. (
  50. "username" = "hdfs_user",
  51. "password" = "hdfs_passwd",
  52. "dfs.nameservices" = "my_ha",
  53. "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
  54. "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
  55. "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
  56. "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
  57. )
  58. 4. HDFS 导入一批“负”数据。同时使用 kerberos 认证方式。提供 keytab 文件路径。
  59. LOAD LABEL example_db.label4
  60. (
  61. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file)
  62. NEGATIVE
  63. INTO TABLE `my_table`
  64. COLUMNS TERMINATED BY "\t"
  65. )
  66. WITH BROKER my_hdfs_broker
  67. (
  68. "hadoop.security.authentication" = "kerberos",
  69. "kerberos_principal"="doris@YOUR.COM",
  70. "kerberos_keytab"="/home/palo/palo.keytab"
  71. )
  72. 5. 从 HDFS 导入一批数据,指定分区。同时使用 kerberos 认证方式。提供 base64 编码后的 keytab 文件内容。
  73. LOAD LABEL example_db.label5
  74. (
  75. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  76. INTO TABLE `my_table`
  77. PARTITION (p1, p2)
  78. COLUMNS TERMINATED BY ","
  79. (k1, k3, k2, v1, v2)
  80. )
  81. WITH BROKER my_hdfs_broker
  82. (
  83. "hadoop.security.authentication"="kerberos",
  84. "kerberos_principal"="doris@YOUR.COM",
  85. "kerberos_keytab_content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
  86. )
  87. 6. BOS 导入一批数据,指定分区, 并对导入文件的列做一些转化,如下:
  88. 表结构为:
  89. k1 varchar(20)
  90. k2 int
  91. 假设数据文件只有一行数据:
  92. Adele,1,1
  93. 数据文件中各列,对应导入语句中指定的各列:
  94. k1,tmp_k2,tmp_k3
  95. 转换如下:
  96. 1) k1: 不变换
  97. 2) k2:是 tmp_k2 tmp_k3 数据之和
  98. LOAD LABEL example_db.label6
  99. (
  100. DATA INFILE("bos://my_bucket/input/file")
  101. INTO TABLE `my_table`
  102. PARTITION (p1, p2)
  103. COLUMNS TERMINATED BY ","
  104. (k1, tmp_k2, tmp_k3)
  105. SET (
  106. k2 = tmp_k2 + tmp_k3
  107. )
  108. )
  109. WITH BROKER my_bos_broker
  110. (
  111. "bos_endpoint" = "http://bj.bcebos.com",
  112. "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
  113. "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
  114. )
  115. 7. 导入数据到含有HLL列的表,可以是表中的列或者数据里面的列
  116. 如果表中有4列分别是(id,v1,v2,v3)。其中v1v2列是hll列。导入的源文件有3列, 其中表中的第一列 = 源文件中的第一列,而表中的第二,三列为源文件中的第二,三列变换得到,表中的第四列在源文件中并不存在。
  117. 则(column_list)中声明第一列为id,第二三列为一个临时命名的k1,k2
  118. SET中必须给表中的hll列特殊声明 hll_hash。表中的v1列等于原始数据中的hll_hash(k1)列, 表中的v3列在原始数据中并没有对应的值,使用empty_hll补充默认值。
  119. LOAD LABEL example_db.label7
  120. (
  121. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  122. INTO TABLE `my_table`
  123. PARTITION (p1, p2)
  124. COLUMNS TERMINATED BY ","
  125. (id, k1, k2)
  126. SET (
  127. v1 = hll_hash(k1),
  128. v2 = hll_hash(k2),
  129. v3 = empty_hll()
  130. )
  131. )
  132. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  133. LOAD LABEL example_db.label8
  134. (
  135. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  136. INTO TABLE `my_table`
  137. PARTITION (p1, p2)
  138. COLUMNS TERMINATED BY ","
  139. (k1, k2, tmp_k3, tmp_k4, v1, v2)
  140. SET (
  141. v1 = hll_hash(tmp_k3),
  142. v2 = hll_hash(tmp_k4)
  143. )
  144. )
  145. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  146. 8. 导入Parquet文件中数据 指定FORMAT parquet 默认是通过文件后缀判断
  147. LOAD LABEL example_db.label9
  148. (
  149. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  150. INTO TABLE `my_table`
  151. FORMAT AS "parquet"
  152. (k1, k2, k3)
  153. )
  154. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  155. 9. 提取文件路径中的分区字段
  156. 如果需要,则会根据表中定义的字段类型解析文件路径中的分区字段(partitioned fields),类似SparkPartition Discovery的功能
  157. LOAD LABEL example_db.label10
  158. (
  159. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*")
  160. INTO TABLE `my_table`
  161. FORMAT AS "csv"
  162. (k1, k2, k3)
  163. COLUMNS FROM PATH AS (city, utc_date)
  164. SET (uniq_id = md5sum(k1, city))
  165. )
  166. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  167. hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing目录下包括如下文件:
  168. [hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]
  169. 则提取文件路径的中的cityutc_date字段
  170. 10. 对待导入数据进行过滤,k1 值大于 k2 值的列才能被导入
  171. LOAD LABEL example_db.label10
  172. (
  173. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  174. INTO TABLE `my_table`
  175. where k1 > k2
  176. )
  177. 11. AmazonS3 导入Parquet文件中数据,指定 FORMAT parquet,默认是通过文件后缀判断:
  178. LOAD LABEL example_db.label11
  179. (
  180. DATA INFILE("s3a://my_bucket/input/file")
  181. INTO TABLE `my_table`
  182. FORMAT AS "parquet"
  183. (k1, k2, k3)
  184. )
  185. WITH BROKER my_s3a_broker
  186. (
  187. "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
  188. "fs.s3a.secret.key" = "yyyyyyyyyyyyyyyyyyyy",
  189. "fs.s3a.endpoint" = "s3.amazonaws.com"
  190. )
  191. 12. 提取文件路径中的时间分区字段,并且时间包含 %3A (在 hdfs 路径中,不允许有 ':',所有 ':' 会由 %3A 替换)
  192. 假设有如下文件:
  193. /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
  194. /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt
  195. 表结构为:
  196. data_time DATETIME,
  197. k2 INT,
  198. k3 INT
  199. LOAD LABEL example_db.label12
  200. (
  201. DATA INFILE("hdfs://host:port/user/data/*/test.txt")
  202. INTO TABLE `tbl12`
  203. COLUMNS TERMINATED BY ","
  204. (k2,k3)
  205. COLUMNS FROM PATH AS (data_time)
  206. SET (data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s'))
  207. )
  208. WITH BROKER "hdfs" ("username"="user", "password"="pass");
  209. 13. HDFS 导入一批数据,指定超时时间和过滤比例。使用明文 my_hdfs_broker broker。简单认证。并且将原有数据中与 导入数据中v2 大于100 的列相匹配的列删除,其他列正常导入
  210. LOAD LABEL example_db.label1
  211. (
  212. MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  213. INTO TABLE `my_table`
  214. COLUMNS TERMINATED BY "\t"
  215. (k1, k2, k3, v2, v1)
  216. )
  217. DELETE ON v2 >100
  218. WITH BROKER my_hdfs_broker
  219. (
  220. "username" = "hdfs_user",
  221. "password" = "hdfs_passwd"
  222. )
  223. PROPERTIES
  224. (
  225. "timeout" = "3600",
  226. "max_filter_ratio" = "0.1"
  227. );
  228. 14. 导入时指定source_sequence列,保证UNIQUE_KEYS表中的替换顺序:
  229. LOAD LABEL example_db.label_sequence
  230. (
  231. DATA INFILE("hdfs://host:port/user/data/*/test.txt")
  232. INTO TABLE `tbl1`
  233. COLUMNS TERMINATED BY ","
  234. (k1,k2,source_sequence,v1,v2)
  235. ORDER BY source_sequence
  236. )
  237. with BROKER "hdfs" ("username"="user", "password"="pass");
  238. 14. 先过滤原始数据,在进行列的映射、转换和过滤操作
  239. LOAD LABEL example_db.label_filter
  240. (
  241. DATA INFILE("hdfs://host:port/user/data/*/test.txt")
  242. INTO TABLE `tbl1`
  243. COLUMNS TERMINATED BY ","
  244. (k1,k2,v1,v2)
  245. PRECEDING FILTER k1 > 2
  246. SET (k1 = k1 +1)
  247. WHERE k1 > 3
  248. )
  249. with BROKER "hdfs" ("username"="user", "password"="pass");

keyword

  1. BROKER,LOAD