About SeaTunnel

SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports real-time synchronization of massive data. It can synchronize tens of billions of records per day, stably and efficiently.

Connector-V2

The SeaTunnel Connector-V2 has supported the Doris sink since version 2.3.1. It supports exactly-once writes and CDC data synchronization.

Plugin Code

SeaTunnel Doris Sink Plugin Code

Options

| name              | type   | required | default value |
|-------------------|--------|----------|---------------|
| fenodes           | string | yes      | -             |
| username          | string | yes      | -             |
| password          | string | yes      | -             |
| table.identifier  | string | yes      | -             |
| sink.label-prefix | string | yes      | -             |
| sink.enable-2pc   | bool   | no       | true          |
| sink.enable-delete| bool   | no       | false         |
| doris.config      | map    | yes      | -             |

fenodes [string]

The Doris cluster FE node addresses, in the format "fe_ip:fe_http_port,..."
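For a cluster with several FE nodes, the addresses can be listed comma-separated (hostnames here are illustrative):

```
fenodes = "fe1:8030,fe2:8030,fe3:8030"
```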

username [string]

Doris username

password [string]

Doris user password

table.identifier [string]

The name of the Doris table, in the format DBName.TableName

sink.label-prefix [string]

The label prefix used by Stream Load imports. In the 2pc scenario, it must be globally unique to guarantee SeaTunnel's exactly-once (EOS) semantics.
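For example, when 2pc is enabled a job-specific prefix keeps load labels from colliding across concurrently running jobs (the prefix value below is illustrative):

```
sink.enable-2pc = "true"
sink.label-prefix = "seatunnel-orders-sync"   # must be unique per running job
```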

sink.enable-2pc [bool]

Whether to enable two-phase commit (2pc); the default is true, which ensures exactly-once semantics. See the Doris Stream Load two-phase-commit documentation for details.

sink.enable-delete [bool]

Whether to enable deletion. This option requires the Doris table to have the batch delete feature enabled (enabled by default since version 0.15) and only supports the Unique model. See the Doris batch delete documentation for more detail.
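A minimal sketch of a CDC-oriented sink, assuming the target is a Unique-model table with batch delete enabled (table name is illustrative):

```
sink {
  Doris {
    fenodes = "doris_fe:8030"
    username = root
    password = ""
    table.identifier = "test.cdc_sink"   # must be a Unique-model table
    sink.enable-delete = "true"          # propagate CDC delete events
    sink.label-prefix = "test_cdc"
  }
}
```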

doris.config [map]

The data_desc parameters of Stream Load; you can get more detail at this link:

More Stream Load parameters
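Conceptually, each entry in doris.config is passed through as a Stream Load parameter (Stream Load parameters are HTTP request headers). The Python sketch below illustrates that mapping; the function name and label scheme are illustrative, not the connector's actual internals:

```python
def stream_load_headers(label_prefix, checkpoint_id, doris_config):
    """Build the header map for one hypothetical Stream Load request."""
    headers = {
        # Each load needs a unique label; a sink can derive it from
        # sink.label-prefix plus a per-checkpoint suffix.
        "label": f"{label_prefix}-{checkpoint_id}",
        "Expect": "100-continue",
    }
    # Every key in doris.config becomes a Stream Load parameter header.
    headers.update(doris_config)
    return headers

print(stream_load_headers(
    "test_json", 42,
    {"format": "json", "read_json_by_line": "true"},
))
```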

Example

Use JSON format to import data

```
sink {
  Doris {
    fenodes = "doris_fe:8030"
    username = root
    password = ""
    table.identifier = "test.table_sink"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```

Use CSV format to import data

```
sink {
  Doris {
    fenodes = "doris_fe:8030"
    username = root
    password = ""
    table.identifier = "test.table_sink"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_csv"
    doris.config = {
      format = "csv"
      column_separator = ","
    }
  }
}
```

Connector-V1

Plugin Code

SeaTunnel Flink Sink Doris plugin code

Options

| name        | type   | required | default value | engine |
|-------------|--------|----------|---------------|--------|
| fenodes     | string | yes      | -             | Flink  |
| database    | string | yes      | -             | Flink  |
| table       | string | yes      | -             | Flink  |
| user        | string | yes      | -             | Flink  |
| password    | string | yes      | -             | Flink  |
| batch_size  | int    | no       | 100           | Flink  |
| interval    | int    | no       | 1000          | Flink  |
| max_retries | int    | no       | 1             | Flink  |
| doris.*     | -      | no       | -             | Flink  |

fenodes [string]

Doris FE HTTP address, e.g. 127.0.0.1:8030

database [string]

Doris database

table [string]

Doris table

user [string]

Doris user

password [string]

Doris password

batch_size [int]

The maximum number of rows written to Doris in one batch; the default value is 100.

interval [int]

The flush interval (in milliseconds), after which the asynchronous thread writes the data in the cache to Doris. Set to 0 to turn off periodic writes.
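For example, a batch job that should flush purely by row count could disable the timed flush (values below are an assumed tuning sketch, not recommended defaults):

```
DorisSink {
  fenodes = "127.0.0.1:8030"
  user = root
  password = ""
  database = test
  table = test_tbl
  batch_size = 1000   # flush every 1000 rows
  interval = 0        # disable the periodic asynchronous flush
}
```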

max_retries [int]

The number of retries after a write to Doris fails.

doris.* [string]

Import parameters for Stream Load. For example: `doris.column_separator = ","`, etc.

More Stream Load parameter configuration

Examples

Socket To Doris

```
env {
  execution.parallelism = 1
}
source {
  SocketStream {
    host = 127.0.0.1
    port = 9999
    result_table_name = "socket"
    field_name = "info"
  }
}
transform {
}
sink {
  DorisSink {
    fenodes = "127.0.0.1:8030"
    user = root
    password = 123456
    database = test
    table = test_tbl
    batch_size = 5
    max_retries = 1
    interval = 5000
  }
}
```

Start command

```
sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf
```

Spark Sink Doris

Plugin Code

SeaTunnel Spark Sink Doris plugin code

Options

| name       | type   | required | default value | engine |
|------------|--------|----------|---------------|--------|
| fenodes    | string | yes      | -             | Spark  |
| database   | string | yes      | -             | Spark  |
| table      | string | yes      | -             | Spark  |
| user       | string | yes      | -             | Spark  |
| password   | string | yes      | -             | Spark  |
| batch_size | int    | yes      | 100           | Spark  |
| doris.*    | string | no       | -             | Spark  |

fenodes [string]

Doris FE address, in the format fe_host:8030

database [string]

Doris target database name

table [string]

Doris target table name

user [string]

Doris user name

password [string]

Doris user’s password

batch_size [int]

The number of rows submitted to Doris per batch

doris.* [string]

Doris stream_load properties; you can use the `doris.` prefix plus a stream_load property name

More Doris stream_load Configurations

Examples

Hive to Doris

Config properties

```
env {
  spark.app.name = "hive2doris-template"
}
spark {
  spark.sql.catalogImplementation = "hive"
}
source {
  hive {
    preSql = "select * from tmp.test"
    result_table_name = "test"
  }
}
transform {
}
sink {
  Console {
  }
  Doris {
    fenodes = "xxxx:8030"
    database = "gl_mint_dim"
    table = "dim_date"
    user = "root"
    password = "root"
    batch_size = 1000
    doris.column_separator = "\t"
    doris.columns = "date_key,date_value,day_in_year,day_in_month"
  }
}
```

Start command

```
sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf
```