Flink Doris Connector

Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。

代码库地址:https://github.com/apache/incubator-doris-flink-connector

  • 可以将Doris表映射为DataStream或者Table

注意:

  1. 修改和删除只支持在Unique Key模型上
  2. 目前的删除是支持Flink CDC的方式接入数据实现自动删除,如果是其他数据接入的方式删除需要自己实现。Flink CDC的数据删除使用方式参照本文档最后一节

版本兼容

ConnectorFlinkDorisJavaScala
1.11.6-2.12-xx1.11.x0.13+82.12
1.12.7-2.12-xx1.12.x0.13.+82.12
1.13.5-2.12-xx1.13.x0.13.+82.12

编译与安装

在源码目录下执行:

  1. sh build.sh --flink 1.11.6 --scala 2.12 # flink 1.11.6 scala 2.12

注:如果你是从 tag 检出的源码,则可以直接执行 sh build.sh --tag,而无需指定 flink 和 scala 的版本。因为 tag 源码中的版本是固定的。比如 1.13.5-2.12-1.0.1 表示 flink 版本 1.13.5,scala 版本 2.12,connector 版本 1.0.1。

编译成功后,会在 output/ 目录下生成文件,如:doris-flink-1.13.5-2.12-1.0.1-SNAPSHOT.jar 。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector 。例如, Local 模式运行的 Flink ,将此文件放入 jars/ 文件夹下。 Yarn 集群模式运行的 Flink ,则将此文件放入预部署包中。

备注

  1. doris FE 要在配置中配置启用http v2
  2. Scala版本目前只支持2.12.x版本

conf/fe.conf

  1. enable_http_server_v2 = true

使用Maven 管理

添加 maven 依赖

  1. <dependency>
  2. <groupId>org.apache.doris</groupId>
  3. <artifactId>doris-flink-connector</artifactId>
  4. <version>1.11.6-2.12-SNAPSHOT</version>
  5. </dependency>

备注

1.11.6 可以根据flink 版本替换成替换成 1.12.7 或者 1.13.5

使用方法

Flink读写Doris数据主要有三种方式

  • SQL
  • DataStream
  • DataSet

参数配置

Flink Doris Connector Sink的内部实现是通过 Stream load 服务向Doris写入数据, 同时也支持 Stream load 请求参数的配置设定

参数配置方法

  • SQL 使用WITH参数sink.properties.配置
  • DataStream 使用方法DorisExecutionOptions.builder().setStreamLoadProp(Properties)配置

SQL

  • Source
  1. CREATE TABLE flink_doris_source (
  2. name STRING,
  3. age INT,
  4. price DECIMAL(5,2),
  5. sale DOUBLE
  6. )
  7. WITH (
  8. 'connector' = 'doris',
  9. 'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
  10. 'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
  11. 'username' = '$YOUR_DORIS_USERNAME',
  12. 'password' = '$YOUR_DORIS_PASSWORD'
  13. );
  • Sink
  1. CREATE TABLE flink_doris_sink (
  2. name STRING,
  3. age INT,
  4. price DECIMAL(5,2),
  5. sale DOUBLE
  6. )
  7. WITH (
  8. 'connector' = 'doris',
  9. 'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
  10. 'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
  11. 'username' = '$YOUR_DORIS_USERNAME',
  12. 'password' = '$YOUR_DORIS_PASSWORD'
  13. );
  • Insert
  1. INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

DataStream

  • Source
  1. Properties properties = new Properties();
  2. properties.put("fenodes","FE_IP:8030");
  3. properties.put("username","root");
  4. properties.put("password","");
  5. properties.put("table.identifier","db.table");
  6. env.addSource(new DorisSourceFunction(
  7. new DorisStreamOptions(properties),
  8. new SimpleListDeserializationSchema()
  9. )
  10. ).print();
  • Sink

Json数据流

  1. Properties pro = new Properties();
  2. pro.setProperty("format", "json");
  3. pro.setProperty("strip_outer_array", "true");
  4. env.fromElements(
  5. "{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}"
  6. )
  7. .addSink(
  8. DorisSink.sink(
  9. DorisReadOptions.builder().build(),
  10. DorisExecutionOptions.builder()
  11. .setBatchSize(3)
  12. .setBatchIntervalMs(0l)
  13. .setMaxRetries(3)
  14. .setStreamLoadProp(pro).build(),
  15. DorisOptions.builder()
  16. .setFenodes("FE_IP:8030")
  17. .setTableIdentifier("db.table")
  18. .setUsername("root")
  19. .setPassword("").build()
  20. ));

Json数据流

  1. env.fromElements(
  2. "{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}"
  3. )
  4. .addSink(
  5. DorisSink.sink(
  6. DorisOptions.builder()
  7. .setFenodes("FE_IP:8030")
  8. .setTableIdentifier("db.table")
  9. .setUsername("root")
  10. .setPassword("").build()
  11. ));

