简介

kafka-logger 是一个插件,可用作ngx_lua nginx 模块的 Kafka 客户端驱动程序。

它可以将接口请求日志以 JSON 的形式推送给外部 Kafka 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。

有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。 Batch-Processor

属性

名称 类型 必选项 默认值 有效值 描述
broker_list object 必须 要推送的 kafka 的 broker 列表。
kafka_topic string 必须 要推送的 topic。
key string 可选 用于消息的分区分配。
timeout integer 可选 3 [1,…] 发送数据的超时时间。
name string 可选 “kafka logger” batch processor 的唯一标识。
meta_format enum 可选 “default” [“default”,”origin”] default:获取请求信息以默认的 JSON 编码方式。origin:获取请求信息以 HTTP 原始请求方式。具体示例
batch_max_size integer 可选 1000 [1,…] 设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到 Kafka 服务。
inactive_timeout integer 可选 5 [1,…] 刷新缓冲区的最大时间(以秒为单位),当达到最大的刷新时间时,无论缓冲区中的日志数量是否达到设置的最大条数,也会自动将全部日志推送到 Kafka 服务。
buffer_duration integer 可选 60 [1,…] 必须先处理批次中最旧条目的最长期限(以秒为单位)。
max_retry_count integer 可选 0 [0,…] 从处理管道中移除之前的最大重试次数。
retry_delay integer 可选 1 [0,…] 如果执行失败,则应延迟执行流程的秒数。
include_req_body boolean 可选 false [false, true] 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。

meta_format 参考示例

  • default:

    1. {"upstream":"127.0.0.1:1980","start_time":1602211788041,"client_ip":"127.0.0.1","service_id":"","route_id":"1","request":{"querystring":{"ab":"cd"},"size":90,"uri":"\/hello?ab=cd","url":"http:\/\/localhost:1984\/hello?ab=cd","headers":{"host":"localhost","content-length":"6","connection":"close"},"body":"abcdef","method":"GET"},"response":{"headers":{"content-type":"text\/plain","server":"APISIX\/1.5","connection":"close","transfer-encoding":"chunked"},"status":200,"size":153},"latency":99.000215530396}
  • origin:

    1. GET /hello?ab=cd HTTP/1.1
    2. host: localhost
    3. content-length: 6
    4. connection: close
    5. abcdef

工作原理

消息将首先写入缓冲区。 当缓冲区超过batch_max_size时,它将发送到 kafka 服务器, 或每个buffer_duration刷新缓冲区。

如果成功,则返回 true。 如果出现错误,则返回 nil,并带有描述错误的字符串(buffer overflow)。

Broker 列表

插件支持一次推送到多个 Broker,如下配置:

  1. {
  2. "127.0.0.1":9092,
  3. "127.0.0.1":9093
  4. }

如何启用

  1. 为特定路由启用 kafka-logger 插件。
  1. curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
  2. {
  3. "plugins": {
  4. "kafka-logger": {
  5. "broker_list" :
  6. {
  7. "127.0.0.1":9092
  8. },
  9. "kafka_topic" : "test2",
  10. "key" : "key1"
  11. }
  12. },
  13. "upstream": {
  14. "nodes": {
  15. "127.0.0.1:1980": 1
  16. },
  17. "type": "roundrobin"
  18. },
  19. "uri": "/hello"
  20. }'

测试插件

成功

  1. $ curl -i http://127.0.0.1:9080/hello
  2. HTTP/1.1 200 OK
  3. ...
  4. hello, world

禁用插件

当您要禁用kafka-logger插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效:

  1. $ curl http://127.0.0.1:2379/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d value='
  2. {
  3. "methods": ["GET"],
  4. "uri": "/hello",
  5. "plugins": {},
  6. "upstream": {
  7. "type": "roundrobin",
  8. "nodes": {
  9. "127.0.0.1:1980": 1
  10. }
  11. }
  12. }'