Flink Doris Connector

Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍Flink如何通过Datastream和SQL操作Doris。

注意:

  1. 修改和删除只支持在 Unique Key 模型上
  2. 目前的删除是支持 Flink CDC 的方式接入数据实现自动删除,如果是其他数据接入的方式删除需要自己实现。Flink CDC 的数据删除使用方式参照本文档最后一节

版本兼容

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11+0.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8-
1.3.01.161.0+8-
1.4.01.15,1.16,1.171.0+8-

使用

Maven

添加 flink-doris-connector

  1. <!-- flink-doris-connector -->
  2. <dependency>
  3. <groupId>org.apache.doris</groupId>
  4. <artifactId>flink-doris-connector-1.16</artifactId>
  5. <version>1.4.0</version>
  6. </dependency>

备注

1.请根据不同的 Flink 版本替换对应的 Connector 和 Flink 依赖版本。

2.也可从这里下载相关版本jar包。

编译

编译时,可直接运行sh build.sh,具体可参考这里

编译成功后,会在 dist 目录生成目标jar包,如:flink-doris-connector-1.5.0-SNAPSHOT.jar。 将此文件复制到 Flinkclasspath 中即可使用 Flink-Doris-Connector 。例如, Local 模式运行的 Flink ,将此文件放入 lib/ 文件夹下。 Yarn 集群模式运行的 Flink ,则将此文件放入预部署包中。

使用方法

读取

SQL

  1. -- doris source
  2. CREATE TABLE flink_doris_source (
  3. name STRING,
  4. age INT,
  5. price DECIMAL(5,2),
  6. sale DOUBLE
  7. )
  8. WITH (
  9. 'connector' = 'doris',
  10. 'fenodes' = 'FE_IP:HTTP_PORT',
  11. 'table.identifier' = 'database.table',
  12. 'username' = 'root',
  13. 'password' = 'password'
  14. );

DataStream

  1. DorisOptions.Builder builder = DorisOptions.builder()
  2. .setFenodes("FE_IP:HTTP_PORT")
  3. .setTableIdentifier("db.table")
  4. .setUsername("root")
  5. .setPassword("password");
  6. DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
  7. .setDorisOptions(builder.build())
  8. .setDorisReadOptions(DorisReadOptions.builder().build())
  9. .setDeserializer(new SimpleListDeserializationSchema())
  10. .build();
  11. env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

写入

SQL

  1. -- enable checkpoint
  2. SET 'execution.checkpointing.interval' = '10s';
  3. -- doris sink
  4. CREATE TABLE flink_doris_sink (
  5. name STRING,
  6. age INT,
  7. price DECIMAL(5,2),
  8. sale DOUBLE
  9. )
  10. WITH (
  11. 'connector' = 'doris',
  12. 'fenodes' = 'FE_IP:HTTP_PORT',
  13. 'table.identifier' = 'db.table',
  14. 'username' = 'root',
  15. 'password' = 'password',
  16. 'sink.label-prefix' = 'doris_label'
  17. );
  18. -- submit insert job
  19. INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

DataStream

DorisSink是通过StreamLoad向Doris写入数据,DataStream写入时,支持不同的序列化方法

String 数据流(SimpleStringSerializer)

  1. // enable checkpoint
  2. env.enableCheckpointing(10000);
  3. // using batch mode for bounded data
  4. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  5. DorisSink.Builder<String> builder = DorisSink.builder();
  6. DorisOptions.Builder dorisBuilder = DorisOptions.builder();
  7. dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
  8. .setTableIdentifier("db.table")
  9. .setUsername("root")
  10. .setPassword("password");
  11. Properties properties = new Properties();
  12. // 上游是json写入时,需要开启配置
  13. //properties.setProperty("format", "json");
  14. //properties.setProperty("read_json_by_line", "true");
  15. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
  16. executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
  17. .setDeletable(false)
  18. .setStreamLoadProp(properties);
  19. builder.setDorisReadOptions(DorisReadOptions.builder().build())
  20. .setDorisExecutionOptions(executionBuilder.build())
  21. .setSerializer(new SimpleStringSerializer()) //serialize according to string
  22. .setDorisOptions(dorisBuilder.build());
  23. //mock csv string source
  24. List<Tuple2<String, Integer>> data = new ArrayList<>();
  25. data.add(new Tuple2<>("doris",1));
  26. DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
  27. source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
  28. .sinkTo(builder.build());
  29. //mock json string source
  30. //env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());

