kafka-logger

Description

The kafka-logger Plugin is used to push logs as JSON objects to Apache Kafka clusters. It works as a Kafka client driver for the ngx_lua Nginx module.

There might be a delay before the log data is received: it is sent automatically once the timer in the batch processor expires.

Attributes

| Name | Type | Required | Default | Valid values | Description |
|------|------|----------|---------|--------------|-------------|
| broker_list | object | True | | | Deprecated, use brokers instead. List of Kafka brokers (nodes). |
| brokers | array | True | | | List of Kafka brokers (nodes). |
| brokers.host | string | True | | | The host of the Kafka broker, e.g., 192.168.1.1. |
| brokers.port | integer | True | | [0, 65535] | The port of the Kafka broker. |
| brokers.sasl_config | object | False | | | The SASL config of the Kafka broker. |
| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechanism of the SASL config. |
| brokers.sasl_config.user | string | True | | | The user of the SASL config. Required if sasl_config exists. |
| brokers.sasl_config.password | string | True | | | The password of the SASL config. Required if sasl_config exists. |
| kafka_topic | string | True | | | Target topic to push the logs to. |
| producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| required_acks | integer | False | 1 | [1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka acks attribute. required_acks cannot be 0. See Apache Kafka documentation for more. |
| key | string | False | | | Key used for allocating partitions for messages. |
| timeout | integer | False | 3 | [1,...] | Timeout for the upstream to send data. |
| name | string | False | "kafka logger" | | Unique identifier for the batch processor. If you use Prometheus to monitor APISIX metrics, the name is exported in apisix_batch_process_entries. |
| meta_format | enum | False | "default" | ["default", "origin"] | Format to collect the request information. Setting to default collects the information in JSON format and origin collects the information with the original HTTP request. See examples below. |
| log_format | object | False | | | Log format declared as key-value pairs in JSON format. Values only support strings. APISIX or Nginx variables can be used by prefixing the string with $. |
| include_req_body | boolean | False | false | [false, true] | When set to true, includes the request body in the log. If the request body is too big to be kept in memory, it can't be logged due to Nginx's limitations. |
| include_req_body_expr | array | False | | | Filter for when the include_req_body attribute is set to true. Request body is only logged when the expression set here evaluates to true. See lua-resty-expr for more. |
| max_req_body_bytes | integer | False | 524288 | >=1 | Request bodies within this size will be pushed to Kafka; bodies that exceed the configured value are truncated before being pushed. |
| include_resp_body | boolean | False | false | [false, true] | When set to true, includes the response body in the log. |
| include_resp_body_expr | array | False | | | Filter for when the include_resp_body attribute is set to true. Response body is only logged when the expression set here evaluates to true. See lua-resty-expr for more. |
| max_resp_body_bytes | integer | False | 524288 | >=1 | Response bodies within this size will be pushed to Kafka; bodies that exceed the configured value are truncated before being pushed. |
| cluster_name | integer | False | 1 | [0,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the producer_type attribute is set to async. |
| producer_batch_num | integer | False | 200 | [1,...] | batch_num parameter in lua-resty-kafka. Messages are merged and sent to the server in batches. The unit is message count. |
| producer_batch_size | integer | False | 1048576 | [0,...] | batch_size parameter in lua-resty-kafka, in bytes. |
| producer_max_buffering | integer | False | 50000 | [1,...] | max_buffering parameter in lua-resty-kafka representing the maximum buffer size. The unit is message count. |
| producer_time_linger | integer | False | 1 | [1,...] | flush_time parameter in lua-resty-kafka, in seconds. |
| meta_refresh_interval | integer | False | 30 | [1,...] | refresh_interval parameter in lua-resty-kafka; the interval, in seconds, at which metadata is automatically refreshed. |
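
For example, to connect to a broker that requires SASL/PLAIN authentication, you can nest a sasl_config inside the broker entry. This is a minimal sketch based on the attributes above; the host, user, and password values are placeholders:

    "brokers": [
      {
        "host": "127.0.0.1",
        "port": 9092,
        "sasl_config": {
          "mechanism": "PLAIN",
          "user": "admin",
          "password": "admin-secret"
        }
      }
    ],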

This Plugin supports using batch processors to aggregate and process entries (logs/data) in batches. This avoids the need to submit data frequently. The batch processor submits data every 5 seconds or when the data in the queue reaches 1000 entries. See Batch Processor for more information or to set your custom configuration.

IMPORTANT

The data is first written to a buffer. When the buffer exceeds the batch_max_size or buffer_duration attribute, the data is sent to the Kafka server and the buffer is flushed.

If the process is successful, it returns true; if it fails, it returns nil along with a string containing the "buffer overflow" error.
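
Both thresholds can be tuned per Route in the Plugin configuration. The sketch below is illustrative, not a recommendation; batch_max_size and buffer_duration are generic batch processor attributes documented in Batch Processor:

    "kafka-logger": {
      "brokers": [
        {
          "host": "127.0.0.1",
          "port": 9092
        }
      ],
      "kafka_topic": "test2",
      "batch_max_size": 500,
      "buffer_duration": 60
    }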

meta_format example

  • default:

    {
      "upstream": "127.0.0.1:1980",
      "start_time": 1619414294760,
      "client_ip": "127.0.0.1",
      "service_id": "",
      "route_id": "1",
      "request": {
        "querystring": {
          "ab": "cd"
        },
        "size": 90,
        "uri": "/hello?ab=cd",
        "url": "http://localhost:1984/hello?ab=cd",
        "headers": {
          "host": "localhost",
          "content-length": "6",
          "connection": "close"
        },
        "body": "abcdef",
        "method": "GET"
      },
      "response": {
        "headers": {
          "connection": "close",
          "content-type": "text/plain; charset=utf-8",
          "date": "Mon, 26 Apr 2021 05:18:14 GMT",
          "server": "APISIX/2.5",
          "transfer-encoding": "chunked"
        },
        "size": 190,
        "status": 200
      },
      "server": {
        "hostname": "localhost",
        "version": "2.5"
      },
      "latency": 0
    }

  • origin:

    GET /hello?ab=cd HTTP/1.1
    host: localhost
    content-length: 6
    connection: close

    abcdef

Metadata

You can also set the format of the logs by configuring the Plugin metadata. The following configurations are available:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| log_format | object | False | | Log format declared as key-value pairs in JSON format. Values only support strings. APISIX or Nginx variables can be used by prefixing the string with $. |

IMPORTANT

Configuring the Plugin metadata is global in scope. This means that it will take effect on all Routes and Services which use the kafka-logger Plugin.

The example below shows how you can configure it through the Admin API:

NOTE

You can fetch the admin_key from config.yaml and save it to an environment variable with the following command:

    admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"//g')

    curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger -H "X-API-KEY: $admin_key" -X PUT -d '
    {
      "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
      }
    }'

With this configuration, your logs would be formatted as shown below:

  1. {"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
  2. {"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}

Enable Plugin

The example below shows how you can enable the kafka-logger Plugin on a specific Route:

    curl http://127.0.0.1:9180/apisix/admin/routes/5 -H "X-API-KEY: $admin_key" -X PUT -d '
    {
      "plugins": {
        "kafka-logger": {
          "brokers": [
            {
              "host": "127.0.0.1",
              "port": 9092
            }
          ],
          "kafka_topic": "test2",
          "key": "key1",
          "batch_max_size": 1,
          "name": "kafka logger"
        }
      },
      "upstream": {
        "nodes": {
          "127.0.0.1:1980": 1
        },
        "type": "roundrobin"
      },
      "uri": "/hello"
    }'

This Plugin also supports pushing to more than one broker at a time. You can specify multiple brokers in the Plugin configuration as shown below:

  1. "brokers" : [
  2. {
  3. "host" :"127.0.0.1",
  4. "port" : 9092
  5. },
  6. {
  7. "host" :"127.0.0.1",
  8. "port" : 9093
  9. }
  10. ],
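
You can also limit request body logging to specific calls by combining the include_req_body and include_req_body_expr attributes. The lua-resty-expr filter below is a sketch that only logs the body when the request carries the query argument log_body=yes; the argument name is illustrative:

    "kafka-logger": {
      "brokers": [
        {
          "host": "127.0.0.1",
          "port": 9092
        }
      ],
      "kafka_topic": "test2",
      "include_req_body": true,
      "include_req_body_expr": [
        ["arg_log_body", "==", "yes"]
      ]
    }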

Example usage

Now, if you make a request to APISIX, it will be logged in your Kafka server:

    curl -i http://127.0.0.1:9080/hello
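
You can then verify that the log was delivered by reading from the topic, for example with the console consumer that ships with Kafka. The broker address and topic name below assume the configuration from the previous step:

    bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test2 --from-beginning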

Delete Plugin

To remove the kafka-logger Plugin, you can delete the corresponding JSON configuration from the Plugin configuration. APISIX will automatically reload and you do not have to restart for this to take effect.

    curl http://127.0.0.1:9180/apisix/admin/routes/1 -H "X-API-KEY: $admin_key" -X PUT -d '
    {
      "methods": ["GET"],
      "uri": "/hello",
      "plugins": {},
      "upstream": {
        "type": "roundrobin",
        "nodes": {
          "127.0.0.1:1980": 1
        }
      }
    }'