kafka-logger
目录
简介
kafka-logger
是一个插件,可用作ngx_lua nginx 模块的 Kafka 客户端驱动程序。
它可以将接口请求日志以 JSON 的形式推送给外部 Kafka 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。
有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。 Batch-Processor
属性
名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 |
---|---|---|---|---|---|
broker_list | object | 必须 | 要推送的 kafka 的 broker 列表。 | ||
kafka_topic | string | 必须 | 要推送的 topic。 | ||
producer_type | string | 可选 | async | [“async”, “sync”] | 生产者发送消息的模式。 |
key | string | 可选 | 用于消息的分区分配。 | ||
timeout | integer | 可选 | 3 | [1,…] | 发送数据的超时时间。 |
name | string | 可选 | “kafka logger” | batch processor 的唯一标识。 | |
meta_format | enum | 可选 | “default” | [“default”,”origin”] | default :获取请求信息以默认的 JSON 编码方式。origin :获取请求信息以 HTTP 原始请求方式。具体示例 |
batch_max_size | integer | 可选 | 1000 | [1,…] | 设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到 Kafka 服务。 |
inactive_timeout | integer | 可选 | 5 | [1,…] | 刷新缓冲区的最大时间(以秒为单位),当达到最大的刷新时间时,无论缓冲区中的日志数量是否达到设置的最大条数,也会自动将全部日志推送到 Kafka 服务。 |
buffer_duration | integer | 可选 | 60 | [1,…] | 必须先处理批次中最旧条目的最长期限(以秒为单位)。 |
max_retry_count | integer | 可选 | 0 | [0,…] | 从处理管道中移除之前的最大重试次数。 |
retry_delay | integer | 可选 | 1 | [0,…] | 如果执行失败,则应延迟执行流程的秒数。 |
include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。 |
meta_format 参考示例
default:
{
"upstream": "127.0.0.1:1980",
"start_time": 1619414294760,
"client_ip": "127.0.0.1",
"service_id": "",
"route_id": "1",
"request": {
"querystring": {
"ab": "cd"
},
"size": 90,
"uri": "/hello?ab=cd",
"url": "http://localhost:1984/hello?ab=cd",
"headers": {
"host": "localhost",
"content-length": "6",
"connection": "close"
},
"body": "abcdef",
"method": "GET"
},
"response": {
"headers": {
"connection": "close",
"content-type": "text/plain; charset=utf-8",
"date": "Mon, 26 Apr 2021 05:18:14 GMT",
"server": "APISIX/2.5",
"transfer-encoding": "chunked"
},
"size": 190,
"status": 200
},
"server": {
"hostname": "localhost",
"version": "2.5"
},
"latency": 0
}
origin:
``` GET /hello?ab=cd HTTP/1.1 host: localhost content-length: 6 connection: close
abcdef
```
工作原理
消息将首先写入缓冲区。 当缓冲区超过batch_max_size
时,它将发送到 kafka 服务器, 或每个buffer_duration
刷新缓冲区。
如果成功,则返回 true
。 如果出现错误,则返回 nil
,并带有描述错误的字符串(buffer overflow
)。
Broker 列表
插件支持一次推送到多个 Broker,如下配置:
{
"127.0.0.1":9092,
"127.0.0.1":9093
}
如何启用
- 为特定路由启用 kafka-logger 插件。
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}'
测试插件
成功
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
插件元数据设置
名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 |
---|---|---|---|---|---|
log_format | object | 可选 | {“host”: “$host”, “@timestamp”: “$time_iso8601”, “client_ip”: “$remote_addr”} | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 $ 开头,则表明是要获取 APISIX 变量或 Nginx 内置变量。特别的,该设置是全局生效的,意味着指定 log_format 后,将对所有绑定 http-logger 的 Route 或 Service 生效。 |
APISIX 变量
变量名 | 描述 | 使用示例 |
---|---|---|
route_id | route 的 id | $route_id |
route_name | route 的 name | $route_name |
service_id | service 的 id | $service_id |
service_name | service 的 name | $service_name |
consumer_name | consumer 的 username | $consumer_name |
设置日志格式示例
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}'
在日志收集处,将得到类似下面的日志:
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
禁用插件
当您要禁用kafka-logger
插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效:
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"methods": ["GET"],
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'