Version: 2.11
批处理器
批处理器可用于聚合条目(日志/任何数据)并进行批处理。 当 batch_max_size
设置为零时,处理器将立即执行每个条目。将批处理的最大值设置为大于 1 将开始聚合条目,直到达到最大值或超时。
配置
创建批处理器的唯一必需参数是函数。当批处理达到最大值或缓冲区持续时间超过时,函数将被执行。
名称 | 必选项 | 描述 |
---|---|---|
name | 可选的 | 标识批处理者的唯一标识符 |
batch_max_size | 可选的 | 每批的最大大小,默认为 1000 |
inactive_timeout | 可选的 | 如果不活动,将刷新缓冲区的最大时间(以秒为单位),默认值为 5 |
buffer_duration | 可选的 | 必须先处理批次中最旧条目的最大期限(以秒为单位),默认是 5 |
max_retry_count | 可选的 | 从处理管道中移除之前的最大重试次数;默认为 0 |
retry_delay | 可选的 | 如果执行失败,应该延迟进程执行的秒数;默认为 1 |
以下代码显示了如何使用批处理程序的示例。批处理器将一个要执行的函数作为第一个参数,将批处理配置作为第二个参数。
local bp = require("apisix.utils.batch-processor")
local func_to_execute = function(entries)
-- serialize to json array core.json.encode(entries)
-- process/send data
return true
end
local config = {
max_retry_count = 2,
buffer_duration = 60,
inactive_timeout = 5,
batch_max_size = 1,
retry_delay = 0
}
local batch_processor, err = bp:new(func_to_execute, config)
if batch_processor then
batch_processor:push({hello='world'})
end
注意:请确保批处理的最大值(条目数)在函数执行的范围内。 刷新批处理的计时器基于 inactive_timeout
配置运行。因此,为了获得最佳使用效果, 保持 inactive_timeout
小于 buffer_duration
。