Kafka Connect 是一款可扩展、可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具,可以定义 Connectors 将大量数据迁入迁出 Kafka。

Doris 提供了 Sink Connector 插件,可以将 Kafka topic 中的数据写入到 Doris 中。

Doris Kafka Connector 使用

Standalone 模式启动

配置 connect-standalone.properties

  1. # 修改 broker 地址
  2. bootstrap.servers=127.0.0.1:9092

配置 doris-connector-sink.properties 在 config 目录下创建 doris-connector-sink.properties,并配置如下内容:

  1. name=test-doris-sink
  2. connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
  3. topics=topic_test
  4. doris.topic2table.map=topic_test:test_kafka_tbl
  5. buffer.count.records=10000
  6. buffer.flush.time=120
  7. buffer.size.bytes=5000000
  8. doris.urls=10.10.10.1
  9. doris.http.port=8030
  10. doris.query.port=9030
  11. doris.user=root
  12. doris.password=
  13. doris.database=test_db
  14. key.converter=org.apache.kafka.connect.storage.StringConverter
  15. value.converter=org.apache.kafka.connect.json.JsonConverter
  16. key.converter.schemas.enable=false
  17. value.converter.schemas.enable=false

启动 Standalone

  1. $KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/doris-connector-sink.properties

Doris Kafka Connector - 图1危险

注意:一般不建议在生产环境中使用 standalone 模式

Distributed 模式启动

配置 connect-distributed.properties

  1. # 修改 broker 地址
  2. bootstrap.servers=127.0.0.1:9092
  3. # 修改 group.id,同一集群的需要一致
  4. group.id=connect-cluster

启动 Distributed

  1. $KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

增加 Connector

  1. curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  2. "name":"test-doris-sink-cluster",
  3. "config":{
  4. "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
  5. "topics":"topic_test",
  6. "doris.topic2table.map": "topic_test:test_kafka_tbl",
  7. "buffer.count.records":"10000",
  8. "buffer.flush.time":"120",
  9. "buffer.size.bytes":"5000000",
  10. "doris.urls":"10.10.10.1",
  11. "doris.user":"root",
  12. "doris.password":"",
  13. "doris.http.port":"8030",
  14. "doris.query.port":"9030",
  15. "doris.database":"test_db",
  16. "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  17. "value.converter":"org.apache.kafka.connect.json.JsonConverter",
  18. "key.converter.schemas.enable":"false",
  19. "value.converter.schemas.enable":"false",
  20. }
  21. }'

操作 Connector

  1. # 查看 connector 状态
  2. curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
  3. # 删除当前 connector
  4. curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster -X DELETE
  5. # 暂停当前 connector
  6. curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
  7. # 重启当前 connector
  8. curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
  9. # 重启 connector 内的 tasks
  10. curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/tasks/0/restart -X POST

参考:Connect REST Interface

Doris Kafka Connector - 图2危险

注意 kafka-connect 首次启动时,会往 kafka 集群中创建 config.storage.topic offset.storage.topic status.storage.topic 三个 topic 用于记录 kafka-connect 的共享连接器配置、偏移数据和状态更新。How to Use Kafka Connect - Get Started

访问 SSL 认证的 Kafka 集群

通过 kafka-connect 访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(client.truststore.jks)。您可以在 connect-distributed.properties 文件中增加以下配置:

  1. # Connect worker
  2. security.protocol=SSL
  3. ssl.truststore.location=/var/ssl/private/client.truststore.jks
  4. ssl.truststore.password=test1234
  5. # Embedded consumer for sink connectors
  6. consumer.security.protocol=SSL
  7. consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
  8. consumer.ssl.truststore.password=test1234

关于通过 kafka-connect 连接 SSL 认证的 Kafka 集群配置说明可以参考:Configure Kafka Connect

死信队列

默认情况下,转换过程中或转换过程中遇到的任何错误都会导致连接器失败。每个连接器配置还可以通过跳过它们来容忍此类错误,可选择将每个错误和失败操作的详细信息以及有问题的记录(具有不同级别的详细信息)写入死信队列以便记录。

  1. errors.tolerance=all
  2. errors.deadletterqueue.topic.name=test_error_topic
  3. errors.deadletterqueue.context.headers.enable=true
  4. errors.deadletterqueue.topic.replication.factor=1

配置项

KeyDefault ValueRequiredDescription
name-YConnect 应用名称,必须是在 Kafka Connect 环境中唯一
connector.class-Yorg.apache.doris.kafka.connector.DorisSinkConnector
topics-Y订阅的 topic 列表,逗号分隔: topic1,topic2
doris.urls-YDoris FE 连接地址。如果有多个,中间用逗号分割: 10.20.30.1,10.20.30.2,10.20.30.3
doris.http.port-YDoris HTTP 协议端口
doris.query.port-YDoris MySQL 协议端口
doris.user-YDoris 用户名
doris.password-YDoris 密码
doris.database-Y要写入的数据库。多个库时可以为空,同时在 topic2table.map 需要配置具体的库名称
doris.topic2table.map-Ntopic 和 table 表的对应关系,例:topic1:tb1,topic2:tb2
默认为空,表示 topic 和 table 名称一一对应。
多个库的格式为 topic1:db1.tbl1,topic2:db2.tbl2
buffer.count.records10000N在 flush 到 doris 之前,每个 Kafka 分区在内存中缓冲的记录数。默认 10000 条记录
buffer.flush.time120Nbuffer 刷新间隔,单位秒,默认120秒
buffer.size.bytes5000000(5MB)N每个 Kafka 分区在内存中缓冲的记录的累积大小,单位字节,默认5MB
jmxtrueN通过 JMX 获取 Connector 内部监控指标, 请参考: Doris-Connector-JMX
enable.deletefalseN是否同步删除记录, 默认 false
label.prefix${name}NStream load 导入数据时的 label 前缀。默认为 Connector 应用名称。
auto.redirecttrueN是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 重定向到需要写入数据的 BE,并且不再显示获取 BE 信息
load.modelstream_loadN导入数据的方式。支持 stream_load 直接数据导入到 Doris 中;同时支持 copy_into 的方式导入数据至对象存储中,然后将数据加载至 Doris 中
sink.properties.*‘sink.properties.format’:’json’,
‘sink.properties.read_json_by_line’:’true’
NStream Load 的导入参数。
例如: 定义列分隔符‘sink.properties.column_separator’:’,’
详细参数参考这里
delivery.guaranteeat_least_onceN消费 Kafka 数据导入至 doris 时,数据一致性的保障方式。 支持 at_least_once exactly_once,默认为 at_least_once 。Doris 需要升级至 2.1.0 以上,才能保障数据的 exactly_once

其他Kafka Connect Sink通用配置项可参考:Kafka Connect Sink Configuration Properties