Spark Doris Connector

Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过 Spark 写入数据到 Doris。

代码库地址:https://github.com/apache/doris-spark-connector

  • 支持从Doris中读取数据
  • 支持Spark DataFrame批量/流式 写入Doris
  • 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame
  • 支持在Doris端完成数据过滤,减少数据传输量。

版本兼容

ConnectorSparkDorisJavaScala
1.3.13.4 ~ 3.1, 2.4, 2.31.0 +82.12, 2.11
1.2.03.2, 3.1, 2.31.0 +82.12, 2.11
1.1.03.2, 3.1, 2.31.0 +82.12, 2.11
1.0.13.1, 2.30.12 - 0.1582.12, 2.11

编译与安装

准备工作

  1. 修改custom_env.sh.tpl文件,重命名为custom_env.sh

  2. 在源码目录下执行: sh build.sh 根据提示输入你需要的 Scala 与 Spark 版本进行编译。

编译成功后,会在 dist 目录生成目标 jar 包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 SparkClassPath 中即可使用 Spark-Doris-Connector

例如,Local 模式运行的 Spark,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Spark,则将此文件放入预部署包中。

例如将 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 上传到 hdfs 并在 spark.yarn.jars 参数上添加 hdfs 上的 Jar 包路径

  1. 上传 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 到 hdfs。
  1. hdfs dfs -mkdir /spark-jars/
  2. hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/
  1. 在集群中添加 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 依赖。
  1. spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar

使用 Maven 管理

  1. <dependency>
  2. <groupId>org.apache.doris</groupId>
  3. <artifactId>spark-doris-connector-3.4_2.12</artifactId>
  4. <version>1.3.0</version>
  5. </dependency>

注意

请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。

使用示例

读取

SQL

  1. CREATE
  2. TEMPORARY VIEW spark_doris
  3. USING doris
  4. OPTIONS(
  5. "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  6. "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  7. "user"="$YOUR_DORIS_USERNAME",
  8. "password"="$YOUR_DORIS_PASSWORD"
  9. );
  10. SELECT *
  11. FROM spark_doris;

DataFrame

  1. val dorisSparkDF = spark.read.format("doris")
  2. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  3. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  4. .option("user", "$YOUR_DORIS_USERNAME")
  5. .option("password", "$YOUR_DORIS_PASSWORD")
  6. .load()
  7. dorisSparkDF.show(5)

RDD

  1. import org.apache.doris.spark._
  2. val dorisSparkRDD = sc.dorisRDD(
  3. tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  4. cfg = Some(Map(
  5. "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  6. "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
  7. "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
  8. ))
  9. )
  10. dorisSparkRDD.collect()

pySpark

  1. dorisSparkDF = spark.read.format("doris")
  2. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  3. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  4. .option("user", "$YOUR_DORIS_USERNAME")
  5. .option("password", "$YOUR_DORIS_PASSWORD")
  6. .load()
  7. // show 5 lines data
  8. dorisSparkDF.show(5)

写入

SQL

  1. CREATE
  2. TEMPORARY VIEW spark_doris
  3. USING doris
  4. OPTIONS(
  5. "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  6. "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  7. "user"="$YOUR_DORIS_USERNAME",
  8. "password"="$YOUR_DORIS_PASSWORD"
  9. );
  10. INSERT INTO spark_doris
  11. VALUES ("VALUE1", "VALUE2", ...);
  12. # or
  13. INSERT INTO spark_doris
  14. SELECT *
  15. FROM YOUR_TABLE
  16. # or
  17. INSERT OVERWRITE
  18. SELECT *
  19. FROM YOUR_TABLE

DataFrame(batch/stream)

  1. ## batch sink
  2. val mockDataDF = List(
  3. (3, "440403001005", "21.cn"),
  4. (1, "4404030013005", "22.cn"),
  5. (33, null, "23.cn")
  6. ).toDF("id", "mi_code", "mi_name")
  7. mockDataDF.show(5)
  8. mockDataDF.write.format("doris")
  9. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  10. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  11. .option("user", "$YOUR_DORIS_USERNAME")
  12. .option("password", "$YOUR_DORIS_PASSWORD")
  13. //其它选项
  14. //指定你要写入的字段
  15. .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
  16. // 支持设置 Overwrite 模式来覆盖数据
  17. // .option("save_mode", SaveMode.Overwrite)
  18. .save()
  19. ## stream sink(StructuredStreaming)
  20. ### 结果 DataFrame 和 doris 表相同的结构化数据, 配置方式和批量模式一致。
  21. val sourceDf = spark.readStream.
  22. .format("your_own_stream_source")
  23. .load()
  24. val resultDf = sourceDf.<transformations>
  25. resultDf.writeStream
  26. .format("doris")
  27. .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  28. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  29. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  30. .option("user", "$YOUR_DORIS_USERNAME")
  31. .option("password", "$YOUR_DORIS_PASSWORD")
  32. .start()
  33. .awaitTermination()
  34. ### 结果 DataFrame 中存在某一列的数据可以直接写入的,比如符合导入规范的 Kafka 消息中的 value 值
  35. val kafkaSource = spark.readStream
  36. .format("kafka")
  37. .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  38. .option("startingOffsets", "latest")
  39. .option("subscribe", "$YOUR_KAFKA_TOPICS")
  40. .load()
  41. kafkaSource.selectExpr("CAST(value as STRING)")
  42. .writeStream
  43. .format("doris")
  44. .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  45. .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  46. .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  47. .option("user", "$YOUR_DORIS_USERNAME")
  48. .option("password", "$YOUR_DORIS_PASSWORD")
  49. // 设置该选项可以将 Kafka 消息中的 value 列不经过处理直接写入
  50. .option("doris.sink.streaming.passthrough", "true")
  51. .option("doris.sink.properties.format", "json")
  52. // 其他选项
  53. .start()
  54. .awaitTermination()

