Pipeline 配置

Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制, 由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json)和纯文本(text/plain)格式的日志数据作为输入。

这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便于后续的结构化查询。

整体结构

Pipeline 由两部分组成:Processors 和 Transform,这两部分均为数组形式。一个 Pipeline 配置可以包含多个 Processor 和多个 Transform。Transform 所描述的数据类型会决定日志数据保存到数据库时的表结构。

  • Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
  • Transform 用于对数据进行格式转换,例如将字符串类型转换为数字类型。

一个包含 Processor 和 Transform 的简单配置示例如下:

  1. processors:
  2. - urlencoding:
  3. fields:
  4. - string_field_a
  5. - string_field_b
  6. method: decode
  7. ignore_missing: true
  8. transform:
  9. - fields:
  10. - string_field_a
  11. - string_field_b
  12. type: string
  13. # 写入的数据必须包含 timestamp 字段
  14. - fields:
  15. - reqTimeSec, req_time_sec
  16. # epoch 是特殊字段类型,必须指定精度
  17. type: epoch, ms
  18. index: timestamp

Processor

Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors 字段下。 Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。 Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。

我们目前内置了以下几种 Processor:

  • date: 解析格式化的时间字符串字段,例如 2024-07-12T16:18:53.048
  • epoch: 解析数字时间戳字段,例如 1720772378893
  • dissect: 对 log 数据字段进行拆分。
  • gsub: 对 log 数据字段进行替换。
  • join: 对 log 中的 array 类型字段进行合并。
  • letter: 对 log 数据字段进行字母转换。
  • regex: 对 log 数据字段进行正则匹配。
  • urlencoding: 对 log 数据字段进行 URL 编解码。
  • csv: 对 log 数据字段进行 CSV 解析。

date

date Processor 用于解析时间字段。示例配置如下:

  1. processors:
  2. - date:
  3. fields:
  4. - time
  5. formats:
  6. - '%Y-%m-%d %H:%M:%S%.3f'
  7. ignore_missing: true
  8. timezone: 'Asia/Shanghai'

如上所示,date Processor 的配置包含以下字段:

  • fields: 需要解析的时间字段名列表。
  • formats: 时间格式化字符串,支持多个时间格式化字符串。按照提供的顺序尝试解析,直到解析成功。你可以在这里找到格式化的语法说明。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。
  • timezone: 时区。使用tz_database 中的时区标识符来指定时区。默认为 UTC

epoch

epoch Processor 用于解析时间戳字段,示例配置如下:

  1. processors:
  2. - epoch:
  3. fields:
  4. - reqTimeSec
  5. resolution: millisecond
  6. ignore_missing: true

如上所示,epoch Processor 的配置包含以下字段:

  • fields: 需要解析的时间戳字段名列表。
  • resolution: 时间戳精度,支持 s, sec , second , ms, millisecond, milli, us, microsecond, micro, ns, nanosecond, nano。默认为 ms
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

dissect

dissect Processor 用于对 log 数据字段进行拆分,示例配置如下:

  1. processors:
  2. - dissect:
  3. fields:
  4. - message
  5. patterns:
  6. - '%{key1} %{key2}'
  7. ignore_missing: true
  8. append_separator: '-'

如上所示,dissect Processor 的配置包含以下字段:

  • fields: 需要拆分的字段名列表。
  • patterns: 拆分的 dissect 模式。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。
  • append_separator: 对于多个追加到一起的字段,指定连接符。默认是一个空字符串。

Dissect 模式

和 Logstash 的 Dissect 模式类似,Dissect 模式由 %{key} 组成,其中 %{key} 为一个字段名。例如:

  1. "%{key1} %{key2} %{+key3} %{+key4/2} %{key5->} %{?key6} %{*key7} %{&key8}"

Dissect 修饰符

Dissect 模式支持以下修饰符:

修饰符说明示例
+将两个或多个字段追加到一起%{+key} %{+key}
+/n按照指定的顺序将两个或多个字段追加到一起%{+key/2} %{+key/1}
->忽略右侧的任何重复字符%{key1->} %{key2->}
?忽略匹配的值%{?key}
&将输出键设置为 ,输出值设置为 &。%{*key} %{&key}

