franz kafka

使用franz-go kafka库将日志数据发送至下游Kafka,并且能比较好的支持kerberos认证。
(本sink和kafka sink的区别一般只在于使用的kafka golang库不同,提供给对franz kafka库有偏好的用户使用)

Example

  1. sink:
  2. type: franzKafka
  3. brokers: ["127.0.0.1:6400"]
  4. topic: "log-${fields.topic}"

brokers

字段类型是否必填默认值含义
brokersstring数组必填发送日志至Kafka的brokers地址

topic

字段类型是否必填默认值含义
topicstring非必填loggie发送日志至Kafka的topic

可使用${a.b}的方式,获取event里的字段值作为具体的topic名称。

比如,一个event为:

  1. {
  2. "topic": "loggie",
  3. "hello": "world"
  4. }

可配置topic: ${topic},此时该event发送到Kafka的topic为”loggie”。

同时支持嵌套的选择方式:

  1. {
  2. "fields": {
  3. "topic": "loggie"
  4. },
  5. "hello": "world"
  6. }

可配置topic: ${fields.topic},同样也会发送到topic “loggie”。

balance

字段类型是否必填默认值含义
balancestring非必填roundRobin负载均衡策略,可填roundRobinrangestickycooperativeSticky

compression

字段类型是否必填默认值含义
compressionstring非必填gzip日志发送至Kafka的压缩策略,可填gzipsnappylz4zstd

batchSize

字段类型是否必填默认值含义
batchSizeint非必填100发送时每个batch最多包含的数据个数

batchBytes

字段类型是否必填默认值含义
batchBytesint非必填1048576每个发送请求包含的最大字节数

writeTimeout

字段类型是否必填默认值含义
writeTimeouttime.Duration非必填10s写入超时时间

tls

字段类型是否必填默认值含义
tls.enabledbool非必填false是否启用
tls.caCertFilesstring非必填证书文件路径
tls.clientCertFilestring必填SASL类型,可为:客户端cert文件
tls.clientKeyFilestring必填SASL类型,可为:客户端key文件
tls.endpIdentAlgobooltype=scram时必填客户端是否验证服务端的证书名字

sasl

字段类型是否必填默认值含义
tls.enabledbool非必填false是否启用
sasl非必填SASL authentication
sasl.mechanismstring必填SASL类型,可为:PLAINSCRAM-SHA-256SCRAM-SHA-512GSSAPI
sasl.userNamestring必填用户名
sasl.passwordstring必填密码

gssapi

字段类型是否必填默认值含义
sasl.gssapi非必填SASL authentication
sasl.gssapi.authTypestring必填SASL类型,可为:1 使用账号密码、2 使用keytab
sasl.gssapi.keyTabPathstring必填keytab 文件路径
sasl.gssapi.kerberosConfigPathstring必填kerbeos 文件路径
sasl.gssapi.serviceNamestring必填服务名称
sasl.gssapi.userNamestring必填用户名
sasl.gssapi.passwordstring必填密码
sasl.gssapi.realmstring必填领域
sasl.gssapi.disablePAFXFASTbooltype=scram时必填DisablePAFXFAST 用于将客户端配置为不使用 PA_FX_FAST

security

字段类型是否必填默认值含义
securitystring非必填java格式的安全认证内容,可以自动转化成为franz-go适配的格式

案例:

  1. pipelines:
  2. - name: local
  3. sources:
  4. - type: file
  5. name: demo
  6. paths:
  7. - /tmp/log/*.log
  8. sink:
  9. type: franzKafka
  10. topic: loggie
  11. writeTimeout: 5s
  12. SASL:
  13. gssapi:
  14. kerberosConfigPath: /etc/krb5-conf/krb5.conf
  15. security:
  16. security.protocol: "SASL_PLAINTEXT"
  17. sasl.mechanism: "GSSAPI"
  18. sasl.jaas.config: "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true keyTab=\"/shylock/kerberos/zork.keytab\" principal=\"zork@AXRZPT.COM\";"
  19. sasl.kerberos.service.name: "kafka"
  20. brokers:
  21. - "hadoop74.axrzpt.com:9092"

Kubernetes 挂载keytab二进制证书,请参考官方文档