Flink Doris Connector

  • The Flink Doris Connector lets Flink operate on data stored in Doris (read, insert, modify, delete). This document describes how to work with Doris from Flink through the DataStream API and SQL.

Note:

  1. Modification and deletion are only supported on the Unique Key model.
  2. Deletion currently works automatically only for data ingested through Flink CDC. For other ingestion methods, you need to implement deletion yourself. For deleting data with Flink CDC, refer to the Flink CDC sections later in this document.

Version Compatibility

Connector Version | Flink Version  | Doris Version | Java Version | Scala Version
1.0.3             | 1.11+          | 0.15+         | 8            | 2.11,2.12
1.1.1             | 1.14           | 1.0+          | 8            | 2.11,2.12
1.2.1             | 1.15           | 1.0+          | 8            | -
1.3.0             | 1.16           | 1.0+          | 8            | -
1.4.0             | 1.15,1.16,1.17 | 1.0+          | 8            | -

USE

Maven

Add flink-doris-connector

    <!-- flink-doris-connector -->
    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>flink-doris-connector-1.16</artifactId>
        <version>1.4.0</version>
    </dependency>

Remark

  1. Choose the Connector artifact and version that match your Flink version.

  2. You can also download the relevant version jar package from here.
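
As a rough illustration of remark 1, the sketch below derives the Maven artifactId from a Flink version. It assumes that the naming pattern of the single Maven example above (flink-doris-connector-<Flink major.minor>) also holds for the other Flink versions listed for Connector 1.4.0; the class and method names are hypothetical and not part of the connector.

```java
import java.util.Arrays;
import java.util.List;

class ConnectorArtifact {
    // Flink versions listed for connector 1.4.0 in the compatibility table.
    static final List<String> FLINK_VERSIONS_FOR_1_4_0 = Arrays.asList("1.15", "1.16", "1.17");

    // Assumption: artifactId = "flink-doris-connector-" + Flink major.minor,
    // generalized from the Maven example in this document.
    static String artifactId(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected a version like 1.16 or 1.16.2: " + flinkVersion);
        }
        String majorMinor = parts[0] + "." + parts[1];
        if (!FLINK_VERSIONS_FOR_1_4_0.contains(majorMinor)) {
            throw new IllegalArgumentException("Flink " + majorMinor + " is not listed for connector 1.4.0");
        }
        return "flink-doris-connector-" + majorMinor;
    }

    public static void main(String[] args) {
        // Prints the Maven coordinates for a job running on Flink 1.16.2.
        System.out.println("org.apache.doris:" + artifactId("1.16.2") + ":1.4.0");
    }
}
```

Older connector releases may follow a different naming scheme, so check the published artifacts before relying on this pattern.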

compile

To compile, run sh build.sh directly. For details, please refer to here.

After a successful build, the target jar is generated in the dist directory, for example flink-doris-connector-1.5.0-SNAPSHOT.jar. Copy this file to the Flink classpath to use the Flink Doris Connector. For example, if Flink runs in Local mode, put the file in the lib/ folder; if Flink runs in Yarn cluster mode, put the file into the pre-deployment package.

Instructions

read

SQL

    -- doris source
    CREATE TABLE flink_doris_source (
        name STRING,
        age INT,
        price DECIMAL(5,2),
        sale DOUBLE
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = 'FE_IP:HTTP_PORT',
        'table.identifier' = 'database.table',
        'username' = 'root',
        'password' = 'password'
    );

DataStream

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // connection options for the Doris cluster
    DorisOptions.Builder builder = DorisOptions.builder()
            .setFenodes("FE_IP:HTTP_PORT")
            .setTableIdentifier("db.table")
            .setUsername("root")
            .setPassword("password");
    // each record is deserialized into a List of column values
    DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
            .setDorisOptions(builder.build())
            .setDorisReadOptions(DorisReadOptions.builder().build())
            .setDeserializer(new SimpleListDeserializationSchema())
            .build();
    env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

write

SQL

    -- enable checkpoint
    SET 'execution.checkpointing.interval' = '10s';
    -- doris sink
    CREATE TABLE flink_doris_sink (
        name STRING,
        age INT,
        price DECIMAL(5,2),
        sale DOUBLE
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = 'FE_IP:HTTP_PORT',
        'table.identifier' = 'db.table',
        'username' = 'root',
        'password' = 'password',
        'sink.label-prefix' = 'doris_label'
    );
    -- submit insert job
    INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;

DataStream

DorisSink writes data to Doris through Stream Load, and the DataStream API supports different serialization methods when writing.

String data stream (SimpleStringSerializer)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(10000);
    // using batch mode for bounded data
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    DorisSink.Builder<String> builder = DorisSink.builder();
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
            .setTableIdentifier("db.table")
            .setUsername("root")
            .setPassword("password");

    Properties properties = new Properties();
    // When the upstream is writing json, the configuration needs to be enabled.
    //properties.setProperty("format", "json");
    //properties.setProperty("read_json_by_line", "true");
    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("label-doris") // streamload label prefix
            .setDeletable(false)
            .setStreamLoadProp(properties);

    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setSerializer(new SimpleStringSerializer()) // serialize according to string
            .setDorisOptions(dorisBuilder.build());

    // mock string source
    List<Tuple2<String, Integer>> data = new ArrayList<>();
    data.add(new Tuple2<>("doris", 1));
    DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
    // join the fields with a tab to form one text row per record
    source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
            .sinkTo(builder.build());

    // mock json string source
    //env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());

RowData data stream (RowDataSerializer)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(10000);
    // using batch mode for bounded data
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // doris sink option
    DorisSink.Builder<RowData> builder = DorisSink.builder();
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
            .setTableIdentifier("db.table")
            .setUsername("root")
            .setPassword("password");

    // json format to streamload
    Properties properties = new Properties();
    properties.setProperty("format", "json");
    properties.setProperty("read_json_by_line", "true");
    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("label-doris") // streamload label prefix
            .setDeletable(false)
            .setStreamLoadProp(properties); // streamload params

    // flink rowdata's schema
    String[] fields = {"city", "longitude", "latitude", "destroy_date"};
    DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};

    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setSerializer(RowDataSerializer.builder() // serialize according to rowdata
                    .setFieldNames(fields)
                    .setType("json") // json format
                    .setFieldType(types).build())
            .setDorisOptions(dorisBuilder.build());

    // mock rowdata source
    DataStream<RowData> source = env.fromElements("")
            .map(new MapFunction<String, RowData>() {
                @Override
                public RowData map(String value) throws Exception {
                    GenericRowData genericRowData = new GenericRowData(4);
                    genericRowData.setField(0, StringData.fromString("beijing"));
                    genericRowData.setField(1, 116.405419);
                    genericRowData.setField(2, 39.916927);
                    genericRowData.setField(3, LocalDate.now().toEpochDay());
                    return genericRowData;
                }
            });
    source.sinkTo(builder.build());

SchemaChange data stream (JsonDebeziumSchemaSerializer)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(10000);

    Properties props = new Properties();
    props.setProperty("format", "json");
    props.setProperty("read_json_by_line", "true");
    DorisOptions dorisOptions = DorisOptions.builder()
            .setFenodes("127.0.0.1:8030")
            .setTableIdentifier("test.t1")
            .setUsername("root")
            .setPassword("").build();

    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("label-prefix")
            .setStreamLoadProp(props).setDeletable(true);

    DorisSink.Builder<String> builder = DorisSink.builder();
    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setDorisOptions(dorisOptions)
            .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

    // mySqlSource is a Flink CDC MySQL source that must be built beforehand
    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            .sinkTo(builder.build());

Reference: CDCSchemaChangeExample

Lookup Join

    CREATE TABLE fact_table (
        `id` BIGINT,
        `name` STRING,
        `city` STRING,
        `process_time` as proctime()
    ) WITH (
        'connector' = 'kafka',
        ...
    );

    create table dim_city(
        `city` STRING,
        `level` INT,
        `province` STRING,
        `country` STRING
    ) WITH (
        'connector' = 'doris',
        'fenodes' = '127.0.0.1:8030',
        'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
        'table.identifier' = 'dim.dim_city',
        'username' = 'root',
        'password' = ''
    );

    SELECT a.id, a.name, a.city, c.province, c.country, c.level
    FROM fact_table a
    LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
    ON a.city = c.city

configuration

General configuration items

Key                              | Default Value | Required | Comment
fenodes                          | -             | Y        | Doris FE http address; multiple addresses are supported, separated by commas
benodes                          | -             | N        | Doris BE http address; multiple addresses are supported, separated by commas. Refer to #187
table.identifier                 | -             | Y        | Doris table name, such as: db.tbl
username                         | -             | Y        | Username to access Doris
password                         | -             | Y        | Password to access Doris
doris.request.retries            | 3             | N        | Number of retries to send requests to Doris
doris.request.connect.timeout.ms | 30000         | N        | Connection timeout for sending requests to Doris, in milliseconds
doris.request.read.timeout.ms    | 30000         | N        | Read timeout for sending requests to Doris, in milliseconds

Source configuration item

Key                           | Default Value     | Required | Comment
doris.request.query.timeout.s | 3600              | N        | The timeout for querying Doris, in seconds. The default value is 1 hour; -1 means no timeout limit
doris.request.tablet.size     | Integer.MAX_VALUE | N        | The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated. This improves the parallelism on the Flink side, but at the same time puts more pressure on Doris.
doris.batch.size              | 1024              | N        | The maximum number of rows to read from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the additional time overhead caused by network delay.
doris.exec.mem.limit          | 2147483648        | N        | Memory limit for a single query, in bytes. The default is 2GB
doris.deserialize.arrow.async | FALSE             | N        | Whether to support asynchronous conversion of Arrow format to the RowBatch needed for flink-doris-connector iterations
doris.deserialize.queue.size  | 64                | N        | Size of the internal processing queue for asynchronous conversion of Arrow format, effective when doris.deserialize.arrow.async is true
doris.read.field              | -                 | N        | List of column names to read from the Doris table, separated by commas
doris.filter.query            | -                 | N        | The expression used to filter the read data. It is passed transparently to Doris, which uses it to filter data on the source side. For example age=18.
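
To make the effect of doris.request.tablet.size concrete, here is a simplified sketch of how the number of read partitions grows as the value shrinks. It assumes tablets are simply grouped into chunks of at most doris.request.tablet.size and ignores how the connector distributes tablets across BE nodes, so treat the numbers as an approximation. The class and method names are hypothetical.

```java
class TabletPartitioning {
    // Simplified model: ceil(tabletCount / tabletSize) partitions.
    // Assumption: no per-BE grouping is taken into account.
    static long partitionCount(int tabletCount, int tabletSize) {
        if (tabletCount < 0 || tabletSize <= 0) {
            throw new IllegalArgumentException("tabletCount must be >= 0 and tabletSize > 0");
        }
        // long arithmetic avoids int overflow when tabletSize is Integer.MAX_VALUE
        return ((long) tabletCount + tabletSize - 1) / tabletSize;
    }

    public static void main(String[] args) {
        int tablets = 32; // hypothetical table with 32 tablets
        System.out.println(partitionCount(tablets, Integer.MAX_VALUE)); // default setting
        System.out.println(partitionCount(tablets, 8));
        System.out.println(partitionCount(tablets, 1));
    }
}
```

In this model the default value puts all tablets into one partition, while a value of 1 yields one partition per tablet, which is the trade-off between Flink parallelism and load on Doris described in the table.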

Sink configuration items

Key               | Default Value | Required | Comment
sink.label-prefix | -             | Y        | The label prefix used by Stream Load import. In the 2pc scenario, global uniqueness is required to ensure Flink's EOS semantics.
sink.properties.* | -             | N        | Import parameters for Stream Load. For example, 'sink.properties.column_separator' = ', ' defines the column delimiter; 'sink.properties.escape_delimiters' = 'true' allows special characters as delimiters, so that '\x01' is converted to binary 0x01. For JSON format import, set 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. Detailed parameters refer to here.
sink.enable-delete | TRUE         | N        | Whether to enable delete. This option requires the Doris table to enable the batch delete function (enabled by default in Doris 0.15+), and only supports the Unique model.
sink.enable-2pc   | TRUE          | N        | Whether to enable two-phase commit (2pc). The default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to here.
sink.buffer-size  | 1MB           | N        | The size of the write data cache buffer, in bytes. Modifying it is not recommended; the default configuration is enough
sink.buffer-count | 3             | N        | The number of write data buffers. Modifying it is not recommended; the default configuration is enough
sink.max-retries  | 3             | N        | Maximum number of retries after a Commit failure, default 3

Lookup Join configuration item

Key                               | Default Value | Required | Comment
jdbc-url                          | -             | Y        | jdbc connection information
lookup.cache.max-rows             | -1            | N        | The maximum number of rows in the lookup cache. The default value is -1, which means the cache is not enabled
lookup.cache.ttl                  | 10s           | N        | The maximum time to live of the lookup cache, the default is 10s
lookup.max-retries                | 1             | N        | The number of retries after a lookup query fails
lookup.jdbc.async                 | false         | N        | Whether to enable asynchronous lookup, the default is false
lookup.jdbc.read.batch.size       | 128           | N        | Under asynchronous lookup, the maximum batch size for each query
lookup.jdbc.read.batch.queue-size | 256           | N        | The size of the intermediate buffer queue during asynchronous lookup
lookup.jdbc.read.thread-size      | 3             | N        | The number of jdbc threads for lookup in each task

Doris & Flink Column Type Mapping

Doris Type | Flink Type
NULL_TYPE  | NULL
BOOLEAN    | BOOLEAN
TINYINT    | TINYINT
SMALLINT   | SMALLINT
INT        | INT
BIGINT     | BIGINT
FLOAT      | FLOAT
DOUBLE     | DOUBLE
DATE       | DATE
DATETIME   | TIMESTAMP
DECIMAL    | DECIMAL
CHAR       | STRING
LARGEINT   | STRING
VARCHAR    | STRING
DECIMALV2  | DECIMAL
TIME       | DOUBLE
HLL        | Unsupported datatype

Example of using Flink CDC to access Doris

    SET 'execution.checkpointing.interval' = '10s';
    CREATE TABLE cdc_mysql_source (
        id int
        ,name VARCHAR
        ,PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '127.0.0.1',
        'port' = '3306',
        'username' = 'root',
        'password' = 'password',
        'database-name' = 'database',
        'table-name' = 'table'
    );

    -- Support synchronous insert/update/delete events
    CREATE TABLE doris_sink (
        id INT,
        name STRING
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = '127.0.0.1:8030',
        'table.identifier' = 'database.table',
        'username' = 'root',
        'password' = '',
        'sink.properties.format' = 'json',
        'sink.properties.read_json_by_line' = 'true',
        'sink.enable-delete' = 'true', -- Synchronize delete events
        'sink.label-prefix' = 'doris_label'
    );

    insert into doris_sink select id,name from cdc_mysql_source;

Use FlinkCDC to access multi-table or whole database example

grammar

    <FLINK_HOME>/bin/flink run \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
        <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
        --database <doris-database-name> \
        [--job-name <flink-job-name>] \
        [--table-prefix <doris-table-prefix>] \
        [--table-suffix <doris-table-suffix>] \
        [--including-tables <mysql-table-name|name-regular-expr>] \
        [--excluding-tables <mysql-table-name|name-regular-expr>] \
        --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
        --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
        --sink-conf <doris-sink-conf> [--sink-conf <doris-sink-conf> ...] \
        [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]

  • --job-name Flink job name, not required.
  • --database Name of the Doris database to synchronize to.
  • --table-prefix Doris table prefix name, for example --table-prefix ods_.
  • --table-suffix Same as above, the suffix name of the Doris table.
  • --including-tables MySQL tables that need to be synchronized. Use "|" to separate multiple tables; regular expressions are supported. For example, --including-tables table1|tbl.* synchronizes table1 and all tables beginning with tbl.
  • --excluding-tables Tables that do not need to be synchronized, the usage is the same as above.
  • --mysql-conf MySQL CDCSource configuration, e.g. --mysql-conf hostname=127.0.0.1. You can see all MySQL-CDC configuration items in here, where hostname/username/password/database-name are required.
  • --oracle-conf Oracle CDCSource configuration, e.g. --oracle-conf hostname=127.0.0.1. You can view all Oracle-CDC configuration items in here, where hostname/username/password/database-name/schema-name are required.
  • --sink-conf All configurations of Doris Sink, you can view the complete configuration items in here.
  • --table-conf The configuration item of the Doris table, that is, the content contained in properties. For example --table-conf replication_num=1.
  • --ignore-default-value Turn off synchronizing the default values of the MySQL table structure. This is suitable when synchronizing MySQL data to Doris where a field has a default value but the actual inserted data is null. Refer to #152.
  • --use-new-schema-change The new schema change supports synchronizing MySQL multi-column changes and default values. Refer to #167.

Note: When synchronizing, you need to add the corresponding Flink CDC dependencies in the $FLINK_HOME/lib directory, such as flink-sql-connector-mysql-cdc-${version}.jar, flink-sql-connector-oracle-cdc-${version}.jar
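
The table selection and naming options above can be pictured with a small sketch. It assumes that --including-tables and --excluding-tables are matched against the whole table name, that exclusion takes precedence over inclusion, and that the Doris table name is simply prefix + source name + suffix. These are assumptions for illustration; the class and method names are hypothetical and this is not the tool's actual implementation.

```java
import java.util.regex.Pattern;

class TableSelection {
    private final Pattern including; // null means "include everything"
    private final Pattern excluding; // null means "exclude nothing"

    TableSelection(String includingTables, String excludingTables) {
        this.including = includingTables == null ? null : Pattern.compile(includingTables);
        this.excluding = excludingTables == null ? null : Pattern.compile(excludingTables);
    }

    // Assumption: full-name match, and an excluded table is never synchronized.
    boolean shouldSync(String table) {
        boolean included = including == null || including.matcher(table).matches();
        boolean excluded = excluding != null && excluding.matcher(table).matches();
        return included && !excluded;
    }

    // Assumption: --table-prefix and --table-suffix are concatenated around the source name.
    static String dorisTableName(String prefix, String table, String suffix) {
        return prefix + table + suffix;
    }

    public static void main(String[] args) {
        TableSelection selection = new TableSelection("table1|tbl.*", "tbl_tmp.*");
        for (String t : new String[] {"table1", "tbl_orders", "my_tbl", "tbl_tmp_1"}) {
            System.out.println(t + " -> " + selection.shouldSync(t));
        }
        System.out.println(dorisTableName("ods_", "tbl_orders", ""));
    }
}
```

When passing such a pattern on the command line, quote it (as in the examples below) so the shell does not interpret "|" or "*".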

MySQL synchronization example

    <FLINK_HOME>/bin/flink run \
        -Dexecution.checkpointing.interval=10s \
        -Dparallelism.default=1 \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
        mysql-sync-database \
        --database test_db \
        --mysql-conf hostname=127.0.0.1 \
        --mysql-conf port=3306 \
        --mysql-conf username=root \
        --mysql-conf password=123456 \
        --mysql-conf database-name=mysql_db \
        --including-tables "tbl1|test.*" \
        --sink-conf fenodes=127.0.0.1:8030 \
        --sink-conf username=root \
        --sink-conf password=123456 \
        --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
        --sink-conf sink.label-prefix=label \
        --table-conf replication_num=1

Oracle synchronization example

    <FLINK_HOME>/bin/flink run \
        -Dexecution.checkpointing.interval=10s \
        -Dparallelism.default=1 \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
        oracle-sync-database \
        --database test_db \
        --oracle-conf hostname=127.0.0.1 \
        --oracle-conf port=1521 \
        --oracle-conf username=admin \
        --oracle-conf password="password" \
        --oracle-conf database-name=XE \
        --oracle-conf schema-name=ADMIN \
        --including-tables "tbl1|tbl2" \
        --sink-conf fenodes=127.0.0.1:8030 \
        --sink-conf username=root \
        --sink-conf password= \
        --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
        --sink-conf sink.label-prefix=label \
        --table-conf replication_num=1

PostgreSQL synchronization example

    <FLINK_HOME>/bin/flink run \
        -Dexecution.checkpointing.interval=10s \
        -Dparallelism.default=1 \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
        postgres-sync-database \
        --database db1 \
        --postgres-conf hostname=127.0.0.1 \
        --postgres-conf port=5432 \
        --postgres-conf username=postgres \
        --postgres-conf password="123456" \
        --postgres-conf database-name=postgres \
        --postgres-conf schema-name=public \
        --postgres-conf slot.name=test \
        --postgres-conf decoding.plugin.name=pgoutput \
        --including-tables "tbl1|tbl2" \
        --sink-conf fenodes=127.0.0.1:8030 \
        --sink-conf username=root \
        --sink-conf password= \
        --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
        --sink-conf sink.label-prefix=label \
        --table-conf replication_num=1

SQLServer synchronization example

    <FLINK_HOME>/bin/flink run \
        -Dexecution.checkpointing.interval=10s \
        -Dparallelism.default=1 \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
        sqlserver-sync-database \
        --database db1 \
        --sqlserver-conf hostname=127.0.0.1 \
        --sqlserver-conf port=1433 \
        --sqlserver-conf username=sa \
        --sqlserver-conf password="123456" \
        --sqlserver-conf database-name=CDC_DB \
        --sqlserver-conf schema-name=dbo \
        --including-tables "tbl1|tbl2" \
        --sink-conf fenodes=127.0.0.1:8030 \
        --sink-conf username=root \
        --sink-conf password= \
        --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
        --sink-conf sink.label-prefix=label \
        --table-conf replication_num=1

Use FlinkCDC to update Key column

In a business database, a number is generally used as the primary key of a table; for example, the Student table uses the number (id) as its primary key. As the business develops, the number assigned to a record may change. In this scenario, synchronizing data with FlinkCDC + Doris Connector automatically updates the data in the Doris primary key column.

Principle

The underlying collection tool of Flink CDC is Debezium. Debezium internally uses the op field to identify the corresponding operation: the values of the op field are c, u, d, and r, corresponding to create, update, delete, and read. For the update of the primary key column, FlinkCDC will send DELETE and INSERT events downstream, and after the data is synchronized to Doris, it will automatically update the data of the primary key column.
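
The principle can be traced with a small, self-contained sketch. A map keyed by the primary key stands in for a Unique Key table, and a primary key change is replayed as a DELETE of the old key followed by an INSERT of the new key. The class and method names are hypothetical; this only models the event sequence and is not connector code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PrimaryKeyUpdateDemo {
    // Meaning of the Debezium op codes named in the text.
    static String describe(String op) {
        switch (op) {
            case "c": return "create";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "read";
            default: throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    // Applies one downstream event to the stand-in table (one row per key).
    static void apply(Map<String, String> table, String kind, String id, String name) {
        if ("DELETE".equals(kind)) {
            table.remove(id);
        } else if ("INSERT".equals(kind)) {
            table.put(id, name);
        } else {
            throw new IllegalArgumentException("unsupported event: " + kind);
        }
    }

    // Replays: initial row, then "update student set id = '1002' where id = '1001'".
    static Map<String, String> replayKeyChange() {
        Map<String, String> student = new LinkedHashMap<>();
        apply(student, "INSERT", "1001", "zhangsan");
        apply(student, "DELETE", "1001", "zhangsan"); // old key is removed
        apply(student, "INSERT", "1002", "zhangsan"); // row reappears under the new key
        return student;
    }

    public static void main(String[] args) {
        System.out.println(describe("d"));
        System.out.println(replayKeyChange());
    }
}
```

After the replay only the row with key 1002 remains, which is the state the Doris table reaches once both events have been written.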

Example

The Flink program can follow the CDC synchronization example above. After the task is submitted successfully, execute an UPDATE statement on the primary key column on the MySQL side (update student set id = '1002' where id = '1001'); the corresponding data in Doris is modified accordingly.

Generally, messages in Kafka use a specific field to mark the operation type, such as {"op_type":"delete","data":{…}}. For this type of data, rows with op_type=delete should be deleted.

By default, DorisSink distinguishes the event type based on RowKind. In the CDC case the event type can be obtained directly, and the hidden column __DORIS_DELETE_SIGN__ is assigned automatically to perform the deletion. With Kafka, the job has to decide based on business logic and explicitly pass the value for the hidden column.

Example

    -- Such as upstream data: {"op_type":"delete","data":{"id":1,"name":"zhangsan"}}
    CREATE TABLE KAFKA_SOURCE(
        data STRING,
        op_type STRING
    ) WITH (
        'connector' = 'kafka',
        ...
    );

    CREATE TABLE DORIS_SINK(
        id INT,
        name STRING,
        __DORIS_DELETE_SIGN__ INT
    ) WITH (
        'connector' = 'doris',
        'fenodes' = '127.0.0.1:8030',
        'table.identifier' = 'db.table',
        'username' = 'root',
        'password' = '',
        'sink.enable-delete' = 'false', -- false means not to get the event type from RowKind
        'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Explicitly specify the import columns of the Stream Load
    );

    -- json_value returns a string, so id is cast to match the INT sink column
    INSERT INTO DORIS_SINK
    SELECT CAST(json_value(data,'$.id') AS INT) as id,
        json_value(data,'$.name') as name,
        if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
    from KAFKA_SOURCE;
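
For DataStream jobs, the same mapping could be done in a map function placed before the sink. The sketch below is plain Java showing only the mapping logic. It assumes tab-separated rows as in the SimpleStringSerializer example earlier, and that the Stream Load columns property lists id, name, __DORIS_DELETE_SIGN__ in that order. The class and method names are hypothetical.

```java
class DeleteSignMapper {
    // 1 marks the row for deletion, 0 keeps it; mirrors if(op_type='delete',1,0).
    static int deleteSign(String opType) {
        return "delete".equals(opType) ? 1 : 0;
    }

    // Builds one text row in the column order id, name, __DORIS_DELETE_SIGN__.
    static String toRow(int id, String name, String opType) {
        return id + "\t" + name + "\t" + deleteSign(opType);
    }

    public static void main(String[] args) {
        System.out.println(toRow(1, "zhangsan", "delete"));
        System.out.println(toRow(2, "lisi", "insert"));
    }
}
```

As in the SQL version, the sink would need delete handling by RowKind turned off (setDeletable(false) in the examples above) so that the explicitly supplied value is used.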

Java example

An example of the Java version is provided under samples/doris-demo/ for reference, see here.

Best Practices

Application scenarios

The most suitable scenario for the Flink Doris Connector is synchronizing source data (MySQL, Oracle, PostgreSQL, etc.) to Doris in real time or in batch, and using Flink to perform joint analysis on data in Doris and other data sources.

Other

  1. The Flink Doris Connector mainly relies on Checkpoint for streaming writing, so the interval between Checkpoints is the visible delay time of the data.
  2. To ensure the Exactly-Once semantics of Flink, the Flink Doris Connector enables two-phase commit by default, and Doris enables two-phase commit by default after version 1.1. In Doris 1.0 it can be enabled by modifying the BE parameters; please refer to two_phase_commit.

FAQ

  1. After Doris Source finishes reading data, why does the stream end?

Currently Doris Source is a bounded stream and does not support CDC reading.

  2. Can Flink read Doris and perform conditional pushdown?

Yes, by configuring the doris.filter.query parameter. Refer to the configuration section for details.

  3. How to write Bitmap type?

    CREATE TABLE bitmap_sink (
        dt int,
        page string,
        user_id int
    )
    WITH (
        'connector' = 'doris',
        'fenodes' = '127.0.0.1:8030',
        'table.identifier' = 'test.bitmap_test',
        'username' = 'root',
        'password' = '',
        'sink.label-prefix' = 'doris_label',
        'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    )

  4. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

In the Exactly-Once scenario, the Flink Job must be restarted from the latest Checkpoint/Savepoint, otherwise the above error will be reported. When Exactly-Once is not required, it can also be solved by turning off 2PC commits (sink.enable-2pc=false) or changing to a different sink.label-prefix.

  5. errCode = 2, detailMessage = transaction [19650] not found

This occurs in the Commit phase: the transaction ID recorded in the checkpoint has expired on the FE side, so committing it again fails. In this case the job cannot be restarted from that checkpoint. The expiration time can be extended by modifying the streaming_label_keep_max_second configuration in fe.conf, which defaults to 12 hours.

  6. errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

This is because the number of concurrent imports into the same database exceeds 100. It can be solved by adjusting the max_running_txn_num_per_db parameter in fe.conf. For details, please refer to max_running_txn_num_per_db.

This error may also occur if a task frequently changes its label and restarts. In the 2pc scenario (Duplicate/Aggregate model), the label of each task needs to be unique, and when restarting from a checkpoint, the Flink task actively aborts the transactions that were precommitted successfully but not yet committed. Frequently changing the label and restarting leaves a large number of precommitted transactions that cannot be aborted and keep occupying transaction slots. Under the Unique model, 2pc can also be turned off, since writes are idempotent.

  7. How to ensure the order of a batch of data when Flink writes to the Unique model?

You can add a sequence column configuration to ensure this. For details, please refer to sequence.

  8. The Flink task does not report an error, but the data cannot be synchronized?

Before Connector 1.1.0, data was written in batches and writing was data-driven, so you need to check whether any data is being written upstream. Since 1.1.0, writing depends on Checkpoint, and Checkpoint must be enabled for data to be written.

  9. tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

This usually occurs before Connector 1.1.0, because writes are too frequent and produce too many versions. The Stream Load frequency can be reduced by setting the sink.batch.size and sink.batch.interval parameters.

  10. Flink imports dirty data, how to skip it?

When Flink imports data, if there is dirty data, such as field format, length, etc., it will cause StreamLoad to report an error, and Flink will continue to retry at this time. If you need to skip, you can disable the strict mode of StreamLoad (strict_mode=false, max_filter_ratio=1) or filter the data before the Sink operator.
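
A minimal sketch of the Stream Load parameters mentioned above, built with java.util.Properties. In a DataStream job these properties would be handed to DorisExecutionOptions through setStreamLoadProp, as in the write examples earlier; in SQL each key takes the sink.properties. prefix. The class and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

class LenientStreamLoadProps {
    // Stream Load parameters that tolerate dirty rows instead of failing the load.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("strict_mode", "false");
        props.setProperty("max_filter_ratio", "1");
        return props;
    }

    // Renders the same parameters as Flink SQL WITH options, sorted by key.
    static List<String> toSqlOptions(Properties props) {
        List<String> options = new ArrayList<>();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            options.add("'sink.properties." + key + "' = '" + props.getProperty(key) + "'");
        }
        return options;
    }

    public static void main(String[] args) {
        for (String option : toSqlOptions(build())) {
            System.out.println(option);
        }
    }
}
```

Note that a max_filter_ratio of 1 lets a load succeed even if every row is filtered out, so filtering before the Sink operator is the safer choice when the bad records can be identified.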

  11. How should the source table and Doris table correspond?

When using the Flink Connector to import data, pay attention to two aspects. First, the columns and types of the source table must correspond to the columns and types in Flink SQL. Second, the columns and types in Flink SQL must match those of the Doris table. For the correspondence between columns and types, refer to "Doris & Flink Column Type Mapping" above.

  12. TApplicationException: get_next failed: out of sequence response: expected 4 but got 3

This is due to concurrency bugs in the Thrift framework. It is recommended to use the latest connector and a compatible Flink version where possible.

  13. DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc

You can search for the log abort transaction response in TaskManager and determine whether it is a client issue or a server issue based on the HTTP return code.

  14. org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. "xx" encountered at row x, column xx

This problem is mainly caused by a condition on a varchar/string column, whose value needs to be quoted. The correct way to write it is xxx = ''xxx''. The Flink SQL parser interprets two consecutive single quotes as one single quote character instead of the end of the string, and the resulting string is used as the value of the option.
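
When the table DDL is generated from code, the quote doubling can be done mechanically. The sketch below shows one way to do it, assuming the filter expression is embedded in a single-quoted Flink SQL option value; the class and method names are hypothetical.

```java
class FilterQueryQuoting {
    // Doubles every single quote so the value survives inside a SQL string literal.
    static String escapeForSqlLiteral(String value) {
        return value.replace("'", "''");
    }

    // Builds the WITH-clause entry for doris.filter.query.
    static String filterOption(String filter) {
        return "'doris.filter.query' = '" + escapeForSqlLiteral(filter) + "'";
    }

    public static void main(String[] args) {
        // The filter as Doris should receive it: name = 'doris'
        System.out.println(filterOption("name = 'doris'"));
    }
}
```

The printed option reads 'doris.filter.query' = 'name = ''doris''', which the Flink SQL parser turns back into name = 'doris' before it is passed to Doris.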