RowData 数据流(RowDataSerializer)

  1. // enable checkpoint
  2. env.enableCheckpointing(10000);
  3. // using batch mode for bounded data
  4. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  5. //doris sink option
  6. DorisSink.Builder<RowData> builder = DorisSink.builder();
  7. DorisOptions.Builder dorisBuilder = DorisOptions.builder();
  8. dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
  9. .setTableIdentifier("db.table")
  10. .setUsername("root")
  11. .setPassword("password");
  12. // json format to streamload
  13. Properties properties = new Properties();
  14. properties.setProperty("format", "json");
  15. properties.setProperty("read_json_by_line", "true");
  16. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
  17. executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
  18. .setDeletable(false)
  19. .setStreamLoadProp(properties); //streamload params
  20. //flink rowdata‘s schema
  21. String[] fields = {"city", "longitude", "latitude", "destroy_date"};
  22. DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};
  23. builder.setDorisReadOptions(DorisReadOptions.builder().build())
  24. .setDorisExecutionOptions(executionBuilder.build())
  25. .setSerializer(RowDataSerializer.builder() //serialize according to rowdata
  26. .setFieldNames(fields)
  27. .setType("json") //json format
  28. .setFieldType(types).build())
  29. .setDorisOptions(dorisBuilder.build());
  30. //mock rowdata source
  31. DataStream<RowData> source = env.fromElements("")
  32. .map(new MapFunction<String, RowData>() {
  33. @Override
  34. public RowData map(String value) throws Exception {
  35. GenericRowData genericRowData = new GenericRowData(4);
  36. genericRowData.setField(0, StringData.fromString("beijing"));
  37. genericRowData.setField(1, 116.405419);
  38. genericRowData.setField(2, 39.916927);
  39. genericRowData.setField(3, LocalDate.now().toEpochDay());
  40. return genericRowData;
  41. }
  42. });
  43. source.sinkTo(builder.build());

SchemaChange 数据流(JsonDebeziumSchemaSerializer)

  1. // enable checkpoint
  2. env.enableCheckpointing(10000);
  3. Properties props = new Properties();
  4. props.setProperty("format", "json");
  5. props.setProperty("read_json_by_line", "true");
  6. DorisOptions dorisOptions = DorisOptions.builder()
  7. .setFenodes("127.0.0.1:8030")
  8. .setTableIdentifier("test.t1")
  9. .setUsername("root")
  10. .setPassword("").build();
  11. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
  12. executionBuilder.setLabelPrefix("label-prefix")
  13. .setStreamLoadProp(props).setDeletable(true);
  14. DorisSink.Builder<String> builder = DorisSink.builder();
  15. builder.setDorisReadOptions(DorisReadOptions.builder().build())
  16. .setDorisExecutionOptions(executionBuilder.build())
  17. .setDorisOptions(dorisOptions)
  18. .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
  19. env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
  20. .sinkTo(builder.build());

参考: CDCSchemaChangeExample

Lookup Join

  1. CREATE TABLE fact_table (
  2. `id` BIGINT,
  3. `name` STRING,
  4. `city` STRING,
  5. `process_time` as proctime()
  6. ) WITH (
  7. 'connector' = 'kafka',
  8. ...
  9. );
  10. create table dim_city(
  11. `city` STRING,
  12. `level` INT ,
  13. `province` STRING,
  14. `country` STRING
  15. ) WITH (
  16. 'connector' = 'doris',
  17. 'fenodes' = '127.0.0.1:8030',
  18. 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
  19. 'table.identifier' = 'dim.dim_city',
  20. 'username' = 'root',
  21. 'password' = ''
  22. );
  23. SELECT a.id, a.name, a.city, c.province, c.country,c.level
  24. FROM fact_table a
  25. LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
  26. ON a.city = c.city

配置