RowData数据流

  1. DataStream<RowData> source = env.fromElements("")
  2. .map(new MapFunction<String, RowData>() {
  3. @Override
  4. public RowData map(String value) throws Exception {
  5. GenericRowData genericRowData = new GenericRowData(3);
  6. genericRowData.setField(0, StringData.fromString("北京"));
  7. genericRowData.setField(1, 116.405419);
  8. genericRowData.setField(2, 39.916927);
  9. return genericRowData;
  10. }
  11. });
  12. String[] fields = {"city", "longitude", "latitude"};
  13. LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
  14. source.addSink(
  15. DorisSink.sink(
  16. fields,
  17. types,
  18. DorisReadOptions.builder().build(),
  19. DorisExecutionOptions.builder()
  20. .setBatchSize(3)
  21. .setBatchIntervalMs(0L)
  22. .setMaxRetries(3)
  23. .build(),
  24. DorisOptions.builder()
  25. .setFenodes("FE_IP:8030")
  26. .setTableIdentifier("db.table")
  27. .setUsername("root")
  28. .setPassword("").build()
  29. ));

DataSet

  • Sink
  1. MapOperator<String, RowData> data = env.fromElements("")
  2. .map(new MapFunction<String, RowData>() {
  3. @Override
  4. public RowData map(String value) throws Exception {
  5. GenericRowData genericRowData = new GenericRowData(3);
  6. genericRowData.setField(0, StringData.fromString("北京"));
  7. genericRowData.setField(1, 116.405419);
  8. genericRowData.setField(2, 39.916927);
  9. return genericRowData;
  10. }
  11. });
  12. DorisOptions dorisOptions = DorisOptions.builder()
  13. .setFenodes("FE_IP:8030")
  14. .setTableIdentifier("db.table")
  15. .setUsername("root")
  16. .setPassword("").build();
  17. DorisReadOptions readOptions = DorisReadOptions.defaults();
  18. DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();
  19. LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
  20. String[] fields = {"city", "longitude", "latitude"};
  21. DorisDynamicOutputFormat outputFormat = new DorisDynamicOutputFormat(
  22. dorisOptions, readOptions, executionOptions, types, fields
  23. );
  24. outputFormat.open(0, 1);
  25. data.output(outputFormat);
  26. outputFormat.close();

配置

通用配置项

KeyDefault ValueComment
fenodesDoris FE http 地址
table.identifierDoris 表名,如:db1.tbl1
username访问Doris的用户名
password访问Doris的密码
doris.request.retries3向Doris发送请求的重试次数
doris.request.connect.timeout.ms30000向Doris发送请求的连接超时时间
doris.request.read.timeout.ms30000向Doris发送请求的读取超时时间
doris.request.query.timeout.s3600查询doris的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger. MAX_VALUE一个Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Flink侧的并行度,但同时会对Doris造成更大的压力。
doris.batch.size1024一次从BE读取数据的最大行数。增大此数值可减少flink与Doris之间建立连接的次数。
从而减轻网络延迟所带来的的额外时间开销。
doris.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncfalse是否支持异步转换Arrow格式到flink-doris-connector迭代所需的RowBatch
doris.deserialize.queue.size64异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效
doris.read.field读取Doris表的列名列表,多列之间使用逗号分隔
doris.filter.query过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。
sink.batch.size10000单次写BE的最大行数
sink.max-retries1写BE失败之后的重试次数
sink.batch.interval10sflush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为10秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
sink.properties.*Stream load 的导入参数

例如:
‘sink.properties.column_separator’ = ‘, ‘
定义列分隔符

‘sink.properties.escape_delimiters’ = ‘true’
特殊字符作为分隔符,’\x01’会被转换为二进制的0x01

‘sink.properties.format’ = ‘json’
‘sink.properties.strip_outer_array’ = ‘true’
JSON格式导入
sink.enable-deletetrue是否启用删除。此选项需要Doris表开启批量删除功能(0.15+版本默认开启),只支持Uniq模型。
Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATESTRING
DATETIMESTRING
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
DECIMALV2DECIMAL
TIMEDOUBLE
HLLUnsupported datatype
  1. CREATE TABLE cdc_mysql_source (
  2. id int
  3. ,name VARCHAR
  4. ,PRIMARY KEY (id) NOT ENFORCED
  5. ) WITH (
  6. 'connector' = 'mysql-cdc',
  7. 'hostname' = '127.0.0.1',
  8. 'port' = '3306',
  9. 'username' = 'root',
  10. 'password' = 'password',
  11. 'database-name' = 'database',
  12. 'table-name' = 'table'
  13. );
  14. -- 支持删除事件同步(sink.enable-delete='true'),需要Doris表开启批量删除功能
  15. CREATE TABLE doris_sink (
  16. id INT,
  17. name STRING
  18. )
  19. WITH (
  20. 'connector' = 'doris',
  21. 'fenodes' = '127.0.0.1:8030',
  22. 'table.identifier' = 'database.table',
  23. 'username' = 'root',
  24. 'password' = '',
  25. 'sink.properties.format' = 'json',
  26. 'sink.properties.strip_outer_array' = 'true',
  27. 'sink.enable-delete' = 'true'
  28. );
  29. insert into doris_sink select id,name from cdc_mysql_source;