dissect 示例

例如,对于以下 log 数据:

  1. "key1 key2 key3 key4 key5 key6 key7 key8"

使用以下 Dissect 模式:

  1. "%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6} %{*key} %{&key}"

将得到以下结果:

  1. {
  2. "key1": "key1",
  3. "key2": "key2",
  4. "key3": "key3 key4",
  5. "key5": "key5",
  6. "key7": "key8"
  7. }

gsub

gsub Processor 用于对 log 数据字段进行替换,示例配置如下:

  1. processors:
  2. - gsub:
  3. fields:
  4. - message
  5. pattern: 'old'
  6. replacement: 'new'
  7. ignore_missing: true

如上所示,gsub Processor 的配置包含以下字段:

  • fields: 需要替换的字段名列表。
  • pattern: 需要替换的字符串。支持正则表达式。
  • replacement: 替换后的字符串。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

join

join Processor 用于对 log 中的 Array 类型字段进行合并,示例配置如下:

  1. processors:
  2. - join:
  3. fields:
  4. - message
  5. separator: ','
  6. ignore_missing: true

如上所示,join Processor 的配置包含以下字段:

  • fields: 需要合并的字段名列表。注意,这里每行字段的值需要是 Array 类型,每行字段会单独合并自己数组内的值,所有行的字段不会合并到一起。
  • separator: 合并后的分隔符。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

join 示例

例如,对于以下 log 数据:

  1. {
  2. "message": ["a", "b", "c"]
  3. }

使用以下配置:

  1. processors:
  2. - join:
  3. fields:
  4. - message
  5. separator: ','

将得到以下结果:

  1. {
  2. "message": "a,b,c"
  3. }

letter

letter Processor 用于对 log 数据字段进行字母转换,示例配置如下:

  1. processors:
  2. - letter:
  3. fields:
  4. - message
  5. method: upper
  6. ignore_missing: true

如上所示,letter Processor 的配置包含以下字段:

  • fields: 需要转换的字段名列表。
  • method: 转换方法,支持 upper, lowercapital。默认为 lower。注意 capital 只会将第一个字母转换为大写。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

regex

regex Processor 用于对 log 数据字段进行正则匹配,示例配置如下:

  1. processors:
  2. - regex:
  3. fields:
  4. - message
  5. patterns:
  6. - ':(?<id>[0-9])'
  7. ignore_missing: true

如上所示,regex Processor 的配置包含以下字段:

  • fields: 需要匹配的字段名列表。
  • pattern: 要进行匹配的正则表达式,需要使用命名捕获组才可以从对应字段中取出对应数据。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

regex 命名捕获组的规则

regex Processor 支持使用 (?<group-name>...) 的语法来命名捕获组,最终将数据处理为这种形式:

  1. {
  2. "<field-name>_<group-name>": "<value>"
  3. }

例如 regex Processor 中 field 填写的字段名为 message,对应的内容为 "[ERROR] error message", 你可以将 pattern 设置为 \[(?<level>[A-Z]+)\] (?<content>.+), 最终数据会被处理为:

  1. {
  2. "message_level": "ERROR",
  3. "message_content": "error message"
  4. }

urlencoding

urlencoding Processor 用于对 log 数据字段进行 URL 编码,示例配置如下:

  1. processors:
  2. - urlencoding:
  3. fields:
  4. - string_field_a
  5. - string_field_b
  6. method: decode
  7. ignore_missing: true

如上所示,urlencoding Processor 的配置包含以下字段:

  • fields: 需要编码的字段名列表。
  • method: 编码方法,支持 encode, decode。默认为 encode
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

csv

csv Processor 用于对 log 数据中没有携带 header 的 CSV 类型字段解析,示例配置如下:

  1. processors:
  2. - csv:
  3. fields:
  4. - message
  5. separator: ','
  6. quote: '"'
  7. trim: true
  8. ignore_missing: true