通用配置项

KeyDefault ValueRequiredComment
fenodesYDoris FE http 地址, 支持多个地址,使用逗号分隔
benodesNDoris BE http 地址, 支持多个地址,使用逗号分隔,参考#187
table.identifierYDoris 表名,如:db.tbl
usernameY访问 Doris 的用户名
passwordY访问 Doris 的密码
doris.request.retries3N向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms30000N向 Doris 发送请求的连接超时时间
doris.request.read.timeout.ms30000N向 Doris 发送请求的读取超时时间

Source 配置项

KeyDefault ValueRequiredComment
doris.request.query.timeout.s3600N查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger. MAX_VALUEN一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。
doris.batch.size1024N一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit2147483648N单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncFALSEN是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch
doris.deserialize.queue.size64N异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效
doris.read.fieldN读取 Doris 表的列名列表,多列之间使用逗号分隔
doris.filter.queryN过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。比如 age=18。

Sink 配置项

KeyDefault ValueRequiredComment
sink.label-prefixYStream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。
sink.properties.*NStream Load 的导入参数。
例如: ‘sink.properties.column_separator’ = ‘, ‘ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符,’\x01’会被转换为二进制的0x01

JSON格式导入
‘sink.properties.format’ = ‘json’ ‘sink.properties.read_json_by_line’ = ‘true’
详细参数参考这里
sink.enable-deleteTRUEN是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。
sink.enable-2pcTRUEN是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考这里
sink.buffer-size1MBN写数据缓存buffer大小,单位字节。不建议修改,默认配置即可
sink.buffer-count3N写数据缓存buffer个数。不建议修改,默认配置即可
sink.max-retries3NCommit失败后的最大重试次数,默认3次

Lookup Join 配置项

KeyDefault ValueRequiredComment
jdbc-urlYjdbc连接信息
lookup.cache.max-rows-1Nlookup缓存的最大行数,默认值-1,不开启缓存
lookup.cache.ttl10sNlookup缓存的最大时间,默认10s
lookup.max-retries1Nlookup查询失败后的重试次数
lookup.jdbc.asyncfalseN是否开启异步的lookup,默认false
lookup.jdbc.read.batch.size128N异步lookup下,每次查询的最大批次大小
lookup.jdbc.read.batch.queue-size256N异步lookup时,中间缓冲队列的大小
lookup.jdbc.read.thread-size3N每个task中lookup的jdbc线程数
Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
DECIMALV2DECIMAL
TIMEDOUBLE
HLLUnsupported datatype

使用FlinkSQL通过CDC接入Doris示例

  1. -- enable checkpoint
  2. SET 'execution.checkpointing.interval' = '10s';
  3. CREATE TABLE cdc_mysql_source (
  4. id int
  5. ,name VARCHAR
  6. ,PRIMARY KEY (id) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'mysql-cdc',
  9. 'hostname' = '127.0.0.1',
  10. 'port' = '3306',
  11. 'username' = 'root',
  12. 'password' = 'password',
  13. 'database-name' = 'database',
  14. 'table-name' = 'table'
  15. );
  16. -- 支持同步insert/update/delete事件
  17. CREATE TABLE doris_sink (
  18. id INT,
  19. name STRING
  20. )
  21. WITH (
  22. 'connector' = 'doris',
  23. 'fenodes' = '127.0.0.1:8030',
  24. 'table.identifier' = 'database.table',
  25. 'username' = 'root',
  26. 'password' = '',
  27. 'sink.properties.format' = 'json',
  28. 'sink.properties.read_json_by_line' = 'true',
  29. 'sink.enable-delete' = 'true', -- 同步删除事件
  30. 'sink.label-prefix' = 'doris_label'
  31. );
  32. insert into doris_sink select id,name from cdc_mysql_source;

使用FlinkCDC接入多表或整库示例

