Spark Doris Connector
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
代码库地址:https://github.com/apache/incubator-doris-spark-connector
- 支持从
Doris
中读取数据 - 支持
Spark DataFrame
批量/流式 写入Doris
- 可以将
Doris
表映射为DataFrame
或者RDD
,推荐使用DataFrame
。 - 支持在
Doris
端完成数据过滤,减少数据传输量。
版本兼容
Connector | Spark | Doris | Java | Scala |
---|---|---|---|---|
2.3.4-2.11.xx | 2.x | 0.12+ | 8 | 2.11 |
3.1.2-2.12.xx | 3.x | 0.12.+ | 8 | 2.12 |
3.2.0-2.12.xx | 3.2.x | 0.12.+ | 8 | 2.12 |
编译与安装
准备工作
1.修改custom_env.sh.tpl
文件,重命名为custom_env.sh
2.指定thrift安装目录
##源文件内容
#export THRIFT_BIN=
#export MVN_BIN=
#export JAVA_HOME=
##修改如下,MacOS为例
export THRIFT_BIN=/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift
#export MVN_BIN=
#export JAVA_HOME=
安装 `thrift` 0.13.0 版本(注意:`Doris` 0.15 和最新的版本基于 `thrift` 0.13.0 构建, 之前的版本依然使用`thrift` 0.9.3 构建)
Windows:
1.下载:`http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe`(下载目录自己指定)
2.修改thrift-0.13.0.exe 为 thrift
MacOS:
1. 下载:`brew install thrift@0.13.0`
2. 默认下载地址:/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift
注:MacOS执行 `brew install thrift@0.13.0` 可能会报找不到版本的错误,解决方法如下,在终端执行:
1. `brew tap-new $USER/local-tap`
2. `brew extract --version='0.13.0' thrift $USER/local-tap`
3. `brew install thrift@0.13.0`
参考链接: `https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c`
Linux:
1.下载源码包:`wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz`
2.安装依赖:`yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++`
3.`tar zxvf thrift-0.13.0.tar.gz`
4.`cd thrift-0.13.0`
5.`./configure --without-tests`
6.`make`
7.`make install`
安装完成后查看版本:thrift --version
注:如果编译过Doris,则不需要安装thrift,可以直接使用 $DORIS_HOME/thirdparty/installed/bin/thrift
在源码目录下执行:
sh build.sh --spark 2.3.4 --scala 2.11 ## spark 2.3.4, scala 2.11
sh build.sh --spark 3.1.2 --scala 2.12 ## spark 3.1.2, scala 2.12
sh build.sh --spark 3.2.0 --scala 2.12 \
--mvn-args "-Dnetty.version=4.1.68.Final -Dfasterxml.jackson.version=2.12.3" ## spark 3.2.0, scala 2.12
注:如果你是从 tag 检出的源码,则可以直接执行
sh build.sh --tag
,而无需指定 spark 和 scala 的版本。因为 tag 源码中的版本是固定的。
编译成功后,会在 output/
目录下生成文件 doris-spark-2.3.4-2.11-1.0.0-SNAPSHOT.jar
。将此文件复制到 Spark
的 ClassPath
中即可使用 Spark-Doris-Connector
。例如,Local
模式运行的 Spark
,将此文件放入 jars/
文件夹下。Yarn
集群模式运行的Spark
,则将此文件放入预部署包中。
例如将 doris-spark-2.3.4-2.11-1.0.0-SNAPSHOT.jar
上传到 hdfs并在spark.yarn.jars参数上添加 hdfs上的Jar包路径
- 上传doris-spark-connector-3.1.2-2.12-1.0.0.jar 到hdfs。
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/doris-spark-connector-3.1.2-2.12-1.0.0.jar /spark-jars/
- 在集群中添加doris-spark-connector-3.1.2-2.12-1.0.0.jar 依赖。
spark.yarn.jars=hdfs:///spark-jars/doris-spark-connector-3.1.2-2.12-1.0.0.jar
使用Maven管理
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.1_2.12</artifactId>
<!--artifactId>spark-doris-connector-2.3_2.11</artifactId-->
<version>1.0.1</version>
</dependency>
注意
请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。
使用示例
读取
SQL
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);
SELECT * FROM spark_doris;
DataFrame
val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
dorisSparkDF.show(5)
RDD
import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)
dorisSparkRDD.collect()
pySpark
dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
# show 5 lines data
dorisSparkDF.show(5)
写入
SQL
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);
INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
# or
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE
DataFrame(batch/stream)
## batch sink
val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)
mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.save()
## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.format("kafka")
.load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()
java示例
samples/doris-demo/spark-demo/
下提供了 Java 版本的示例,可供参考,这里
配置
通用配置项
Key | Default Value | Comment |
---|---|---|
doris.fenodes | — | Doris FE http 地址,支持多个地址,使用逗号分隔 |
doris.table.identifier | — | Doris 表名,如:db1.tbl1 |
doris.request.retries | 3 | 向Doris发送请求的重试次数 |
doris.request.connect.timeout.ms | 30000 | 向Doris发送请求的连接超时时间 |
doris.request.read.timeout.ms | 30000 | 向Doris发送请求的读取超时时间 |
doris.request.query.timeout.s | 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 |
doris.request.tablet.size | Integer.MAX_VALUE | 一个RDD Partition对应的Doris Tablet个数。 此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 |
doris.batch.size | 1024 | 一次从BE读取数据的最大行数。增大此数值可减少Spark与Doris之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
doris.deserialize.arrow.async | false | 是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch |
doris.deserialize.queue.size | 64 | 异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效 |
doris.write.fields | — | 指定写入Doris表的字段或者字段顺序,多列之间使用逗号分隔。 默认写入时要按照Doris表字段顺序写入全部字段。 |
sink.batch.size | 10000 | 单次写BE的最大行数 |
sink.max-retries | 1 | 写BE失败之后的重试次数 |
sink.properties.* | — | Stream Load 的导入参数。 例如: ‘sink.properties.column_separator’ = ‘, ‘ |
doris.sink.task.partition.size | — | Doris写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。 此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 |
doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris写入 Partition数。默认值为 false,采用 coalesce 方式控制(注意: 如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。 如果设置为 true,则采用 repartition 方式(注意: 可设置最后 Partition 数,但会额外增加 shuffle 开销)。 |
doris.sink.batch.interval.ms | 50 | 每个批次sink的间隔时间,单位 ms。 |
SQL 和 Dataframe 专有配置
Key | Default Value | Comment |
---|---|---|
user | — | 访问Doris的用户名 |
password | — | 访问Doris的密码 |
doris.filter.query.in.max.count | 100 | 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。 |
RDD 专有配置
Key | Default Value | Comment |
---|---|---|
doris.request.auth.user | — | 访问Doris的用户名 |
doris.request.auth.password | — | 访问Doris的密码 |
doris.read.field | — | 读取Doris表的列名列表,多列之间使用逗号分隔 |
doris.filter.query | — | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
Doris 和 Spark 列类型映射关系
Doris Type | Spark Type |
---|---|
NULL_TYPE | DataTypes.NullType |
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DATE | DataTypes.StringType1 |
DATETIME | DataTypes.StringType1 |
BINARY | DataTypes.BinaryType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
LARGEINT | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
DECIMALV2 | DecimalType |
TIME | DataTypes.DoubleType |
HLL | Unsupported datatype |
- 注:Connector中,将
DATE
和DATETIME
映射为String
。由于Doris
底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用String
类型直接返回对应的时间可读文本。