kafka

使用sink kafka将日志数据发送至下游Kafka。

Example

  1. sink:
  2. type: kafka
  3. brokers: ["127.0.0.1:6400"]
  4. topic: "log-${fields.topic}"

brokers

字段类型是否必填默认值含义
brokersstring数组必填发送日志至Kafka的brokers地址

topic

字段类型是否必填默认值含义
topicstring非必填loggie发送日志至Kafka的topic

可使用${a.b}的方式,获取event里的字段值作为具体的topic名称。

比如,一个event为:

  1. {
  2. "topic": "loggie",
  3. "hello": "world"
  4. }

可配置topic: ${topic},此时该event发送到Kafka的topic为”loggie”。

同时支持嵌套的选择方式:

  1. {
  2. "fields": {
  3. "topic": "loggie"
  4. },
  5. "hello": "world"
  6. }

可配置topic: ${fields.topic},同样也会发送到topic “loggie”。

balance

字段类型是否必填默认值含义
balancestring非必填roundRobin负载均衡策略,可填hashroundRobinleastBytes

compression

字段类型是否必填默认值含义
compressionstring非必填gzip日志发送至Kafka的压缩策略,可填gzipsnappylz4zstd

maxAttempts

字段类型是否必填默认值含义
maxAttemptsint非必填10发送最多重试次数

batchSize

字段类型是否必填默认值含义
batchSizeint非必填100发送时每个batch最多包含的数据个数

batchBytes

字段类型是否必填默认值含义
batchBytesint非必填1048576每个发送请求包含的最大字节数

batchTimeout

字段类型是否必填默认值含义
batchTimeouttime.Duration非必填1s形成每个发送batch的最长时间

readTimeout

字段类型是否必填默认值含义
readTimeouttime.Duration非必填10s读取超时时间

writeTimeout

字段类型是否必填默认值含义
writeTimeouttime.Duration非必填10s写入超时时间

requiredAcks

字段类型是否必填默认值含义
requiredAcksint非必填0等待ack参数,可为01-1
  • 0: 不要求ack
  • 1: 等待leader partition ack
  • -1: 等待ISR中所有replica ack

sasl

字段类型是否必填默认值含义
sasl非必填SASL authentication
sasl.typestring必填SASL类型,可为:plainscram
sasl.userNamestring必填用户名
sasl.passwordstring必填密码
sasl.algorithmstringtype=scram时必填type=scram时使用的算法,可选sha256sha512