kafka-logger

描述

kafka-logger 插件用于将日志作为 JSON 对象推送到 Apache Kafka 集群中。可用作 ngx_lua NGINX 模块的 Kafka 客户端驱动程序。

属性

名称类型必选项默认值有效值描述
broker_listobject已废弃,现使用 brokers 属性代替。原指需要推送的 Kafka 的 broker 列表。
brokersarray需要推送的 Kafka 的 broker 列表。
brokers.hoststringKafka broker 的节点 host 配置,例如 192.168.1.1
brokers.portstringKafka broker 的节点端口配置
brokers.sasl_configobjectKafka broker 中的 sasl_config
brokers.sasl_config.mechanismstring“PLAIN”[“PLAIN”]Kafka broker 中的 sasl 认证机制
brokers.sasl_config.userstringKafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写
brokers.sasl_config.passwordstringKafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写
kafka_topicstring需要推送的 topic。
producer_typestringasync[“async”, “sync”]生产者发送消息的模式。
required_acksinteger1[1, -1]生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka acks 属性相同,具体配置请参考 Apache Kafka 文档。required_acks 还不支持为 0。
keystring用于消息分区而分配的密钥。
timeoutinteger3[1,…]发送数据的超时时间。
namestring“kafka logger”标识 logger 的唯一标识符。如果您使用 Prometheus 监视 APISIX 指标,名称将以 apisix_batch_process_entries 导出。
meta_formatenum“default”[“default”,”origin”]default:获取请求信息以默认的 JSON 编码方式。origin:获取请求信息以 HTTP 原始请求方式。更多信息,请参考 meta_format
log_formatobject以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量NGINX 内置变量
include_req_bodybooleanfalse[false, true]当设置为 true 时,包含请求体。注意:如果请求体无法完全存放在内存中,由于 NGINX 的限制,APISIX 无法将它记录下来。
include_req_body_exprarrayinclude_req_body 属性设置为 true 时进行过滤。只有当此处设置的表达式计算结果为 true 时,才会记录请求体。更多信息,请参考 lua-resty-expr
include_resp_bodybooleanfalse[false, true]当设置为 true 时,包含响应体。
include_resp_body_exprarrayinclude_resp_body 属性设置为 true 时进行过滤。只有当此处设置的表达式计算结果为 true 时才会记录响应体。更多信息,请参考 lua-resty-expr
cluster_nameinteger1[0,…]Kafka 集群的名称,当有两个及以上 Kafka 集群时使用。只有当 producer_type 设为 async 模式时才可以使用该属性。
producer_batch_numinteger200[1,…]对应 lua-resty-kafka 中的 batch_num 参数,聚合消息批量提交,单位为消息条数。
producer_batch_sizeinteger1048576[0,…]对应 lua-resty-kafka 中的 batch_size 参数,单位为字节。
producer_max_bufferinginteger50000[1,…]对应 lua-resty-kafka 中的 max_buffering 参数,表示最大缓冲区,单位为条。
producer_time_lingerinteger1[1,…]对应 lua-resty-kafka 中的 flush_time 参数,单位为秒。
meta_refresh_intervalinteger30[1,…]对应 lua-resty-kafka 中的 refresh_interval 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。

该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 5 秒钟或队列中的数据达到 1000 条时提交数据,如需了解批处理器相关参数设置,请参考 Batch-Processor 配置部分。

kafka-logger - 图1提示

数据首先写入缓冲区。当缓冲区超过 batch_max_sizebuffer_duration 设置的值时,则会将数据发送到 Kafka 服务器并刷新缓冲区。

如果发送成功,则返回 true。如果出现错误,则返回 nil,并带有描述错误的字符串 buffer overflow

meta_format 示例

  • default:

    1. {
    2. "upstream": "127.0.0.1:1980",
    3. "start_time": 1619414294760,
    4. "client_ip": "127.0.0.1",
    5. "service_id": "",
    6. "route_id": "1",
    7. "request": {
    8. "querystring": {
    9. "ab": "cd"
    10. },
    11. "size": 90,
    12. "uri": "/hello?ab=cd",
    13. "url": "http://localhost:1984/hello?ab=cd",
    14. "headers": {
    15. "host": "localhost",
    16. "content-length": "6",
    17. "connection": "close"
    18. },
    19. "body": "abcdef",
    20. "method": "GET"
    21. },
    22. "response": {
    23. "headers": {
    24. "connection": "close",
    25. "content-type": "text/plain; charset=utf-8",
    26. "date": "Mon, 26 Apr 2021 05:18:14 GMT",
    27. "server": "APISIX/2.5",
    28. "transfer-encoding": "chunked"
    29. },
    30. "size": 190,
    31. "status": 200
    32. },
    33. "server": {
    34. "hostname": "localhost",
    35. "version": "2.5"
    36. },
    37. "latency": 0
    38. }
  • origin:

    1. GET /hello?ab=cd HTTP/1.1
    2. host: localhost
    3. content-length: 6
    4. connection: close
    5. abcdef

插件元数据

名称类型必选项默认值描述
log_formatobject以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量NGINX 内置变量
kafka-logger - 图2注意

该设置全局生效。如果指定了 log_format,则所有绑定 kafka-logger 的路由或服务都将使用该日志格式。

以下示例展示了如何通过 Admin API 配置插件元数据:

kafka-logger - 图3note

您可以这样从 config.yaml 中获取 admin_key 并存入环境变量:

  1. admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"//g')
  1. curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger \
  2. -H "X-API-KEY: $admin_key" -X PUT -d '
  3. {
  4. "log_format": {
  5. "host": "$host",
  6. "@timestamp": "$time_iso8601",
  7. "client_ip": "$remote_addr"
  8. }
  9. }'

配置完成后,你将在日志系统中看到如下类似日志:

  1. {"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
  2. {"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}

如何启用

你可以通过如下命令在指定路由上启用 kafka-logger 插件:

  1. curl http://127.0.0.1:9180/apisix/admin/routes/1 \
  2. -H "X-API-KEY: $admin_key" -X PUT -d '
  3. {
  4. "plugins": {
  5. "kafka-logger": {
  6. "brokers" : [
  7. {
  8. "host": "127.0.0.1",
  9. "port": 9092
  10. }
  11. ],
  12. "kafka_topic" : "test2",
  13. "key" : "key1"
  14. }
  15. },
  16. "upstream": {
  17. "nodes": {
  18. "127.0.0.1:1980": 1
  19. },
  20. "type": "roundrobin"
  21. },
  22. "uri": "/hello"
  23. }'

该插件还支持一次推送到多个 Broker,示例如下:

  1. "brokers" : [
  2. {
  3. "host" :"127.0.0.1",
  4. "port" : 9092
  5. },
  6. {
  7. "host" :"127.0.0.1",
  8. "port" : 9093
  9. }
  10. ],

测试插件

你可以通过以下命令向 APISIX 发出请求:

  1. curl -i http://127.0.0.1:9080/hello

删除插件

当你需要删除该插件时,可以通过如下命令删除相应的 JSON 配置,APISIX 将会自动重新加载相关配置,无需重启服务:

  1. curl http://127.0.0.1:9180/apisix/admin/routes/1 \
  2. -H "X-API-KEY: $admin_key" -X PUT -d '
  3. {
  4. "methods": ["GET"],
  5. "uri": "/hello",
  6. "plugins": {},
  7. "upstream": {
  8. "type": "roundrobin",
  9. "nodes": {
  10. "127.0.0.1:1980": 1
  11. }
  12. }
  13. }'