Java 示例

samples/doris-demo/spark-demo/ 下提供了 Java 版本的示例,可供参考,这里

配置

通用配置项

KeyDefault ValueComment
doris.fenodesDoris FE http 地址,支持多个地址,使用逗号分隔
doris.table.identifierDoris 表名,如:db1.tbl1
doris.request.retries3向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms30000向 Doris 发送请求的连接超时时间
doris.request.read.timeout.ms30000向 Doris 发送请求的读取超时时间
doris.request.query.timeout.s3600查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制
doris.request.tablet.sizeInteger.MAX_VALUE一个 RDD Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。
doris.read.field读取 Doris 表的列名列表,多列之间使用逗号分隔
doris.batch.size1024一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncfalse是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch
doris.deserialize.queue.size64异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效
doris.write.fields指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。
doris.sink.batch.size100000单次写 BE 的最大行数
doris.sink.max-retries0写 BE 失败之后的重试次数
doris.sink.properties.formatcsvStream Load 的数据格式。
共支持 3 种格式:csv,json,arrow(1.4.0 版本开始支持)
更多参数详情
doris.sink.properties.*Stream Load 的导入参数。
例如:
指定列分隔符:‘doris.sink.properties.column_separator’ = ‘,’
更多参数详情
doris.sink.task.partition.sizeDoris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。
doris.sink.task.use.repartitionfalse是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。
doris.sink.batch.interval.ms50每个批次 sink 的间隔时间,单位 ms。
doris.sink.enable-2pcfalse是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。
doris.sink.auto-redirecttrue是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。

SQL 和 Dataframe 专有配置

KeyDefault ValueComment
user访问 Doris 的用户名
password访问 Doris 的密码
doris.filter.query.in.max.count100谓词下推中,in 表达式 value 列表元素最大数量。超过此数量,则 in 表达式条件过滤在 Spark 侧处理。
doris.ignore-type指在定临时视图中,读取 schema 时要忽略的字段类型。
例如,’doris.ignore-type’=’bitmap,hll’

Structured Streaming 专有配置

KeyDefault ValueComment
doris.sink.streaming.passthroughfalse将第一列的值不经过处理直接写入。

RDD 专有配置

KeyDefault ValueComment
doris.request.auth.user访问 Doris 的用户名
doris.request.auth.password访问 Doris 的密码
doris.filter.query过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。

Spark Doris Connector - 图1提示

  1. 在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 BITMAPHLL 类型的数据时,需要设置参数 doris.ignore-type 为对应类型,并通过 doris.write.fields 对列进行映射转换,使用方式如下:

    BITMAP

    1. CREATE TEMPORARY VIEW spark_doris
    2. USING doris
    3. OPTIONS(
    4. "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
    5. "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    6. "user"="$YOUR_DORIS_USERNAME",
    7. "password"="$YOUR_DORIS_PASSWORD"
    8. "doris.ignore-type"="bitmap",
    9. "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
    10. );

    HLL

    1. CREATE TEMPORARY VIEW spark_doris
    2. USING doris
    3. OPTIONS(
    4. "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
    5. "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    6. "user"="$YOUR_DORIS_USERNAME",
    7. "password"="$YOUR_DORIS_PASSWORD"
    8. "doris.ignore-type"="hll",
    9. "doris.write.fields"="col1,hll_col1=hll_hash(col1)"
    10. );
  2. 从 1.3.0 版本开始, doris.sink.max-retries 配置项的默认值为 0,即默认不进行重试。 当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 doris.sink.batch.size 所配置大小的数据,可能需要适当增大内存分配。

  3. 从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下

    DataFrame

    1. resultDf.format("doris")
    2. .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
    3. // your own options
    4. .option("save_mode", SaveMode.Overwrite)
    5. .save()

    SQL

    1. INSERT OVERWRITE your_target_table
    2. SELECT * FROM your_source_table

Doris 和 Spark 列类型映射关系

Doris TypeSpark Type
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.DateType
DATETIMEDataTypes.StringType1
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDecimalType
VARCHARDataTypes.StringType
TIMEDataTypes.DoubleType
HLLUnsupported datatype
BitmapUnsupported datatype
  • 注:Connector 中,将DATETIME映射为String。由于Doris底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 String 类型直接返回对应的时间可读文本。