如上所示,csv Processor 的配置包含以下字段:

  • fields: 需要解析的字段名列表。
  • separator: 分隔符。
  • quote: 引号。
  • trim: 是否去除空格。默认为 false
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

Transform

Transform 用于对 log 数据进行转换,其配置位于 YAML 文件中的 transform 字段下。

Transform 由一个或多个配置组成,每个配置包含以下字段:

  • fields: 需要转换的字段名列表。
  • type: 转换类型
  • index: 索引类型(可选)
  • on_failure: 转换失败时的处理方式(可选)
  • default: 默认值(可选)

fields 字段

每个字段名都是一个字符串,当字段名称包含 , 时,会进行字段重命名。例如,reqTimeSec, req_time_sec 表示将 reqTimeSec 字段重命名为 req_time_sec, 最终数据将被写入到 GreptimeDB 的 req_time_sec 列。

type 字段

GreptimeDB 目前内置了以下几种转换类型:

  • int8, int16, int32, int64: 整数类型。
  • uint8, uint16, uint32, uint64: 无符号整数类型。
  • float32, float64: 浮点数类型。
  • string: 字符串类型。
  • time: 时间类型。将被转换为 GreptimeDB timestamp(9) 类型。
  • epoch: 时间戳类型。将被转换为 GreptimeDB timestamp(n) 类型。n 为时间戳精度,n 的值视 epoch 精度而定。当精度为 s 时,n 为 0;当精度为 ms 时,n 为 3;当精度为 us 时,n 为 6;当精度为 ns 时,n 为 9。

如果字段在转换过程中获得了非法值,Pipeline 将会抛出异常。例如将一个字符串 abc 转换为整数时,由于该字符串不是一个合法的整数,Pipeline 将会抛出异常。

index 字段

Pipeline 会将处理后的数据写入到 GreptimeDB 自动创建的数据表中。为了提高查询效率,GreptimeDB 会为表中的某些列创建索引。index 字段用于指定哪些字段需要被索引。关于 GreptimeDB 的列类型,请参考数据模型文档。

GreptimeDB 支持以下三种字段的索引类型:

  • tag: 用于指定某列为 Tag 列
  • fulltext: 用于指定某列使用 fulltext 类型的索引,该列需要是字符串类型
  • timestamp: 用于指定某列是时间索引列

不提供 index 字段时,GreptimeDB 会将该字段作为 Field 列。

在 GreptimeDB 中,一张表里必须包含一个 timestamp 类型的列作为该表的时间索引列,因此一个 Pipeline 有且只有一个时间索引列。

时间戳列

通过 index: timestamp 指定哪个字段是时间索引列,写法请参考下方的 Transform 示例

Tag 列

通过 index: tag 指定哪个字段是 Tag 列,写法请参考下方的 Transform 示例

Fulltext 列

通过 index: fulltext 指定哪个字段将会被用于全文搜索,该索引可大大提升 日志搜索 的性能,写法请参考下方的 Transform 示例

on_failure 字段

on_failure 字段用于指定转换失败时的处理方式,支持以下几种方式:

  • ignore: 忽略转换失败的字段,不写入数据库。
  • default: 写入默认值。默认值由 default 字段指定。

default 字段

default 字段用于指定转换失败时的默认值。

Transform 示例

例如,对于以下 log 数据:

  1. {
  2. "num_field_a": "3",
  3. "string_field_a": "john",
  4. "string_field_b": "It was snowing when he was born.",
  5. "time_field_a": 1625760000
  6. }

使用以下配置:

  1. transform:
  2. - fields:
  3. - string_field_a, name
  4. type: string
  5. index: tag
  6. - fields:
  7. - num_field_a, age
  8. type: int32
  9. - fields:
  10. - string_field_b, description
  11. type: string
  12. index: fulltext
  13. - fields:
  14. - time_field_a, bron_time
  15. type: epoch, s
  16. index: timestamp

将得到以下结果:

  1. {
  2. "name": "john",
  3. "age": 3,
  4. "description": "It was snowing when he was born.",
  5. "bron_time": 2021-07-08 16:00:00
  6. }