Kafka Connect is a scalable and reliable tool for transferring data between Apache Kafka and other systems. Connectors can be defined to move large amounts of data into and out of Kafka.

The Doris community provides the doris-kafka-connector plug-in, which writes data from Kafka topics to Doris.

Using the Doris Kafka Connector



Maven dependencies

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>doris-kafka-connector</artifactId>
        <version>1.0.0</version>
    </dependency>

Standalone mode startup

Create a plugins directory under $KAFKA_HOME and put the downloaded doris-kafka-connector jar package into it.

Configure config/connect-standalone.properties

    # Modify broker address
    bootstrap.servers=
    # Modify to the created plugins directory
    # Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
    plugin.path=$KAFKA_HOME/plugins
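
Because the note asks for a direct path, one option is to resolve the Kafka installation directory before writing the line into the file. The following is a minimal Python sketch under that assumption; the helper name `plugin_path_line` is hypothetical and is not part of Kafka or the connector, and the example assumes a POSIX-style file system.

```python
import os


def plugin_path_line(kafka_home: str) -> str:
    """Return a plugin.path line with the plugins directory written as an absolute path."""
    plugins_dir = os.path.abspath(os.path.join(kafka_home, "plugins"))
    return f"plugin.path={plugins_dir}"


if __name__ == "__main__":
    # /opt/kafka is the example location from the note; it is used when KAFKA_HOME is unset.
    print(plugin_path_line(os.environ.get("KAFKA_HOME", "/opt/kafka")))
```

The printed line can be pasted into config/connect-standalone.properties in place of the `$KAFKA_HOME/plugins` value.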

Configure doris-connector-sink.properties

Create doris-connector-sink.properties in the config directory and configure the following content:

    name=test-doris-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=topic_test
    doris.topic2table.map=topic_test:test_kafka_tbl
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=
    doris.http.port=8030
    doris.query.port=9030
    doris.user=root
    doris.password=
    doris.database=test_db
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
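
The three buffer.* settings bound how much data is held for each partition before it is written to Doris. The sketch below is a simplified Python model of that idea, assuming a flush is triggered as soon as any one of the three thresholds is reached. The `PartitionBuffer` class and its methods are hypothetical and do not mirror the connector's internal code.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PartitionBuffer:
    """Toy model of a per-partition buffer governed by the three buffer.* settings."""
    count_records: int = 10000   # buffer.count.records
    flush_time_s: float = 120    # buffer.flush.time, in seconds
    size_bytes: int = 5000000    # buffer.size.bytes
    records: list = field(default_factory=list)
    buffered_bytes: int = 0
    last_flush: float = field(default_factory=time.monotonic)

    def add(self, record: bytes) -> None:
        """Buffer one serialized record and track its size."""
        self.records.append(record)
        self.buffered_bytes += len(record)

    def should_flush(self, now: Optional[float] = None) -> bool:
        """Assumption: reaching any single threshold is enough to flush."""
        now = time.monotonic() if now is None else now
        return (
            len(self.records) >= self.count_records
            or self.buffered_bytes >= self.size_bytes
            or now - self.last_flush >= self.flush_time_s
        )
```

With the values above, a quiet topic would still be flushed roughly every 120 seconds, while a busy one would flush earlier on record count or size.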

Start Standalone

    $KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/doris-connector-sink.properties

Note: It is generally not recommended to use standalone mode in a production environment.

Distributed mode startup

Create a plugins directory under $KAFKA_HOME and put the downloaded doris-kafka-connector jar package into it.

Configure config/connect-distributed.properties

    # Modify broker address
    bootstrap.servers=
    # Modify group.id, the same cluster needs to be consistent
    group.id=connect-cluster
    # Modify to the created plugins directory
    # Note: Please fill in the direct path to Kafka here. For example: plugin.path=/opt/kafka/plugins
    plugin.path=$KAFKA_HOME/plugins

Start Distributed

    $KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

Add Connector

    curl -i -H "Content-Type: application/json" -X POST -d '{
      "name":"test-doris-sink-cluster",
      "config":{
        "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
        "topics":"topic_test",
        "doris.topic2table.map": "topic_test:test_kafka_tbl",
        "buffer.count.records":"10000",
        "buffer.flush.time":"120",
        "buffer.size.bytes":"5000000",
        "doris.urls":"",
        "doris.user":"root",
        "doris.password":"",
        "doris.http.port":"8030",
        "doris.query.port":"9030",
        "doris.database":"test_db",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter"
      }
    }'
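
The same registration can be sketched in Python with only the standard library. The command above does not show the worker URL, so `http://localhost:8083` below is an assumption (8083 is the usual default port of the Kafka Connect REST interface), and `build_create_request` is a hypothetical helper name. The request targets the `POST /connectors` endpoint and is only built here, not sent.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST interface is reachable at this address.
CONNECT_URL = "http://localhost:8083"


def build_create_request(name: str, config: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a new connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


sink_config = {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics": "topic_test",
    "doris.topic2table.map": "topic_test:test_kafka_tbl",
    "buffer.count.records": "10000",
    "buffer.flush.time": "120",
    "buffer.size.bytes": "5000000",
    "doris.urls": "",  # left empty as in the source; fill in the Doris FE address
    "doris.user": "root",
    "doris.password": "",
    "doris.http.port": "8030",
    "doris.query.port": "9030",
    "doris.database": "test_db",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
}

request = build_create_request("test-doris-sink-cluster", sink_config)
# To submit the request against a running worker: urllib.request.urlopen(request)
```

Building the request separately from sending it makes it easy to inspect the JSON body before it reaches the worker.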

Operate the Connector

    # View connector status
    curl -i -X GET
    # Delete connector
    curl -i -X DELETE
    # Pause connector
    curl -i -X PUT
    # Restart connector
    curl -i -X PUT
    # Restart tasks within the connector
    curl -i -X POST
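
The commands above are listed without their URLs. As a rough guide, the Python sketch below pairs each operation with the corresponding path of the Kafka Connect REST interface. The worker address `http://localhost:8083`, the `OPERATIONS` table, and the `build_operation` helper are assumptions made for illustration; reading the PUT listed under "Restart connector" as the resume endpoint is also an interpretation, not something the source states.

```python
# Assumption: the Connect worker's REST interface is reachable at this address.
CONNECT_URL = "http://localhost:8083"

# (HTTP method, path template) pairs from the Kafka Connect REST interface.
OPERATIONS = {
    "status": ("GET", "/connectors/{name}/status"),
    "delete": ("DELETE", "/connectors/{name}"),
    "pause": ("PUT", "/connectors/{name}/pause"),
    "resume": ("PUT", "/connectors/{name}/resume"),
    "restart_task": ("POST", "/connectors/{name}/tasks/{task_id}/restart"),
}


def build_operation(op: str, name: str, task_id: int = 0, base_url: str = CONNECT_URL):
    """Return the (method, url) pair for one connector operation."""
    method, template = OPERATIONS[op]
    return method, base_url + template.format(name=name, task_id=task_id)
```

For example, `build_operation("pause", "test-doris-sink-cluster")` yields the method and URL that a `curl -i -X PUT` call would need.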

Refer to: Connect REST Interface

Note: when kafka-connect is started for the first time, it creates three topics in the Kafka cluster, configured by config.storage.topic, offset.storage.topic, and status.storage.topic. They record the shared connector configuration, offset data, and status updates of kafka-connect. Refer to: How to Use Kafka Connect - Get Started

Access an SSL-certified Kafka cluster

Accessing an SSL-certified Kafka cluster through kafka-connect requires the user to provide a certificate file (client.truststore.jks) used to authenticate the Kafka Broker public key. You can add the following configuration in the connect-distributed.properties file:

    # Connect worker
    security.protocol=SSL
    ssl.truststore.location=/var/ssl/private/client.truststore.jks
    ssl.truststore.password=test1234
    # Embedded consumer for sink connectors
    consumer.security.protocol=SSL
    consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
    consumer.ssl.truststore.password=test1234

For instructions on connecting kafka-connect to a Kafka cluster that uses SSL authentication, please refer to: Configure Kafka Connect
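
The configuration above repeats every worker-level SSL setting with a `consumer.` prefix so that the embedded consumer used by sink connectors receives the same values. A small Python sketch of that pattern follows; the helper names `with_consumer_prefix` and `to_properties` are hypothetical and exist only for illustration.

```python
def with_consumer_prefix(worker_ssl: dict) -> dict:
    """Return the worker settings plus consumer.-prefixed copies for the embedded sink consumer."""
    merged = dict(worker_ssl)
    merged.update({f"consumer.{key}": value for key, value in worker_ssl.items()})
    return merged


def to_properties(settings: dict) -> str:
    """Render settings as key=value lines, one per line, in insertion order."""
    return "\n".join(f"{key}={value}" for key, value in settings.items())


worker_ssl = {
    "security.protocol": "SSL",
    "ssl.truststore.location": "/var/ssl/private/client.truststore.jks",
    "ssl.truststore.password": "test1234",
}

properties_text = to_properties(with_consumer_prefix(worker_ssl))
```

Generating both sets from one dictionary keeps the worker and consumer values from drifting apart when the truststore path or password changes.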

Dead letter queue

By default, any error encountered during conversion or within transformations causes the connector to fail. A connector configuration can instead tolerate such errors by skipping them, optionally writing the details of each error and failed operation, along with the records in question (with varying levels of detail), to a dead letter queue for logging.

    errors.tolerance=all
    errors.deadletterqueue.topic.name=test_error_topic
    errors.deadletterqueue.context.headers.enable=true
    errors.deadletterqueue.topic.replication.factor=1
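
The difference between the default behavior and `errors.tolerance=all` can be illustrated with a small simulation. The Python sketch below is not Kafka Connect code: `convert_all` is a hypothetical function that uses JSON parsing as a stand-in for the converter and a plain list as a stand-in for the dead letter queue topic.

```python
import json


def convert_all(raw_values, tolerance="none"):
    """Convert raw JSON strings and return (converted, dead_letters).

    tolerance="none": the first bad record raises, as a failing connector would.
    tolerance="all": bad records are skipped and collected for a dead letter queue.
    """
    converted, dead_letters = [], []
    for raw in raw_values:
        try:
            converted.append(json.loads(raw))
        except json.JSONDecodeError as exc:
            if tolerance != "all":
                raise
            # Keep the original record together with the error details.
            dead_letters.append({"value": raw, "error": str(exc)})
    return converted, dead_letters
```

With `tolerance="all"`, one malformed record among many no longer stops processing; it is set aside with its error message while the valid records continue.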

Configuration items

| Key | Enum | Default Value | Required | Description |
|---|---|---|---|---|
| name | - | - | Y | Connect application name; must be unique within the Kafka Connect environment. |
| topics | - | - | Y | List of subscribed topics, separated by commas, for example: topic1,topic2 |
| doris.urls | - | - | Y | Doris FE connection address. If there are several, separate them with commas. |
| doris.http.port | - | - | Y | Doris HTTP protocol port. |
| doris.query.port | - | - | Y | Doris MySQL protocol port. |
| doris.user | - | - | Y | Doris username. |
| doris.password | - | - | Y | Doris password. |
| doris.database | - | - | Y | The database to write to. It can be left empty when writing to multiple databases; in that case the database name must be given for each table in doris.topic2table.map. |
| doris.topic2table.map | - | - | N | Mapping between topics and tables, for example: topic1:tb1,topic2:tb2. The default is empty, meaning topic and table names correspond one to one. The format for multiple databases is topic1:db1.tbl1,topic2:db2.tbl2 |
| buffer.count.records | - | 10000 | N | Number of records each Kafka partition buffers in memory before flushing to Doris. |
| buffer.flush.time | - | 120 | N | Buffer flush interval, in seconds. |
| buffer.size.bytes | - | 5000000 (5MB) | N | Cumulative size, in bytes, of the records buffered in memory for each Kafka partition. |
| jmx | - | true | N | Expose connector internal monitoring metrics through JMX. Refer to: Doris-Connector-JMX |
| enable.2pc | - | true | N | Whether to enable two-phase commit (TwoPhaseCommit) for Stream Load. |
| enable.delete | - | false | N | Whether to delete records synchronously. |
| label.prefix | - | ${name} | N | Stream Load label prefix used when importing data. Defaults to the Connector application name. |
| auto.redirect | - | true | N | Whether to redirect Stream Load requests. When enabled, Stream Load requests are redirected through FE to the BE where the data is written, and BE information is no longer displayed. |
| load.model | stream_load, copy_into | stream_load | N | How data is imported. stream_load imports data directly into Doris; copy_into imports data into object storage and then loads it into Doris. |
| sink.properties.* | - | - | N | Import parameters for Stream Load. For example, to define the column separator: "sink.properties.column_separator":",". For details, see the Stream Load parameter reference. To enable Group Commit, for example in sync_mode: "sink.properties.group_commit":"sync_mode". Group Commit supports three modes: off_mode, sync_mode, and async_mode; for specific usage, refer to: Group-Commit. To enable partial column update, for example updating only the specified column col2: "sink.properties.partial_columns":"true", "sink.properties.columns":"col2" |
| | at_least_once, exactly_once | at_least_once | N | How data consistency is ensured when data consumed from Kafka is imported into Doris. Doris must be upgraded to 2.1.0 or above to guarantee exactly_once. |
| converter.mode | normal, debezium_ingestion | normal | N | Type conversion mode for upstream data when the Connector consumes Kafka data. normal means the data in Kafka is consumed as is, without any type conversion. debezium_ingestion means the upstream data was collected by a CDC (Change Data Capture) tool such as Debezium and needs special type conversion. |
| | none, basic | none | N | When Debezium is used to collect data from an upstream database system (such as MySQL) and the schema changes, added fields can be synchronized to Doris. none means schema changes in the upstream database are not synchronized to Doris. basic means schema changes in the upstream database are synchronized. Because changing the column structure is a dangerous operation (it may accidentally delete columns of the Doris table), only adding columns is currently supported. When a column is renamed, the old column remains unchanged, and the Connector adds a new column to the target table and writes the renamed column's data into it. |
| database.time_zone | - | UTC | N | When converter.mode is not normal, specifies the time zone used to convert date and time types (such as datetime, date, timestamp). |
| avro.topic2schema.filepath | - | - | N | Parses the Avro content in a topic by reading a locally provided Avro Schema file, which removes the dependency on the Schema Registry provided by Confluent. This setting must be used with the key.converter or value.converter prefix. For example, to configure local Avro Schema files for the avro-user and avro-product topics: "value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc, avro-product:file:///opt/avro_product.avsc". For specific usage, please refer to: #32 |

For other Kafka Connect Sink common configuration items, please refer to: connect_configuring
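
The doris.topic2table.map format described in the table (topic:table pairs separated by commas, with an optional database qualifier) can be illustrated with a short parser. This is a minimal Python sketch: `parse_topic2table_map` is a hypothetical helper, and falling back to the doris.database value when no database is given is an assumption drawn from the descriptions above, not the connector's actual code.

```python
def parse_topic2table_map(mapping: str, default_db: str = "") -> dict:
    """Parse 'topic1:tb1,topic2:db2.tbl2' into {topic: (database, table)}."""
    result = {}
    for entry in mapping.split(","):
        entry = entry.strip()
        if not entry:
            continue  # an empty mapping means topics map to tables of the same name
        topic, sep, target = entry.partition(":")
        if not sep or not topic.strip() or not target.strip():
            raise ValueError(f"expected topic:table, got {entry!r}")
        # Split an optional database qualifier off the table name.
        db, dot, table = target.strip().rpartition(".")
        result[topic.strip()] = (db if dot else default_db, table)
    return result
```

For example, `parse_topic2table_map("topic_test:test_kafka_tbl", "test_db")` resolves the topic to the `test_kafka_tbl` table in `test_db`.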

Type mapping

Doris-kafka-connector uses logical or primitive type mapping to resolve the column’s data type.

Primitive types refer to simple data types represented using Kafka connect’s `Schema`. Logical data types usually use the `Struct` structure to represent complex types, or date and time types.

| Kafka Primitive Type | Doris Type |

| Kafka Logical Type | Doris Type |

| Debezium Logical Type | Doris Type |

Best Practices

Load Json serialized data

    curl -i -H "Content-Type: application/json" -X POST -d '{
      "name":"doris-json-test",
      "config":{
        "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
        "topics":"json_topic",
        "tasks.max":"10",
        "doris.topic2table.map": "json_topic:json_tab",
        "buffer.count.records":"100000",
        "buffer.flush.time":"120",
        "buffer.size.bytes":"10000000",
        "doris.urls":"",
        "doris.user":"root",
        "doris.password":"",
        "doris.http.port":"8030",
        "doris.query.port":"9030",
        "doris.database":"test",
        "load.model":"stream_load",
        "key.converter":"org.apache.kafka.connect.json.JsonConverter",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter"
      }
    }'

Load Avro serialized data

    curl -i -H "Content-Type: application/json" -X POST -d '{
      "name":"doris-avro-test",
      "config":{
        "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
        "topics":"avro_topic",
        "tasks.max":"10",
        "doris.topic2table.map": "avro_topic:avro_tab",
        "buffer.count.records":"100000",
        "buffer.flush.time":"120",
        "buffer.size.bytes":"10000000",
        "doris.urls":"",
        "doris.user":"root",
        "doris.password":"",
        "doris.http.port":"8030",
        "doris.query.port":"9030",
        "doris.database":"test",
        "load.model":"stream_load",
        "key.converter":"io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":""
      }
    }'

Load Protobuf serialized data

    curl -i -H "Content-Type: application/json" -X POST -d '{
      "name":"doris-protobuf-test",
      "config":{
        "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
        "topics":"proto_topic",
        "tasks.max":"10",
        "doris.topic2table.map": "proto_topic:proto_tab",
        "buffer.count.records":"100000",
        "buffer.flush.time":"120",
        "buffer.size.bytes":"10000000",
        "doris.urls":"",
        "doris.user":"root",
        "doris.password":"",
        "doris.http.port":"8030",
        "doris.query.port":"9030",
        "doris.database":"test",
        "load.model":"stream_load",
        "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url":"",
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":""
      }
    }'


1. The following error occurs when reading JSON data:

    Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:337)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)

Reason: with schemas.enable turned on, the org.apache.kafka.connect.json.JsonConverter converter requires every message to contain exactly the "schema" and "payload" fields, so plain JSON data is rejected.

Two solutions, choose one:

  1. Replace org.apache.kafka.connect.json.JsonConverter with org.apache.kafka.connect.storage.StringConverter
  2. Set value.converter.schemas.enable or key.converter.schemas.enable to false: in config/connect-standalone.properties if the startup mode is Standalone, or in config/connect-distributed.properties if the startup mode is Distributed.
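
To see why plain JSON triggers the error, it helps to compare it with the envelope that JsonConverter expects when schemas.enable is on. The Python sketch below checks for that shape; `has_schema_envelope` is a hypothetical helper, and the field names `id` and `name` in the sample messages are made up for illustration.

```python
import json


def has_schema_envelope(raw: str) -> bool:
    """True if the message is a JSON object with exactly the 'schema' and 'payload' fields."""
    try:
        doc = json.loads(raw)
    except json.JSONDecodeError:
        return False
    return isinstance(doc, dict) and set(doc) == {"schema", "payload"}


# Plain JSON: accepted only when schemas.enable=false.
plain = '{"id": 1, "name": "doris"}'

# The same record wrapped in the schema/payload envelope.
enveloped = json.dumps({
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "id", "type": "int32", "optional": False},
            {"field": "name", "type": "string", "optional": False},
        ],
    },
    "payload": {"id": 1, "name": "doris"},
})
```

If the messages in a topic look like `plain`, either of the two solutions above applies; if they look like `enveloped`, the converter can stay as configured.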