Doris

Overview

  • Doris Load node supports writing data to the Doris database.
  • Two sink modes are supported. Single-sink writes to a fixed database name and table name. Multi-sink derives the database name and table name from the source data format, which suits scenarios such as multi-table writing or whole-database synchronization.
  • This document describes how to set up a Doris Load node to sink to Doris.

Supported Version

Load Node | Doris version
Doris | 0.13+

Dependencies

To set up the Doris Load node, add the dependency below to a build automation tool such as Maven or SBT.

Maven dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-doris</artifactId>
        <version>1.9.0</version>
    </dependency>

Prepare

Create MySQL Extract table

  • For Single-sink: Create a table cdc.cdc_mysql_source in the MySQL database. The command is as follows:
    [root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
    mysql> use cdc;
    Database changed
    mysql> CREATE TABLE `cdc_mysql_source` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `name` varchar(64) DEFAULT NULL,
        `dr` tinyint(3) DEFAULT 0,
        PRIMARY KEY (`id`)
    );
    Query OK, 0 rows affected (0.02 sec)
    mysql> insert into cdc_mysql_source values(1, 'zhangsan', 0),(2, 'lisi', 0),(3, 'wangwu', 0);
    Query OK, 3 rows affected (0.01 sec)
    Records: 3  Duplicates: 0  Warnings: 0
    mysql> select * from cdc_mysql_source;
    +----+----------+----+
    | id | name     | dr |
    +----+----------+----+
    |  1 | zhangsan |  0 |
    |  2 | lisi     |  0 |
    |  3 | wangwu   |  0 |
    +----+----------+----+
    3 rows in set (0.07 sec)
  • For Multi-sink: Create the tables user_db.user_id_name and user_db.user_id_score in the MySQL database. The command is as follows:
    [root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
    mysql> use user_db;
    Database changed
    mysql> CREATE TABLE `user_id_name` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `name` varchar(64) DEFAULT NULL,
        PRIMARY KEY (`id`)
    );
    Query OK, 0 rows affected (0.02 sec)
    mysql> CREATE TABLE `user_id_score` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `score` double default 0,
        PRIMARY KEY (`id`)
    );
    Query OK, 0 rows affected (0.02 sec)
    mysql> insert into user_id_name values(1001, 'lily'),(1002, 'tom'),(1003, 'alan');
    Query OK, 3 rows affected (0.01 sec)
    Records: 3  Duplicates: 0  Warnings: 0
    mysql> insert into user_id_score values(1001, 99),(1002, 96),(1003, 98);
    Query OK, 3 rows affected (0.01 sec)
    Records: 3  Duplicates: 0  Warnings: 0
    mysql> select * from user_id_name;
    +------+------+
    | id   | name |
    +------+------+
    | 1001 | lily |
    | 1002 | tom  |
    | 1003 | alan |
    +------+------+
    3 rows in set (0.07 sec)
    mysql> select * from user_id_score;
    +------+-------+
    | id   | score |
    +------+-------+
    | 1001 |    99 |
    | 1002 |    96 |
    | 1003 |    98 |
    +------+-------+
    3 rows in set (0.07 sec)

Create Doris Load table

  • For Single-sink: Create a table cdc.cdc_doris_sink in the Doris database. The command is as follows:
    [root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
    mysql> use cdc;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    Database changed
    mysql> CREATE TABLE `cdc_doris_sink` (
        `id` int(11) NOT NULL COMMENT "user id",
        `name` varchar(50) NOT NULL COMMENT "user name",
        `dr` tinyint(4) NULL COMMENT "delete tag"
    ) ENGINE=OLAP
    UNIQUE KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
        "replication_allocation" = "tag.location.default: 1"
    );
    Query OK, 0 rows affected (0.06 sec)
  • For Multi-sink: Create the tables user_db.doris_user_id_name and user_db.doris_user_id_score in the Doris database. The command is as follows:
    [root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
    mysql> use user_db;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    Database changed
    mysql> CREATE TABLE `doris_user_id_name` (
        `id` int(11) NOT NULL COMMENT "user id",
        `name` varchar(50) NOT NULL COMMENT "nickname"
    ) ENGINE=OLAP
    UNIQUE KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
        "replication_allocation" = "tag.location.default: 1"
    );
    Query OK, 0 rows affected (0.06 sec)
    mysql> CREATE TABLE `doris_user_id_score` (
        `id` int(11) NOT NULL COMMENT "user id",
        `score` double default 0
    ) ENGINE=OLAP
    UNIQUE KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`) BUCKETS 1
    PROPERTIES (
        "replication_allocation" = "tag.location.default: 1"
    );
    Query OK, 0 rows affected (0.06 sec)
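
The sink option sink.enable-delete requires the batch delete feature on the Doris table. On Doris versions where it is not enabled by default, a statement along the following lines can be run in the Doris client before starting the job. This is a minimal sketch that assumes the cdc.cdc_doris_sink table created above; check the Doris documentation for your version before relying on it.

```sql
-- Enable the batch delete feature on the Unique-model sink table
ALTER TABLE cdc.cdc_doris_sink ENABLE FEATURE "BATCH_DELETE";

-- Optional check: with hidden columns shown, the table description
-- should include the hidden column __DORIS_DELETE_SIGN__
SET show_hidden_columns = true;
DESC cdc.cdc_doris_sink;
```

The same statement would be repeated for user_db.doris_user_id_name and user_db.doris_user_id_score in the Multi-sink case.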

How to create a Doris Load Node

Usage for SQL API

  • For Single-sink: Doris load
    [root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
    Flink SQL> SET 'execution.checkpointing.interval' = '3s';
    [INFO] Session property has been set.
    Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
    [INFO] Session property has been set.
    Flink SQL> CREATE TABLE cdc_mysql_source (
    > id int
    > ,name VARCHAR
    > ,dr TINYINT
    > ,PRIMARY KEY (id) NOT ENFORCED
    > ) WITH (
    > 'connector' = 'mysql-cdc-inlong',
    > 'hostname' = 'localhost',
    > 'port' = '3306',
    > 'username' = 'root',
    > 'password' = '123456',
    > 'database-name' = 'cdc',
    > 'table-name' = 'cdc_mysql_source'
    > );
    [INFO] Execute statement succeed.
    Flink SQL> CREATE TABLE cdc_doris_sink (
    > id INT,
    > name STRING,
    > dr TINYINT
    > ) WITH (
    > 'connector' = 'doris-inlong',
    > 'fenodes' = 'localhost:8030',
    > 'table.identifier' = 'cdc.cdc_doris_sink',
    > 'username' = 'root',
    > 'password' = '000000',
    > 'sink.properties.format' = 'json',
    > 'sink.properties.strip_outer_array' = 'true',
    > 'sink.enable-delete' = 'true'
    > );
    [INFO] Execute statement succeed.
    -- Support delete event synchronization (sink.enable-delete='true'), requires Doris table to enable batch delete function
    Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 5f89691571d7b3f3ca446589e3d0c3d3
  • For Multi-sink: Doris load
    ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
    Flink SQL> SET 'execution.checkpointing.interval' = '3s';
    [INFO] Session property has been set.
    Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
    [INFO] Session property has been set.
    Flink SQL> CREATE TABLE cdc_mysql_source (
    > id int
    > ,name VARCHAR
    > ,dr TINYINT
    > ,PRIMARY KEY (id) NOT ENFORCED
    > ) WITH (
    > 'connector' = 'mysql-cdc-inlong',
    > 'hostname' = 'localhost',
    > 'port' = '3306',
    > 'username' = 'root',
    > 'password' = '123456',
    > 'database-name' = 'test',
    > 'table-name' = 'cdc_mysql_source'
    > );
    [INFO] Execute statement succeed.
    Flink SQL> CREATE TABLE cdc_doris_sink (
    > id INT,
    > name STRING,
    > dr TINYINT
    > ) WITH (
    > 'connector' = 'doris-inlong',
    > 'fenodes' = 'localhost:8030',
    > 'username' = 'root',
    > 'password' = '000000',
    > 'sink.enable-delete' = 'true',
    > 'sink.multiple.enable' = 'true',
    > 'sink.multiple.format' = 'canal-json',
    > 'sink.multiple.database-pattern' = '${database}',
    > 'sink.multiple.table-pattern' = 'doris_${table}'
    > );
    [INFO] Execute statement succeed.
    -- Support delete event synchronization (sink.enable-delete='true'), requires Doris table to enable batch delete function
    Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 30feaa0ede92h6b6e25ea0cfda26df5e
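
Once a job has been submitted, one way to check that changes are flowing is to modify the MySQL source table and then query the Doris sink table. The statements below are a sketch for the Single-sink tables cdc.cdc_mysql_source and cdc.cdc_doris_sink; the inserted and updated values are illustrative only and do not come from the original example.

```sql
-- In the MySQL client (port 3306): change the source table
USE cdc;
INSERT INTO cdc_mysql_source VALUES (4, 'zhaoliu', 0);
UPDATE cdc_mysql_source SET name = 'lisi_updated' WHERE id = 2;
DELETE FROM cdc_mysql_source WHERE id = 3;

-- In the Doris client (port 9030): query the sink table after at least
-- one checkpoint interval (3s in the session settings above) has passed
USE cdc;
SELECT * FROM cdc_doris_sink ORDER BY id;
```

If delete synchronization is working, the deleted row should no longer appear in the Doris query result. For the Multi-sink job, the same kind of check can be made against user_db.doris_user_id_name and user_db.doris_user_id_score.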

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

Doris Load Node Options

Option | Required | Default | Type | Description
connector | required | (none) | string | Specifies which connector to use. Use doris-inlong, as in the examples in this document.
fenodes | required | (none) | string | Doris FE HTTP address. Multiple addresses are supported, separated by commas.
table.identifier | required | (none) | string | Doris table identifier, e.g. db1.tbl1.
username | required | (none) | string | Doris username.
password | required | (none) | string | Doris password.
doris.request.retries | optional | 3 | int | Number of retries when sending requests to Doris.
doris.request.connect.timeout.ms | optional | 30000 | int | Connection timeout for requests sent to Doris, in milliseconds.
doris.request.read.timeout.ms | optional | 30000 | int | Read timeout for requests sent to Doris, in milliseconds.
doris.request.query.timeout.s | optional | 3600 | int | Query timeout for Doris, in seconds. The default is 1 hour; -1 means no timeout limit.
doris.request.tablet.size | optional | Integer.MAX_VALUE | int | The number of Doris tablets corresponding to one partition. The smaller this value, the more partitions are generated. This increases parallelism on the Flink side but also puts more pressure on Doris.
doris.batch.size | optional | 1024 | int | The maximum number of rows read from BE at one time. Increasing this value reduces the number of connections between Flink and Doris, and with it the extra time overhead caused by network latency.
doris.exec.mem.limit | optional | 2147483648 | long | Memory limit for a single query, in bytes. The default is 2 GB.
doris.deserialize.arrow.async | optional | false | boolean | Whether to support asynchronous conversion of Arrow format to the RowBatch required for flink-doris-connector iteration.
doris.deserialize.queue.size | optional | 64 | int | Size of the internal processing queue for asynchronous Arrow format conversion. Takes effect when doris.deserialize.arrow.async is true.
doris.read.field | optional | (none) | string | List of column names in the Doris table, separated by commas.
doris.filter.query | optional | (none) | string | Filter expression of the query, passed through to Doris unchanged. Doris uses this expression to filter data on the source side.
sink.batch.size | optional | 10000 | int | Maximum number of rows in a single write to BE.
sink.max-retries | optional | 1 | int | Number of retries after a write to BE fails.
sink.batch.interval | optional | 10s | string | The flush interval, after which the asynchronous thread writes the cached data to BE. The default is 10 seconds. Supported time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing.
sink.properties.* | optional | (none) | string | The Stream Load parameters. For example: 'sink.properties.column_separator' = ','. Set 'sink.properties.escape_delimiters' = 'true' to use a control character as a separator, so that a value such as '\x01' is translated to binary 0x01. To import JSON, set both 'sink.properties.format' = 'json' and 'sink.properties.strip_outer_array' = 'true'. To write CSV, set 'sink.properties.format' = 'csv'.
sink.enable-delete | optional | true | boolean | Whether to enable deletion. This option requires the Doris table to have the batch delete feature enabled (enabled by default in version 0.15+), and only the Unique model is supported.
sink.multiple.enable | optional | false | boolean | Whether to enable multiple sink writing. When it is true, sink.multiple.format, sink.multiple.database-pattern, and sink.multiple.table-pattern must be set correctly.
sink.multiple.format | optional | (none) | string | The format of multiple sink writing, which represents the actual format of the raw binary data. Currently canal-json and debezium-json are supported. See Kafka: Dynamic Topic Extraction for more details.
sink.multiple.database-pattern | optional | (none) | string | Extracts the database name from the raw binary data. Used only in the multiple sink writing scenario.
sink.multiple.table-pattern | optional | (none) | string | Extracts the table name from the raw binary data. Used only in the multiple sink writing scenario.
sink.multiple.ignore-single-table-errors | optional | true | boolean | Whether to ignore single-table errors in the multiple sink writing scenario. When true, the sink continues when one table raises an exception and stops writing only to that table. When false, the whole sink stops when one table raises an exception.
inlong.metric.labels | optional | (none) | string | InLong metric label. The value format is groupId={groupId}&streamId={streamId}&nodeId={nodeId}.
sink.multiple.schema-update.policy | optional | (none) | string | If the sink data fields do not match the Doris table, for example when the table does not exist or string data is over length, the Doris server throws an exception. With THROW_WITH_STOP, the exception is thrown up to the Flink framework, and Flink restarts the task automatically and tries to resume it. With STOP_PARTIAL, the Doris connector stops writing to this table while other tables are written normally; the exception is logged but not thrown. With LOG_WITH_IGNORE, the Doris connector only logs the error without throwing it, and tries to write to the Doris server again when new source data arrives.
dirty.ignore | optional | (none) | boolean | When writing data into a Doris table, the Doris server may throw errors, for example because the table does not exist or the data is over length. When this option is true and the dirty.side-output.* properties are configured correctly, dirty data can be written to Amazon S3 or Tencent Cloud COS storage, and dirty data metrics are collected automatically. When this option is false, only dirty data metrics are collected and dirty data is not archived.
dirty.side-output.enable | optional | (none) | boolean | When this option is true and the other S3 or COS options are configured correctly, dirty data archiving works. When false, dirty data is not archived.
dirty.side-output.connector | optional | (none) | string | s3 and log are currently supported. With log, the Doris connector only logs the dirty data and does not archive it. With s3, the Doris connector can write dirty data to S3 or COS.
dirty.side-output.s3.bucket | optional | (none) | string | The bucket name of S3 or COS.
dirty.side-output.s3.endpoint | optional | (none) | string | The endpoint of S3 or COS.
dirty.side-output.s3.key | optional | (none) | string | The key of S3 or COS.
dirty.side-output.s3.region | optional | (none) | string | The region of S3 or COS.
dirty.side-output.line-delimiter | optional | (none) | string | The line delimiter of dirty data.
dirty.side-output.field-delimiter | optional | (none) | string | The field delimiter of dirty data.
dirty.side-output.s3.secret-key-id | optional | (none) | string | The secret key of S3 or COS.
dirty.side-output.s3.access-key-id | optional | (none) | string | The access key of S3 or COS.
dirty.side-output.format | optional | (none) | string | The format of dirty data archiving. json and csv are supported.
dirty.side-output.log-tag | optional | (none) | string | The log tag of dirty data. The Doris connector uses tags to distinguish which Doris database and table the dirty data will be written to.
dirty.identifier | optional | (none) | string | The file name of the dirty data written to S3 or COS.
dirty.side-output.labels | optional | (none) | string | Every dirty data line contains label fields and business data fields. The labels come first and the business data comes last.
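
As an illustration of how several of these options combine, the sketch below extends the Single-sink table definition with metric labels and log-based dirty data handling. It reuses the connection values from the earlier example. The metric label IDs and the log tag are hypothetical placeholders, and the combination has not been verified against a running cluster.

```sql
-- Placeholders (not from the original example): demo_group, demo_stream,
-- demo_node, and the log tag doris_dirty_data
CREATE TABLE cdc_doris_sink (
    id INT,
    name STRING,
    dr TINYINT
) WITH (
    'connector' = 'doris-inlong',
    'fenodes' = 'localhost:8030',
    'table.identifier' = 'cdc.cdc_doris_sink',
    'username' = 'root',
    'password' = '000000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.enable-delete' = 'true',
    'inlong.metric.labels' = 'groupId=demo_group&streamId=demo_stream&nodeId=demo_node',
    'dirty.ignore' = 'true',
    'dirty.side-output.enable' = 'true',
    'dirty.side-output.connector' = 'log',
    'dirty.side-output.format' = 'json',
    'dirty.side-output.log-tag' = 'doris_dirty_data'
);
```

To archive dirty data instead of only logging it, 'dirty.side-output.connector' would be set to 's3' and the dirty.side-output.s3.* options from the table above filled in with real bucket, endpoint, region, and credential values.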

Data Type Mapping

Doris Type | Flink Type
NULL_TYPE | NULL
BOOLEAN | BOOLEAN
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DATE | STRING
DATETIME | STRING
DECIMAL | DECIMAL
CHAR | STRING
LARGEINT | STRING
VARCHAR | STRING
DECIMALV2 | DECIMAL
TIME | DOUBLE
HLL | Unsupported datatype
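
To make the mapping concrete, the sketch below pairs a Doris table with a matching Flink table definition. The database demo, the table type_demo, and all column names are hypothetical; the connection values are reused from the earlier examples. Note that the DATE, DATETIME, and LARGEINT columns are declared as STRING on the Flink side, following the table above.

```sql
-- Doris side (hypothetical table demo.type_demo)
CREATE TABLE demo.type_demo (
    `id` BIGINT NOT NULL,
    `birthday` DATE NULL,
    `updated_at` DATETIME NULL,
    `big_counter` LARGEINT NULL,
    `amount` DECIMAL(10, 2) NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

-- Flink side: column types chosen according to the mapping table
CREATE TABLE type_demo_sink (
    id BIGINT,
    birthday STRING,
    updated_at STRING,
    big_counter STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'doris-inlong',
    'fenodes' = 'localhost:8030',
    'table.identifier' = 'demo.type_demo',
    'username' = 'root',
    'password' = '000000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true'
);
```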

See flink-doris-connector for more details.