语法

  1. <FLINK_HOME>bin/flink run \
  2. -c org.apache.doris.flink.tools.cdc.CdcTools \
  3. lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
  4. <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
  5. --database <doris-database-name> \
  6. [--job-name <flink-job-name>] \
  7. [--table-prefix <doris-table-prefix>] \
  8. [--table-suffix <doris-table-suffix>] \
  9. [--including-tables <mysql-table-name|name-regular-expr>] \
  10. [--excluding-tables <mysql-table-name|name-regular-expr>] \
  11. --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
  12. --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
  13. --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
  14. [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
  • --job-name Flink任务名称, 非必需。
  • --database 同步到Doris的数据库名。
  • --table-prefix Doris表前缀名,例如 —table-prefix ods_。
  • --table-suffix 同上,Doris表的后缀名。
  • --including-tables 需要同步的MySQL表,可以使用”|” 分隔多个表,并支持正则表达式。 比如—including-tables table1|tbl.*就是同步table1和所有以tbl开头的表。
  • --excluding-tables 不需要同步的表,用法同上。
  • --mysql-conf MySQL CDCSource 配置,例如—mysql-conf hostname=127.0.0.1 ,您可以在这里查看所有配置MySQL-CDC,其中hostname/username/password/database-name 是必需的。
  • --oracle-conf Oracle CDCSource 配置,例如—oracle-conf hostname=127.0.0.1 ,您可以在这里查看所有配置Oracle-CDC,其中hostname/username/password/database-name/schema-name 是必需的。
  • --sink-conf Doris Sink 的所有配置,可以在这里查看完整的配置项。
  • --table-conf Doris表的配置项,即properties中包含的内容。 例如 —table-conf replication_num=1
  • --ignore-default-value 关闭同步mysql表结构的默认值。适用于同步mysql数据到doris时,字段有默认值,但实际插入数据为null情况。参考#152
  • --use-new-schema-change 新的schema change支持同步mysql多列变更、默认值。参考#167

注:同步时需要在$FLINK_HOME/lib 目录下添加对应的Flink CDC依赖,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar

MySQL同步示例

  1. <FLINK_HOME>bin/flink run \
  2. -Dexecution.checkpointing.interval=10s \
  3. -Dparallelism.default=1 \
  4. -c org.apache.doris.flink.tools.cdc.CdcTools \
  5. lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
  6. mysql-sync-database \
  7. --database test_db \
  8. --mysql-conf hostname=127.0.0.1 \
  9. --mysql-conf port=3306 \
  10. --mysql-conf username=root \
  11. --mysql-conf password=123456 \
  12. --mysql-conf database-name=mysql_db \
  13. --including-tables "tbl1|test.*" \
  14. --sink-conf fenodes=127.0.0.1:8030 \
  15. --sink-conf username=root \
  16. --sink-conf password=123456 \
  17. --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
  18. --sink-conf sink.label-prefix=label \
  19. --table-conf replication_num=1

Oracle同步示例

  1. <FLINK_HOME>bin/flink run \
  2. -Dexecution.checkpointing.interval=10s \
  3. -Dparallelism.default=1 \
  4. -c org.apache.doris.flink.tools.cdc.CdcTools \
  5. ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
  6. oracle-sync-database \
  7. --database test_db \
  8. --oracle-conf hostname=127.0.0.1 \
  9. --oracle-conf port=1521 \
  10. --oracle-conf username=admin \
  11. --oracle-conf password="password" \
  12. --oracle-conf database-name=XE \
  13. --oracle-conf schema-name=ADMIN \
  14. --including-tables "tbl1|tbl2" \
  15. --sink-conf fenodes=127.0.0.1:8030 \
  16. --sink-conf username=root \
  17. --sink-conf password=\
  18. --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
  19. --sink-conf sink.label-prefix=label \
  20. --table-conf replication_num=1

PostgreSQL同步示例

  1. <FLINK_HOME>/bin/flink run \
  2. -Dexecution.checkpointing.interval=10s \
  3. -Dparallelism.default=1\
  4. -c org.apache.doris.flink.tools.cdc.CdcTools \
  5. ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
  6. postgres-sync-database \
  7. --database db1\
  8. --postgres-conf hostname=127.0.0.1 \
  9. --postgres-conf port=5432 \
  10. --postgres-conf username=postgres \
  11. --postgres-conf password="123456" \
  12. --postgres-conf database-name=postgres \
  13. --postgres-conf schema-name=public \
  14. --postgres-conf slot.name=test \
  15. --postgres-conf decoding.plugin.name=pgoutput \
  16. --including-tables "tbl1|tbl2" \
  17. --sink-conf fenodes=127.0.0.1:8030 \
  18. --sink-conf username=root \
  19. --sink-conf password=\
  20. --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
  21. --sink-conf sink.label-prefix=label \
  22. --table-conf replication_num=1

SQLServer同步示例

  1. <FLINK_HOME>/bin/flink run \
  2. -Dexecution.checkpointing.interval=10s \
  3. -Dparallelism.default=1 \
  4. -c org.apache.doris.flink.tools.cdc.CdcTools \
  5. ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
  6. sqlserver-sync-database \
  7. --database db1\
  8. --sqlserver-conf hostname=127.0.0.1 \
  9. --sqlserver-conf port=1433 \
  10. --sqlserver-conf username=sa \
  11. --sqlserver-conf password="123456" \
  12. --sqlserver-conf database-name=CDC_DB \
  13. --sqlserver-conf schema-name=dbo \
  14. --including-tables "tbl1|tbl2" \
  15. --sink-conf fenodes=127.0.0.1:8030 \
  16. --sink-conf username=root \
  17. --sink-conf password=\
  18. --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
  19. --sink-conf sink.label-prefix=label \
  20. --table-conf replication_num=1

使用FlinkCDC更新Key列

一般在业务数据库中,会使用编号来作为表的主键,比如Student表,会使用编号(id)来作为主键,但是随着业务的发展,数据对应的编号有可能是会发生变化的。 在这种场景下,使用FlinkCDC + Doris Connector同步数据,便可以自动更新Doris主键列的数据。

原理

Flink CDC底层的采集工具是Debezium,Debezium内部使用op字段来标识对应的操作:op字段的取值分别为c、u、d、r,分别对应create、update、delete和read。 而对于主键列的更新,FlinkCDC会向下游发送DELETE和INSERT事件,同时数据同步到Doris中后,就会自动更新主键列的数据。

使用

Flink程序可参考上面CDC同步的示例,成功提交任务后,在MySQL侧执行Update主键列的语句(update student set id = '1002' where id = '1001'),即可修改Doris中的数据。

一般Kafka中的消息会使用特定字段来标记操作类型,比如{“op_type”:”delete”,data:{…}}。针对这类数据,希望将op_type=delete的数据删除掉。

DorisSink默认会根据RowKind来区分事件的类型,通常这种在cdc情况下可以直接获取到事件类型,对隐藏列__DORIS_DELETE_SIGN__进行赋值达到删除的目的,而Kafka则需要根据业务逻辑判断,显示的传入隐藏列的值。

使用

  1. -- 比如上游数据: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
  2. CREATE TABLE KAFKA_SOURCE(
  3. data STRING,
  4. op_type STRING
  5. ) WITH (
  6. 'connector' = 'kafka',
  7. ...
  8. );
  9. CREATE TABLE DORIS_SINK(
  10. id INT,
  11. name STRING,
  12. __DORIS_DELETE_SIGN__ INT
  13. ) WITH (
  14. 'connector' = 'doris',
  15. 'fenodes' = '127.0.0.1:8030',
  16. 'table.identifier' = 'db.table',
  17. 'username' = 'root',
  18. 'password' = '',
  19. 'sink.enable-delete' = 'false', -- false表示不从RowKind获取事件类型
  20. 'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- 显示指定streamload的导入列
  21. );
  22. INSERT INTO DORIS_SINK
  23. SELECT json_value(data,'$.id') as id,
  24. json_value(data,'$.name') as name,
  25. if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
  26. from KAFKA_SOURCE;

Java示例

samples/doris-demo/ 下提供了 Java 版本的示例,可供参考,查看点击这里

最佳实践

应用场景

使用 Flink Doris Connector最适合的场景就是实时/批次同步源数据(Mysql,Oracle,PostgreSQL等)到Doris,使用Flink对Doris中的数据和其他数据源进行联合分析,也可以使用Flink Doris Connector。

其他

  1. Flink Doris Connector主要是依赖Checkpoint进行流式写入,所以Checkpoint的间隔即为数据的可见延迟时间。
  2. 为了保证Flink的Exactly Once语义,Flink Doris Connector 默认开启两阶段提交,Doris在1.1版本后默认开启两阶段提交。1.0可通过修改BE参数开启,可参考two_phase_commit

常见问题

  1. Doris Source在数据读取完成后,流为什么就结束了?

目前Doris Source是有界流,不支持CDC方式读取。

  1. Flink读取Doris可以进行条件下推吗?

通过配置doris.filter.query参数,详情参考配置小节。

  1. 如何写入Bitmap类型?
  1. CREATE TABLE bitmap_sink (
  2. dt int,
  3. page string,
  4. user_id int
  5. )
  6. WITH (
  7. 'connector' = 'doris',
  8. 'fenodes' = '127.0.0.1:8030',
  9. 'table.identifier' = 'test.bitmap_test',
  10. 'username' = 'root',
  11. 'password' = '',
  12. 'sink.label-prefix' = 'doris_label',
  13. 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
  14. )
  1. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint/Savepoint启动,否则会报如上错误。 不要求Exactly-Once时,也可通过关闭2PC提交(sink.enable-2pc=false) 或更换不同的sink.label-prefix解决。

  1. errCode = 2, detailMessage = transaction [19650] not found

发生在Commit阶段,checkpoint里面记录的事务ID,在FE侧已经过期,此时再次commit就会出现上述错误。 此时无法从checkpoint启动,后续可通过修改fe.conf的streaming_label_keep_max_second配置来延长过期时间,默认12小时。

  1. errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 max_running_txn_num_per_db 来解决,具体可参考 max_running_txn_num_per_db

同时,一个任务频繁修改label重启,也可能会导致这个错误。2pc场景下(Duplicate/Aggregate模型),每个任务的label需要唯一,并且从checkpoint重启时,flink任务才会主动abort掉之前已经precommit成功,没有commit的txn,频繁修改label重启,会导致大量precommit成功的txn无法被abort,占用事务。在Unique模型下也可关闭2pc,可以实现幂等写入。

  1. Flink写入Uniq模型时,如何保证一批数据的有序性?

可以添加sequence列配置来保证,具体可参考 sequence

  1. Flink任务没报错,但是无法同步数据?

Connector1.1.0版本以前,是攒批写入的,写入均是由数据驱动,需要判断上游是否有数据写入。1.1.0之后,依赖Checkpoint,必须开启Checkpoint才能写入。

  1. tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

通常发生在Connector1.1.0之前,是由于写入频率过快,导致版本过多。可以通过设置sink.batch.size 和 sink.batch.interval参数来降低Streamload的频率。

  1. Flink导入有脏数据,如何跳过?

Flink在数据导入时,如果有脏数据,比如字段格式、长度等问题,会导致StreamLoad报错,此时Flink会不断的重试。如果需要跳过,可以通过禁用StreamLoad的严格模式(strict_mode=false,max_filter_ratio=1)或者在Sink算子之前对数据做过滤。

  1. 源表和Doris表应如何对应? 使用Flink Connector导入数据时,要注意两个方面,第一是源表的列和类型跟flink sql中的列和类型要对应上;第二个是flink sql中的列和类型要跟doris表的列和类型对应上,具体可以参考上面的”Doris 和 Flink 列类型映射关系”

  2. TApplicationException: get_next failed: out of sequence response: expected 4 but got 3

这是由于 Thrift 框架存在并发 bug 导致的,建议你使用尽可能新的 connector 以及与之兼容的 flink 版本。

  1. DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc

你可以在 TaskManager 中搜索日志 abort transaction response,根据 http 返回码确定是 client 的问题还是 server 的问题。

  1. 使用doris.filter.query出现org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered “xx” at line x, column xx

出现这个问题主要是条件varchar/string类型,需要加引号导致的,正确写法是 xxx = ‘’xxx’’,这样Flink SQL 解析器会将两个连续的单引号解释为一个单引号字符,而不是字符串的结束,并将拼接后的字符串作为属性的值。