批处理器

批处理器可用于聚合条目(日志/任何数据)并进行批处理。 当 batch_max_size 设置为 1 时,处理器将立即执行每个条目。将批处理的最大值设置为大于 1 将开始聚合条目,直到达到最大值或超时。

配置

创建批处理器的唯一必需参数是函数。当批处理达到最大值或缓冲区持续时间超过时,函数将被执行。

名称类型必选项默认值有效值描述
namestring可选xxx logger[“http logger”, “Some strings”,…]用于标识批处理器的唯一标识符,默认为调用批处理器的日志插件名字,如配置插件为 http logger,name 默认为 http logger。
batch_max_sizeinteger可选1000[1,…]设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到 HTTP/HTTPS 服务。
inactive_timeoutinteger可选5[1,…]刷新缓冲区的最大时间(以秒为单位),当达到最大的刷新时间时,无论缓冲区中的日志数量是否达到设置的最大条数,也会自动将全部日志推送到 HTTP/HTTPS 服务。
buffer_durationinteger可选60[1,…]必须先处理批次中最旧条目的最长期限(以秒为单位)。
max_retry_countinteger可选0[0,…]从处理管道中移除之前的最大重试次数。
retry_delayinteger可选1[0,…]如果执行失败,则应延迟执行流程的秒数。

以下代码显示了如何在你的插件中使用批处理器:

  1. local bp_manager_mod = require("apisix.utils.batch-processor-manager")
  2. ...
  3. local plugin_name = "xxx-logger"
  4. local batch_processor_manager = bp_manager_mod.new(plugin_name)
  5. local schema = {...}
  6. local _M = {
  7. ...
  8. name = plugin_name,
  9. schema = batch_processor_manager:wrap_schema(schema),
  10. }
  11. ...
  12. function _M.log(conf, ctx)
  13. local entry = {...} -- data to log
  14. if batch_processor_manager:add_entry(conf, entry) then
  15. return
  16. end
  17. -- create a new processor if not found
  18. -- entries is an array table of entry, which can be processed in batch
  19. local func = function(entries)
  20. -- serialize to json array core.json.encode(entries)
  21. -- process/send data
  22. return true
  23. -- return false, err_msg, first_fail if failed
  24. -- first_fail(optional) indicates first_fail-1 entries have been successfully processed
  25. -- and during processing of entries[first_fail], the error occurred. So the batch processor
  26. -- only retries for the entries having index >= first_fail as per the retry policy.
  27. end
  28. batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
  29. end

批处理器的配置将通过该插件的配置设置。 举个例子:

批处理器 - 图1note

您可以这样从 config.yaml 中获取 admin_key 并存入环境变量:

  1. admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"//g')
  1. curl http://127.0.0.1:9180/apisix/admin/routes/1 -H "X-API-KEY: $admin_key" -X PUT -d '
  2. {
  3. "plugins": {
  4. "http-logger": {
  5. "uri": "http://mockbin.org/bin/:ID",
  6. "batch_max_size": 10,
  7. "max_retry_count": 1
  8. }
  9. },
  10. "upstream": {
  11. "type": "roundrobin",
  12. "nodes": {
  13. "127.0.0.1:1980": 1
  14. }
  15. },
  16. "uri": "/hello"
  17. }'

如果你的插件只使用一个全局的批处理器, 你可以直接使用它:

  1. local entry = {...} -- data to log
  2. if log_buffer then
  3. log_buffer:push(entry)
  4. return
  5. end
  6. local config_bat = {
  7. name = config.name,
  8. retry_delay = config.retry_delay,
  9. ...
  10. }
  11. local err
  12. -- entries is an array table of entry, which can be processed in batch
  13. local func = function(entries)
  14. ...
  15. return true
  16. -- return false, err_msg, first_fail if failed
  17. end
  18. log_buffer, err = batch_processor:new(func, config_bat)
  19. if not log_buffer then
  20. core.log.warn("error when creating the batch processor: ", err)
  21. return
  22. end
  23. log_buffer:push(entry)

注意:请确保批处理的最大值(条目数)在函数执行的范围内。 刷新批处理的计时器基于 inactive_timeout 配置运行。因此,为了获得最佳使用效果, 保持 inactive_timeout 小于 buffer_duration