日志格式转换

建议先了解Loggie内部日志数据schema设计

日志格式转换,主要是对Loggie内置的一些字段进行处理和转换,一般会依赖日志切分处理

需求场景

Loggie部署在不同的环境中,如果需要兼容已有的格式,可以参考如下的办法。

示例1: 发送原始数据

正常情况,Loggie仅将原始数据发送给下游。

如果你配置:

pipelines.yml

  1. pipelines:
  2. - name: local
  3. sources:
  4. - type: file
  5. name: demo
  6. paths:
  7. - /tmp/log/*.log
  8. sink:
  9. type: dev
  10. printEvents: true
  11. codec:
  12. pretty: true

sink输出的数据仅为原始数据:

  1. {
  2. "body": "01-Dec-2021 03:13:58.298 INFO [main] Starting service [Catalina]"
  3. }

示例2: 添加fields自定义元信息

如果我们在source上配置了一些自定义的fields。

pipelines.yml

  1. pipelines:
  2. - name: local
  3. sources:
  4. - type: file
  5. name: demo
  6. paths:
  7. - /tmp/log/*.log
  8. fields:
  9. topic: "loggie"
  10. sink:
  11. type: dev
  12. printEvents: true
  13. codec:
  14. pretty: true

那么sink输出的为:

  1. {
  2. "fields": {
  3. "topic": "loggie",
  4. },
  5. "body": "01-Dec-2021 03:13:58.298 INFO [main] Starting service [Catalina]"
  6. }

当然我们也可以配置fieldsUnderRoot: true,让fields里的key:value和body同一层级。

  1. fields:
  2. topic: "loggie"
  3. fieldsUnderRoot: true
  1. {
  2. "topic": "loggie",
  3. "body": "01-Dec-2021 03:13:58.298 INFO [main] Starting service [Catalina]"
  4. }

示例3: 添加meta系统内置元信息

有一些Loggie系统内置的元信息,我们也希望发送给下游,这个时候,需要使用normalize interceptor中的addMeta processors。

pipelines.yml

  1. pipelines:
  2. - name: local
  3. sources:
  4. - type: file
  5. name: demo
  6. paths:
  7. - /tmp/log/*.log
  8. fields:
  9. topic: "loggie"
  10. interceptors:
  11. - type: normalize
  12. processors:
  13. - addMeta: ~
  14. sink:
  15. type: dev
  16. printEvents: true
  17. codec:
  18. pretty: true

配置了addMeta processor之后,默认会把所有的系统内置元信息输出。

默认Json格式输出示例如下:

Example

  1. {
  2. "fields": {
  3. "topic": "loggie"
  4. },
  5. "meta": {
  6. "systemState": {
  7. "nextOffset": 720,
  8. "filename": "/tmp/log/a.log",
  9. "collectTime": "2022-03-08T11:33:47.369813+08:00",
  10. "contentBytes": 90,
  11. "jobUid": "43772050-16777231",
  12. "lineNumber": 8,
  13. "offset": 630
  14. },
  15. "systemProductTime": "2022-03-08T11:33:47.370166+08:00",
  16. "systemPipelineName": "local",
  17. "systemSourceName": "demo"
  18. },
  19. "body": "01-Dec-2021 03:13:58.298 INFO [main] Starting service [Catalina]"
  20. }

当然,我们可能会觉得这些数据太多了,或者想对字段进行修改。我们就可以使用normalize interceptor里的其他processor,进行drop、rename、copy、timestamp转换等操作。

另外,这里的normalize interceptor配置在某个Pipeline中,如果希望全局生效,避免每个pipeline都配置该interceptor,可以在defaults中配置:

loggie.yml

  1. defaults:
  2. interceptors:
  3. - type: normalize
  4. name: global # 用于区分pipelines中的normalize,避免pipeline中定义了normalize会覆盖这里的defaults
  5. order: 500 # 默认normalize的order值为900,这里定义一个相对较小值,可控制先执行defaults中的normalize
  6. processor:
  7. - addMeta: ~

示例4:beatsFormat快速转换

如果你期望的格式为类似:

  1. {
  2. "@timestamp": "2021-12-12T11:58:12.141267",
  3. "message": "01-Dec-2021 03:13:58.298 INFO [main] Starting service [Catalina]",
  4. }

Loggie提供了一个内置的转换方式,可以在sink codec中设置beatsFormat如下:

Config

  1. sink:
  2. type: dev
  3. printEvents: true
  4. codec:
  5. type: json
  6. beatsFormat: true

会将默认的body改为message,同时增加@timestamp时间字段。