SeaTunnel

最新版本的 Apache SeaTunnel (原 waterdrop )Seatunnel Connector Spark Doris - 图1 (opens new window) 已经支持 Doris 的连接器, SeaTunnel 可以用过 Spark 引擎和 Flink 引擎同步数据至 Doris 中.

事实上, SeaTunnel 通过 Stream Load 方式同步数据,性能强劲,欢迎大家使用

安装 SeaTunnel SeaTunnel 安装链接Seatunnel Connector Spark Doris - 图2 (opens new window)

Spark Sink Doris

插件代码

Spark Sink Doris 的插件代码在这里Seatunnel Connector Spark Doris - 图3 (opens new window)

参数列表

参数名参数类型是否必要默认值引擎类型
fenodesstringyes-Spark
databasestringyes-Spark
tablestringyes-Spark
userstringyes-Spark
passwordstringyes-Spark
batch_sizeintyes100Spark
doris.*stringno-Spark

fenodes [string]

Doris Fe节点地址:8030

database [string]

写入 Doris 的库名

table [string]

写入 Doris 的表名

user [string]

Doris 访问用户

password [string]

Doris 访问用户密码

batch_size [string]

Spark 通过 Stream Load 方式写入,每个批次提交条数

doris. [string]

Stream Load 方式写入的 Http 参数优化,在官网参数前加上’Doris.’前缀

更多 Stream Load 参数配置Seatunnel Connector Spark Doris - 图4 (opens new window)

Examples

Hive 迁移数据至 Doris

  1. env{
  2. spark.app.name = "hive2doris-template"
  3. }
  4. spark {
  5. spark.sql.catalogImplementation = "hive"
  6. }
  7. source {
  8. hive {
  9. preSql = "select * from tmp.test"
  10. result_table_name = "test"
  11. }
  12. }
  13. transform {
  14. }
  15. sink {
  16. Console {
  17. }
  18. Doris {
  19. fenodes="xxxx:8030"
  20. database="tmp"
  21. table="test"
  22. user="root"
  23. password="root"
  24. batch_size=1000
  25. doris.column_separator="\t"
  26. doris.columns="date_key,date_value,day_in_year,day_in_month"
  27. }
  28. }

启动命令

  1. sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf