Flink Doris Connector

本文档适用于flink-doris-connector 1.1.0之后的版本,1.1.0之前的版本参考这里

Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。

代码库地址:https://github.com/apache/doris-flink-connector

  • 可以将 Doris 表映射为 DataStream 或者 Table

注意:

  1. 修改和删除只支持在 Unique Key 模型上
  2. 目前的删除是支持 Flink CDC 的方式接入数据实现自动删除,如果是其他数据接入的方式删除需要自己实现。Flink CDC 的数据删除使用方式参照本文档最后一节

版本兼容

ConnectorFlinkDorisJavaScala
1.14_2.11-1.1.01.14.x1.0+82.11
1.14_2.12-1.1.01.14.x1.0+82.12

编译与安装

准备工作

1.修改custom_env.sh.tpl文件,重命名为custom_env.sh

2.指定thrift安装目录

  1. ##源文件内容
  2. #export THRIFT_BIN=
  3. #export MVN_BIN=
  4. #export JAVA_HOME=
  5. ##修改如下,MacOS为例
  6. export THRIFT_BIN=/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift
  7. #export MVN_BIN=
  8. #export JAVA_HOME=
  9. 安装 `thrift` 0.13.0 版本(注意:`Doris` 0.15 和最新的版本基于 `thrift` 0.13.0 构建, 之前的版本依然使用`thrift` 0.9.3 构建)
  10. Windows:
  11. 1.下载:`http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe`(下载目录自己指定)
  12. 2.修改thrift-0.13.0.exe thrift
  13. MacOS:
  14. 1. 下载:`brew install thrift@0.13.0`
  15. 2. 默认下载地址:/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift
  16. 注:MacOS执行 `brew install thrift@0.13.0` 可能会报找不到版本的错误,解决方法如下,在终端执行:
  17. 1. `brew tap-new $USER/local-tap`
  18. 2. `brew extract --version='0.13.0' thrift $USER/local-tap`
  19. 3. `brew install thrift@0.13.0`
  20. 参考链接: `https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c`
  21. Linux:
  22. 1.下载源码包:`wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz`
  23. 2.安装依赖:`yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++`
  24. 3.`tar zxvf thrift-0.13.0.tar.gz`
  25. 4.`cd thrift-0.13.0`
  26. 5.`./configure --without-tests`
  27. 6.`make`
  28. 7.`make install`
  29. 安装完成后查看版本:thrift --version
  30. 注:如果编译过Doris,则不需要安装thrift,可以直接使用 $DORIS_HOME/thirdparty/installed/bin/thrift

在源码目录下执行:

  1. sh build.sh
  2. Usage:
  3. build.sh --flink version --scala version # specify flink and scala version
  4. build.sh --tag # this is a build from tag
  5. e.g.:
  6. build.sh --flink 1.14.3 --scala 2.12
  7. build.sh --tag
  8. 然后按照你需要版本执行命令编译即可,例如:
  9. sh build.sh --flink 1.14.3 --scala 2.12

编译成功后,会在 target/ 目录下生成文件,如:flink-doris-connector-1.14_2.12-1.1.0-SNAPSHOT.jar 。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector 。例如, Local 模式运行的 Flink ,将此文件放入 lib/ 文件夹下。 Yarn 集群模式运行的 Flink ,则将此文件放入预部署包中。

备注

  1. Doris FE 要在配置中配置启用 http v2

​ conf/fe.conf

  1. enable_http_server_v2 = true

使用 Maven 管理

添加 flink-doris-connector 和必要的 Flink Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>${flink.version}</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-java_${scala.version}</artifactId>
  10. <version>${flink.version}</version>
  11. <scope>provided</scope>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.flink</groupId>
  15. <artifactId>flink-clients_${scala.version}</artifactId>
  16. <version>${flink.version}</version>
  17. <scope>provided</scope>
  18. </dependency>
  19. <!-- flink table -->
  20. <dependency>
  21. <groupId>org.apache.flink</groupId>
  22. <artifactId>flink-table-planner_${scala.version}</artifactId>
  23. <version>${flink.version}</version>
  24. <scope>provided</scope>
  25. </dependency>
  26. <!-- flink-doris-connector -->
  27. <dependency>
  28. <groupId>org.apache.doris</groupId>
  29. <artifactId>flink-doris-connector-1.14_2.12</artifactId>
  30. <version>1.1.0</version>
  31. </dependency>

备注

1.请根据不同的 Flink 和 Scala 版本替换对应的 Connector 和 Flink 依赖版本。

使用方法

Flink 读写 Doris 数据主要有两种方式

  • SQL
  • DataStream

参数配置

Flink Doris Connector Sink 的内部实现是通过 Stream Load 服务向 Doris 写入数据, 同时也支持 Stream Load 请求参数的配置设置,具体参数可参考这里,配置方法如下:

  • SQL 使用 WITH 参数 sink.properties. 配置
  • DataStream 使用方法DorisExecutionOptions.builder().setStreamLoadProp(Properties)配置

SQL

  • Source
  1. CREATE TABLE flink_doris_source (
  2. name STRING,
  3. age INT,
  4. price DECIMAL(5,2),
  5. sale DOUBLE
  6. )
  7. WITH (
  8. 'connector' = 'doris',
  9. 'fenodes' = 'FE_IP:8030',
  10. 'table.identifier' = 'database.table',
  11. 'username' = 'root',
  12. 'password' = 'password'
  13. );
  • Sink
  1. -- enable checkpoint
  2. SET 'execution.checkpointing.interval' = '10s';
  3. CREATE TABLE flink_doris_sink (
  4. name STRING,
  5. age INT,
  6. price DECIMAL(5,2),
  7. sale DOUBLE
  8. )
  9. WITH (
  10. 'connector' = 'doris',
  11. 'fenodes' = 'FE_IP:8030',
  12. 'table.identifier' = 'db.table',
  13. 'username' = 'root',
  14. 'password' = 'password',
  15. 'sink.label-prefix' = 'doris_label'
  16. );
  • Insert
  1. INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

DataStream

  • Source
  1. DorisOptions.Builder builder = DorisOptions.builder()
  2. .setFenodes("FE_IP:8030")
  3. .setTableIdentifier("db.table")
  4. .setUsername("root")
  5. .setPassword("password");
  6. DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
  7. .setDorisOptions(builder.build())
  8. .setDorisReadOptions(DorisReadOptions.builder().build())
  9. .setDeserializer(new SimpleListDeserializationSchema())
  10. .build();
  11. env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
  • Sink

String 数据流

  1. // enable checkpoint
  2. env.enableCheckpointing(10000);
  3. DorisSink.Builder<String> builder = DorisSink.builder();
  4. DorisOptions.Builder dorisBuilder = DorisOptions.builder();
  5. dorisBuilder.setFenodes("FE_IP:8030")
  6. .setTableIdentifier("db.table")
  7. .setUsername("root")
  8. .setPassword("password");
  9. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
  10. executionBuilder.setLabelPrefix("label-doris"); //streamload label prefix
  11. builder.setDorisReadOptions(DorisReadOptions.builder().build())
  12. .setDorisExecutionOptions(executionBuilder.build())
  13. .setSerializer(new SimpleStringSerializer()) //serialize according to string
  14. .setDorisOptions(dorisBuilder.build());
  15. //mock string source
  16. List<Tuple2<String, Integer>> data = new ArrayList<>();
  17. data.add(new Tuple2<>("doris",1));
  18. DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
  19. source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
  20. .sinkTo(builder.build());

RowData 数据流

  1. // enable checkpoint
  2. env.enableCheckpointing(10000);
  3. //doris sink option
  4. DorisSink.Builder<RowData> builder = DorisSink.builder();
  5. DorisOptions.Builder dorisBuilder = DorisOptions.builder();
  6. dorisBuilder.setFenodes("FE_IP:8030")
  7. .setTableIdentifier("db.table")
  8. .setUsername("root")
  9. .setPassword("password");
  10. // json format to streamload
  11. Properties properties = new Properties();
  12. properties.setProperty("format", "json");
  13. properties.setProperty("read_json_by_line", "true");
  14. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
  15. executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
  16. .setStreamLoadProp(properties); //streamload params
  17. //flink rowdata‘s schema
  18. String[] fields = {"city", "longitude", "latitude"};
  19. DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE()};
  20. builder.setDorisReadOptions(DorisReadOptions.builder().build())
  21. .setDorisExecutionOptions(executionBuilder.build())
  22. .setSerializer(RowDataSerializer.builder() //serialize according to rowdata
  23. .setFieldNames(fields)
  24. .setType("json") //json format
  25. .setFieldType(types).build())
  26. .setDorisOptions(dorisBuilder.build());
  27. //mock rowdata source
  28. DataStream<RowData> source = env.fromElements("")
  29. .map(new MapFunction<String, RowData>() {
  30. @Override
  31. public RowData map(String value) throws Exception {
  32. GenericRowData genericRowData = new GenericRowData(3);
  33. genericRowData.setField(0, StringData.fromString("beijing"));
  34. genericRowData.setField(1, 116.405419);
  35. genericRowData.setField(2, 39.916927);
  36. return genericRowData;
  37. }
  38. });
  39. source.sinkTo(builder.build());

配置

通用配置项

KeyDefault ValueRequiredComment
fenodesYDoris FE http 地址
table.identifierYDoris 表名,如:db.tbl
usernameY访问 Doris 的用户名
passwordY访问 Doris 的密码
doris.request.retries3N向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms30000N向 Doris 发送请求的连接超时时间
doris.request.read.timeout.ms30000N向 Doris 发送请求的读取超时时间
doris.request.query.timeout.s3600N查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger. MAX_VALUEN一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。
doris.batch.size1024N一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit2147483648N单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncFALSEN是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch
doris.deserialize.queue.size64N异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效
doris.read.fieldN读取 Doris 表的列名列表,多列之间使用逗号分隔
doris.filter.queryN过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。
sink.label-prefixYStream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。
sink.properties.*NStream Load 的导入参数。
例如: ‘sink.properties.column_separator’ = ‘, ‘ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符,’\x01’会被转换为二进制的0x01

JSON格式导入
‘sink.properties.format’ = ‘json’ ‘sink.properties.read_json_by_line’ = ‘true’
sink.enable-deleteTRUEN是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。
sink.enable-2pcTRUEN是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考这里
Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
DECIMALV2DECIMAL
TIMEDOUBLE
HLLUnsupported datatype
  1. CREATE TABLE cdc_mysql_source (
  2. id int
  3. ,name VARCHAR
  4. ,PRIMARY KEY (id) NOT ENFORCED
  5. ) WITH (
  6. 'connector' = 'mysql-cdc',
  7. 'hostname' = '127.0.0.1',
  8. 'port' = '3306',
  9. 'username' = 'root',
  10. 'password' = 'password',
  11. 'database-name' = 'database',
  12. 'table-name' = 'table'
  13. );
  14. -- 支持删除事件同步(sink.enable-delete='true'),需要 Doris 表开启批量删除功能
  15. CREATE TABLE doris_sink (
  16. id INT,
  17. name STRING
  18. )
  19. WITH (
  20. 'connector' = 'doris',
  21. 'fenodes' = '127.0.0.1:8030',
  22. 'table.identifier' = 'database.table',
  23. 'username' = 'root',
  24. 'password' = '',
  25. 'sink.properties.format' = 'json',
  26. 'sink.properties.read_json_by_line' = 'true',
  27. 'sink.enable-delete' = 'true',
  28. 'sink.label-prefix' = 'doris_label'
  29. );
  30. insert into doris_sink select id,name from cdc_mysql_source;

Java示例

samples/doris-demo 下提供了 Java 版本的示例,可供参考,查看点击这里

最佳实践

应用场景

使用 Flink Doris Connector最适合的场景就是实时/批次同步源数据(Mysql,Oracle,PostgreSQL等)到Doris,使用Flink对Doris中的数据和其他数据源进行联合分析,也可以使用Flink Doris Connector。

其他

  1. Flink Doris Connector主要是依赖Checkpoint进行流式写入,所以Checkpoint的间隔即为数据的可见延迟时间。
  2. 为了保证Flink的Exactly Once语义,Flink Doris Connector 默认开启两阶段提交,Doris在1.1版本后默认开启两阶段提交。1.0可通过修改BE参数开启,可参考two_phase_commit

常见问题

  1. Bitmap类型写入
  1. CREATE TABLE bitmap_sink (
  2. dt int,
  3. page string,
  4. user_id int
  5. )
  6. WITH (
  7. 'connector' = 'doris',
  8. 'fenodes' = '127.0.0.1:8030',
  9. 'table.identifier' = 'test.bitmap_test',
  10. 'username' = 'root',
  11. 'password' = '',
  12. 'sink.label-prefix' = 'doris_label',
  13. 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
  14. )
  1. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint/Savepoint启动,否则会报如上错误。 不要求Exactly-Once时,也可通过关闭2PC提交(sink.enable-2pc=false) 或更换不同的sink.label-prefix解决。