Flink Doris Connector

  • The Flink Doris Connector lets Flink read, insert, modify, and delete data stored in Doris.

GitHub: https://github.com/apache/incubator-doris-flink-connector

  • A Doris table can be mapped to a DataStream or a Table.

Note:

  1. Modification and deletion are supported only on the Unique Key model.
  2. Deletion currently works automatically only for data ingested through Flink CDC. For other ingestion methods, you need to implement deletion yourself. For Flink CDC deletion usage, see the last section of this document.

Version Compatibility

Connector      | Flink  | Doris | Java | Scala
---------------|--------|-------|------|------
1.11.6-2.12-xx | 1.11.x | 0.13+ | 8    | 2.12
1.12.7-2.12-xx | 1.12.x | 0.13+ | 8    | 2.12
1.13.5-2.12-xx | 1.13.x | 0.13+ | 8    | 2.12

Build and Install

Execute the following command in the source directory:

    sh build.sh --flink 1.11.6 --scala 2.12 # flink 1.11.6 scala 2.12

Note: If you check out the source code from a tag, you can run sh build.sh --tag without specifying the Flink and Scala versions, because the versions are fixed in the tag's source code. For example, the tag 1.13.5-2.12-1.0.1 means Flink version 1.13.5, Scala version 2.12, and connector version 1.0.1.
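As a rough illustration of the tag naming scheme, the following sketch splits a tag into its three parts. The class ConnectorTag and its field names are hypothetical; they are not part of the connector or its build script.

```java
// Hypothetical helper: illustrates the <flink>-<scala>-<connector> tag layout only.
class ConnectorTag {
    final String flinkVersion;
    final String scalaVersion;
    final String connectorVersion;

    ConnectorTag(String flinkVersion, String scalaVersion, String connectorVersion) {
        this.flinkVersion = flinkVersion;
        this.scalaVersion = scalaVersion;
        this.connectorVersion = connectorVersion;
    }

    /** Splits a tag such as 1.13.5-2.12-1.0.1 at the first two hyphens. */
    static ConnectorTag parse(String tag) {
        String[] parts = tag.split("-", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected <flink>-<scala>-<connector>, got: " + tag);
        }
        return new ConnectorTag(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        ConnectorTag tag = parse("1.13.5-2.12-1.0.1");
        System.out.println("flink=" + tag.flinkVersion
                + ", scala=" + tag.scalaVersion
                + ", connector=" + tag.connectorVersion);
    }
}
```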

After successful compilation, the file doris-flink-1.13.5-2.12-1.0.1-SNAPSHOT.jar is generated in the output/ directory. Copy this file to the Flink classpath to use the Flink Doris Connector. For example, if Flink runs in local mode, put the file in the jars/ folder; if Flink runs in YARN cluster mode, put the file in the pre-deployment package.

Remarks:

  1. Doris FE must have HTTP v2 enabled in its configuration (see conf/fe.conf below).
  2. Only Scala 2.12.x is currently supported.

conf/fe.conf

    enable_http_server_v2 = true

Using Maven

Add Dependency

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>doris-flink-connector</artifactId>
        <version>1.11.6-2.12-SNAPSHOT</version>
    </dependency>

Remarks

Replace 1.11.6 with 1.12.7 or 1.13.5, depending on the Flink version you are using.

How to use

There are three ways to use the Flink Doris Connector:

  • SQL
  • DataStream
  • DataSet

Parameters Configuration

The Flink Doris Connector sink writes data to Doris through Stream Load and accepts Stream Load configuration parameters:

  • SQL: configured with the sink.properties. prefix in the WITH clause
  • DataStream: configured with DorisExecutionOptions.builder().setStreamLoadProp(Properties)
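To show how the two configuration styles relate, here is a minimal sketch that takes the options of a WITH clause and keeps only the sink.properties. entries, with the prefix removed, as a Properties object. The class StreamLoadProps and its method are hypothetical and only illustrate the naming convention; this is not the connector's internal code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: demonstrates the sink.properties.* naming convention only.
class StreamLoadProps {
    static final String PREFIX = "sink.properties.";

    /** Copies every option that starts with the prefix, dropping the prefix from the key. */
    static Properties fromTableOptions(Map<String, String> withOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : withOptions.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                props.setProperty(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> withOptions = new LinkedHashMap<>();
        withOptions.put("connector", "doris");
        withOptions.put("sink.properties.format", "json");
        withOptions.put("sink.properties.strip_outer_array", "true");

        Properties props = fromTableOptions(withOptions);
        System.out.println("format=" + props.getProperty("format"));
        System.out.println("strip_outer_array=" + props.getProperty("strip_outer_array"));
    }
}
```

A Properties object built this way has the same shape as the one passed to setStreamLoadProp in the DataStream examples.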

SQL

  • Source
    CREATE TABLE flink_doris_source (
        name STRING,
        age INT,
        price DECIMAL(5,2),
        sale DOUBLE
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
        'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
        'username' = '$YOUR_DORIS_USERNAME',
        'password' = '$YOUR_DORIS_PASSWORD'
    );
  • Sink
    CREATE TABLE flink_doris_sink (
        name STRING,
        age INT,
        price DECIMAL(5,2),
        sale DOUBLE
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
        'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
        'username' = '$YOUR_DORIS_USERNAME',
        'password' = '$YOUR_DORIS_PASSWORD'
    );
  • Insert
    INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;

DataStream

  • Source
    // env is an existing StreamExecutionEnvironment
    Properties properties = new Properties();
    properties.put("fenodes", "FE_IP:8030");
    properties.put("username", "root");
    properties.put("password", "");
    properties.put("table.identifier", "db.table");
    env.addSource(new DorisSourceFunction(
                    new DorisStreamOptions(properties),
                    new SimpleListDeserializationSchema()
            )
    ).print();
  • Sink

Json Stream

    // Stream Load properties: send each batch as a JSON array
    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("strip_outer_array", "true");
    env.fromElements(
            "{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}"
        )
        .addSink(
            DorisSink.sink(
                DorisReadOptions.builder().build(),
                DorisExecutionOptions.builder()
                    .setBatchSize(3)
                    .setBatchIntervalMs(0L)
                    .setMaxRetries(3)
                    .setStreamLoadProp(pro).build(),
                DorisOptions.builder()
                    .setFenodes("FE_IP:8030")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("").build()
            ));
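With strip_outer_array set to true, Stream Load treats the request body as a JSON array in which each element is one row. The following sketch shows what such a payload looks like for a small batch. How the connector itself assembles a batch is not described here, so the class JsonBatchPayload and its method are hypothetical and illustrate only the payload shape.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: shows the array-shaped body that strip_outer_array=true expects.
class JsonBatchPayload {
    /** Wraps already-serialized JSON objects in one outer JSON array. */
    static String toArrayPayload(List<String> jsonRows) {
        return "[" + String.join(",", jsonRows) + "]";
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(
                "{\"city\": \"北京\", \"longitude\": \"116.405419\"}",
                "{\"city\": \"上海\", \"longitude\": \"121.473701\"}");
        System.out.println(toArrayPayload(batch));
    }
}
```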

Json Stream (only DorisOptions specified)

    env.fromElements(
            "{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}"
        )
        .addSink(
            DorisSink.sink(
                DorisOptions.builder()
                    .setFenodes("FE_IP:8030")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("").build()
            ));

RowData Stream

    // Build one RowData record with the fields (city, longitude, latitude)
    DataStream<RowData> source = env.fromElements("")
        .map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String value) throws Exception {
                GenericRowData genericRowData = new GenericRowData(3);
                genericRowData.setField(0, StringData.fromString("北京"));
                genericRowData.setField(1, 116.405419);
                genericRowData.setField(2, 39.916927);
                return genericRowData;
            }
        });

    // Column names and logical types, in the same order as the RowData fields
    String[] fields = {"city", "longitude", "latitude"};
    LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};

    source.addSink(
        DorisSink.sink(
            fields,
            types,
            DorisReadOptions.builder().build(),
            DorisExecutionOptions.builder()
                .setBatchSize(3)
                .setBatchIntervalMs(0L)
                .setMaxRetries(3)
                .build(),
            DorisOptions.builder()
                .setFenodes("FE_IP:8030")
                .setTableIdentifier("db.table")
                .setUsername("root")
                .setPassword("").build()
        ));

DataSet

  • Sink
    // Build one RowData record with the fields (city, longitude, latitude)
    MapOperator<String, RowData> data = env.fromElements("")
        .map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String value) throws Exception {
                GenericRowData genericRowData = new GenericRowData(3);
                genericRowData.setField(0, StringData.fromString("北京"));
                genericRowData.setField(1, 116.405419);
                genericRowData.setField(2, 39.916927);
                return genericRowData;
            }
        });

    DorisOptions dorisOptions = DorisOptions.builder()
        .setFenodes("FE_IP:8030")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("").build();
    DorisReadOptions readOptions = DorisReadOptions.defaults();
    DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();

    // Column names and logical types, in the same order as the RowData fields
    LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
    String[] fields = {"city", "longitude", "latitude"};

    DorisDynamicOutputFormat outputFormat = new DorisDynamicOutputFormat(
        dorisOptions, readOptions, executionOptions, types, fields
    );
    outputFormat.open(0, 1);
    data.output(outputFormat);
    outputFormat.close();

General

Key | Default Value | Comment
----|---------------|--------
fenodes | -- | Doris FE HTTP address; multiple addresses are supported, separated by commas
table.identifier | -- | Doris table identifier, e.g. db1.tbl1
username | -- | Doris username
password | -- | Doris password
doris.request.retries | 3 | Number of retries when sending requests to Doris
doris.request.connect.timeout.ms | 30000 | Connection timeout for requests sent to Doris
doris.request.read.timeout.ms | 30000 | Read timeout for requests sent to Doris
doris.request.query.timeout.s | 3600 | Doris query timeout in seconds; the default is 1 hour, and -1 means no timeout limit
doris.request.tablet.size | Integer.MAX_VALUE | Number of Doris tablets that correspond to one partition. The smaller this value, the more partitions are generated, which increases parallelism on the Flink side but also puts more pressure on Doris.
doris.batch.size | 1024 | Maximum number of rows read from BE at one time. Increasing this value reduces the number of connections between Flink and Doris, and with it the overhead caused by network latency.
doris.exec.mem.limit | 2147483648 | Memory limit for a single query, in bytes. The default is 2 GB.
doris.deserialize.arrow.async | false | Whether to asynchronously convert the Arrow format to the RowBatch needed for flink-doris-connector iteration
doris.deserialize.queue.size | 64 | Size of the internal processing queue for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true
doris.read.field | -- | List of column names in the Doris table, separated by commas
doris.filter.query | -- | Filter expression for the query, passed through to Doris, which uses it to filter data on the source side
sink.batch.size | 10000 | Maximum number of rows in a single write to BE
sink.max-retries | 1 | Number of retries after a write to BE fails
sink.batch.interval | 10s | Flush interval, after which the asynchronous thread writes the cached data to BE. The default is 10 seconds; supported time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing.
sink.properties.* | -- | Stream Load parameters, e.g. 'sink.properties.column_separator' = ','. Set 'sink.properties.escape_delimiters' = 'true' to use a control character as the separator, so that a value such as '\x01' is converted to binary 0x01. To import JSON, set both 'sink.properties.format' = 'json' and 'sink.properties.strip_outer_array' = 'true'.
sink.enable-delete | true | Whether to enable deletion. Requires the batch delete feature to be enabled on the Doris table (enabled by default in version 0.15+) and supports only the Unique Key model.
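
The effect of doris.request.tablet.size can be pictured as chunking a table's tablets into read partitions. The sketch below is a simplification under stated assumptions: the class TabletPartitioner is hypothetical, and it ignores details such as which BE node hosts each tablet, which the real connector may take into account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: shows why a smaller tablet size yields more read partitions.
class TabletPartitioner {
    /** Splits tablet ids into consecutive groups of at most tabletSize elements. */
    static List<List<Long>> partition(List<Long> tabletIds, int tabletSize) {
        if (tabletSize <= 0) {
            throw new IllegalArgumentException("tabletSize must be positive");
        }
        List<List<Long>> partitions = new ArrayList<>();
        for (long start = 0; start < tabletIds.size(); start += tabletSize) {
            // long arithmetic avoids overflow when tabletSize is Integer.MAX_VALUE
            int end = (int) Math.min((long) tabletIds.size(), start + tabletSize);
            partitions.add(new ArrayList<>(tabletIds.subList((int) start, end)));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Long> tablets = new ArrayList<>();
        for (long id = 0; id < 10; id++) {
            tablets.add(id);
        }
        System.out.println("size 3: " + partition(tablets, 3).size() + " partitions");
        System.out.println("default: " + partition(tablets, Integer.MAX_VALUE).size() + " partition");
    }
}
```

With ten tablets, a tablet size of 3 gives four partitions that Flink can read in parallel, while the default of Integer.MAX_VALUE keeps them all in one.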

Doris & Flink Column Type Mapping

Doris Type | Flink Type
-----------|-----------
NULL_TYPE  | NULL
BOOLEAN    | BOOLEAN
TINYINT    | TINYINT
SMALLINT   | SMALLINT
INT        | INT
BIGINT     | BIGINT
FLOAT      | FLOAT
DOUBLE     | DOUBLE
DATE       | STRING
DATETIME   | STRING
DECIMAL    | DECIMAL
CHAR       | STRING
LARGEINT   | STRING
VARCHAR    | STRING
DECIMALV2  | DECIMAL
TIME       | DOUBLE
HLL        | Unsupported datatype
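
When declaring Flink columns for an existing Doris table, the mapping can be treated as a simple lookup. The sketch below encodes the type names as plain strings; the class DorisTypeMapping is hypothetical and is not an API of the connector.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: a string-based lookup of the Doris-to-Flink type mapping.
class DorisTypeMapping {
    private static final Map<String, String> DORIS_TO_FLINK = new HashMap<>();

    static {
        DORIS_TO_FLINK.put("NULL_TYPE", "NULL");
        DORIS_TO_FLINK.put("BOOLEAN", "BOOLEAN");
        DORIS_TO_FLINK.put("TINYINT", "TINYINT");
        DORIS_TO_FLINK.put("SMALLINT", "SMALLINT");
        DORIS_TO_FLINK.put("INT", "INT");
        DORIS_TO_FLINK.put("BIGINT", "BIGINT");
        DORIS_TO_FLINK.put("FLOAT", "FLOAT");
        DORIS_TO_FLINK.put("DOUBLE", "DOUBLE");
        DORIS_TO_FLINK.put("DATE", "STRING");
        DORIS_TO_FLINK.put("DATETIME", "STRING");
        DORIS_TO_FLINK.put("DECIMAL", "DECIMAL");
        DORIS_TO_FLINK.put("CHAR", "STRING");
        DORIS_TO_FLINK.put("LARGEINT", "STRING");
        DORIS_TO_FLINK.put("VARCHAR", "STRING");
        DORIS_TO_FLINK.put("DECIMALV2", "DECIMAL");
        DORIS_TO_FLINK.put("TIME", "DOUBLE");
        // HLL is intentionally absent: it is listed as an unsupported datatype.
    }

    /** Returns the Flink type name for a Doris type, or throws if the type is unsupported. */
    static String toFlinkType(String dorisType) {
        String flinkType = DORIS_TO_FLINK.get(dorisType.toUpperCase(Locale.ROOT));
        if (flinkType == null) {
            throw new UnsupportedOperationException("Unsupported Doris type: " + dorisType);
        }
        return flinkType;
    }

    public static void main(String[] args) {
        System.out.println("DATETIME -> " + toFlinkType("DATETIME"));
        System.out.println("LARGEINT -> " + toFlinkType("LARGEINT"));
    }
}
```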

Using Flink CDC with Doris (data deletion example)

    CREATE TABLE cdc_mysql_source (
        id INT,
        name VARCHAR,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '127.0.0.1',
        'port' = '3306',
        'username' = 'root',
        'password' = 'password',
        'database-name' = 'database',
        'table-name' = 'table'
    );

    -- Delete event synchronization (sink.enable-delete = 'true') requires
    -- the batch delete feature to be enabled on the Doris table
    CREATE TABLE doris_sink (
        id INT,
        name STRING
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = '127.0.0.1:8030',
        'table.identifier' = 'database.table',
        'username' = 'root',
        'password' = '',
        'sink.properties.format' = 'json',
        'sink.properties.strip_outer_array' = 'true',
        'sink.enable-delete' = 'true'
    );

    INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
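
To make the delete path more concrete, the sketch below maps each changelog row kind to a delete flag, on the assumption that Doris batch delete marks rows through a hidden __DORIS_DELETE_SIGN__ column (1 = delete, 0 = keep). The RowKind enum here is a local stand-in for Flink's org.apache.flink.types.RowKind, and both the class DeleteSign and the treatment of UPDATE_BEFORE as a delete are assumptions made for illustration, not a statement of the connector's exact behavior.

```java
// Hypothetical helper: maps changelog row kinds to a Doris batch-delete flag.
class DeleteSign {
    /** Local stand-in for org.apache.flink.types.RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Assumed name of the hidden column used by Doris batch delete. */
    static final String DELETE_SIGN_COLUMN = "__DORIS_DELETE_SIGN__";

    /** Returns "1" for rows that should be removed and "0" for rows that should be kept. */
    static String deleteSign(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "0";
            case UPDATE_BEFORE:
            case DELETE:
                return "1";
            default:
                throw new IllegalArgumentException("Unknown row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + DELETE_SIGN_COLUMN + "=" + deleteSign(kind));
        }
    }
}
```

Because the flag is written alongside the key columns, this approach only works on the Unique Key model, which matches the restriction noted at the top of this document.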