Logstash Doris output plugin


Logstash 是一个日志ETL框架(采集,预处理,发送到存储系统),它支持自定义输出插件将数据写入存储系统,Logstash Doris output plugin 是输出到 Doris 的插件。

Logstash Doris output plugin 调用 Doris Stream Load HTTP 接口将数据实时写入 Doris,提供多线程并发,失败重试,自定义 Stream Load 格式和参数,输出写入速度等能力。

使用 Logstash Doris output plugin 主要有三个步骤:

  1. 将插件安装到 Logstash 中
  2. 配置 Doris 输出地址和其他参数
  3. 启动 Logstash 将数据实时写入 Doris



可以从官网下载或者自行从源码编译 Logstash Doris output plugin。

  1. cd extension/logstash/
  2. gem build logstash-output-doris.gemspec


  • 普通安装

${LOGSTASH_HOME} 是 Logstash 的安装目录,运行它下面的 bin/logstash-plugin 命令安装插件

  1. ${LOGSTASH_HOME}/bin/logstash-plugin install logstash-output-doris-1.0.0.gem
  2. Validating logstash-output-doris-1.0.0.gem
  3. Installing logstash-output-doris
  4. Installation successful

普通安装模式会自动安装插件依赖的 ruby 模块,对于网络不通的情况会卡住无法完成,这种情况下可以下载包含依赖的zip安装包进行完全离线安装,注意需要用 file:// 指定本地文件系统。

  • 离线安装
  1. ${LOGSTASH_HOME}/bin/logstash-plugin install file:///tmp/logstash-output-doris-1.0.0.zip
  2. Installing file: logstash-output-doris-1.0.0.zip
  3. Resolving dependencies.........................
  4. Install successful


Logstash Doris output plugin 的配置如下:

http_hostsStream Load HTTP 地址,格式是字符串数组,可以有一个或者多个元素,每个元素是 host:port。 例如:[“http://fe1:8030“, “http://fe2:8030“]
userDoris 用户名,该用户需要有doris对应库表的导入权限
passwordDoris 用户的密码
db要写入的 Doris 库名
table要写入的 Doris 表名
label_prefixDoris Stream Load Label 前缀,最终生成的 Label 为 {labelprefix}{db}{table}{yyyymmddhhmmss}{uuid} ,默认值是 logstash
headersDoris Stream Load 的 headers 参数,语法格式为 ruby map,例如:headers => { “format” => “json” “read_json_by_line” => “true” }
mappingLogstash 字段到 Doris 表字段的映射, 参考后续章节的使用示例
message_only一种特殊的 mapping 形式,只将 Logstash 的 @message 字段输出到 Doris,默认为 false
max_retriesDoris Stream Load 请求失败重试次数,默认为 -1 无限重试保证数据可靠性
log_request日志中是否输出 Doris Stream Load 请求和响应元数据,用于排查问题,默认为 false
log_speed_interval日志中输出速度的时间间隔,单位是秒,默认为 10,设置为 0 可以关闭这种日志


TEXT 日志采集示例

该示例以 Doris FE 的日志为例展示 TEXT 日志采集。

1. 数据

FE 日志文件一般位于 Doris 安装目录下的 fe/log/fe.log 文件,是典型的 Java 程序日志,包括时间戳,日志级别,线程名,代码位置,日志内容等字段。不仅有正常的日志,还有带 stacktrace 的异常日志,stacktrace 是跨行的,日志采集存储需要把主日志和 stacktrace 组合成一条日志。

  1. 2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
  2. 2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
  3. org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
  4. at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
  5. at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]

2. 建表


  1. CREATE TABLE `doris_log` (
  2. `log_time` datetime NULL COMMENT 'log content time',
  3. `collect_time` datetime NULL COMMENT 'log agent collect time',
  4. `host` text NULL COMMENT 'hostname or ip',
  5. `path` text NULL COMMENT 'log file path',
  6. `type` text NULL COMMENT 'log type',
  7. `level` text NULL COMMENT 'log level',
  8. `thread` text NULL COMMENT 'log thread',
  9. `position` text NULL COMMENT 'log code position',
  10. `message` text NULL COMMENT 'log message',
  11. INDEX idx_host (`host`) USING INVERTED COMMENT '',
  12. INDEX idx_path (`path`) USING INVERTED COMMENT '',
  13. INDEX idx_type (`type`) USING INVERTED COMMENT '',
  14. INDEX idx_level (`level`) USING INVERTED COMMENT '',
  15. INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
  16. INDEX idx_position (`position`) USING INVERTED COMMENT '',
  17. INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
  19. DUPLICATE KEY(`log_time`)
  21. PARTITION BY RANGE(`log_time`) ()
  24. "replication_num" = "1",
  25. "dynamic_partition.enable" = "true",
  26. "dynamic_partition.time_unit" = "DAY",
  27. "dynamic_partition.start" = "-7",
  28. "dynamic_partition.end" = "1",
  29. "dynamic_partition.prefix" = "p",
  30. "dynamic_partition.buckets" = "10",
  31. "dynamic_partition.create_history_partition" = "true",
  32. "compaction_policy" = "time_series"
  33. );

3. Logstash 配置

Logstash 主要有两类配置文件,一类是整个 Logstash 的配置文件,另一类是某个日志采集的配置文件。

整个 Logstash 的配置文件通常在 config/logstash.yml,为了提升写入 Doris 的性能需要修改 batch 大小和攒批时间,对于平均每条i几百字节的日志,推荐 100 万行和 10s 。

  1. pipeline.batch.size: 1000000
  2. pipeline.batch.delay: 10000

某个日志采集的配置文件如 logstash_doris_log.conf 主要由 3 部分组成,分别对应 ETL 的各个部分:

  1. input 负责读取原始数据
  2. filter 负责做数据转换
  3. output 负责将数据输出
  1. # 1. input 负责读取原始数据
  2. # file input 是一个 input plugin,可以配置读取的日志文件路径,通过 multiline codec 将非时间开头的行拼接到上一行后面,实现 stacktrace 和主日志合并的效果。file input 会将日志内容保存在 @message 字段中,另外还有一些元数据字段比如 host,log.file.path,这里我们还通过 add_field 手动添加了一个字段 type,它的值为 fe.log 。
  3. input {
  4. file {
  5. path => "/mnt/disk2/xiaokang/opt/doris_master/fe/log/fe.log"
  6. add_field => {"type" => "fe.log"}
  7. codec => multiline {
  8. # valid line starts with timestamp
  9. pattern => "^%{TIMESTAMP_ISO8601} "
  10. # any line not starting with a timestamp should be merged with the previous line
  11. negate => true
  12. what => "previous"
  13. }
  14. }
  15. }
  16. # 2. filter 部分负责数据转换
  17. # grok 是一个常用的数据转换插件,它内置了一些常见的pattern 比如 TIMESTAMP_ISO8601 解析时间戳,还支持写正则表达式提取字段。
  18. filter {
  19. grok {
  20. match => {
  21. # parse log_time, level, thread, position fields from message
  22. "message" => "%{TIMESTAMP_ISO8601:log_time} (?<level>[A-Z]+) \((?<thread>[^\[]*)\) \[(?<position>[^\]]*)\]"
  23. }
  24. }
  25. }
  26. # 3. output 部分负责数据输出
  27. # doris output 将数据输出到 Doris,使用的是 Stream Load HTTP 接口。通过 headers 参数指定了 Stream Load 的数据格式为 JSON,通过 mapping 参数指定 Logstash 字段到 JSON 字段的映射。由于 headers 指定了 "format" => "json",Stream Load 会自动解析 JSON 字段写入对应的 Doris 表的字段。
  28. output {
  29. doris {
  30. http_hosts => ["http://localhost:8630"]
  31. user => "root"
  32. password => ""
  33. db => "log_db"
  34. table => "doris_log"
  35. headers => {
  36. "format" => "json"
  37. "read_json_by_line" => "true"
  38. "load_to_single_tablet" => "true"
  39. }
  40. mapping => {
  41. "log_time" => "%{log_time}"
  42. "collect_time" => "%{@timestamp}"
  43. "host" => "%{[host][name]}"
  44. "path" => "%{[log][file][path]}"
  45. "type" => "%{type}"
  46. "level" => "%{level}"
  47. "thread" => "%{thread}"
  48. "position" => "%{position}"
  49. "message" => "%{message}"
  50. }
  51. log_request => true
  52. }
  53. }

4. 运行 Logstash

  1. ${LOGSTASH_HOME}/bin/logstash -f config/logstash_doris_log.conf
  2. # log_request 为 true 时日志会输出每次 Stream Load 的请求参数和响应结果
  3. [2024-07-08T22:35:34,772][INFO ][logstash.outputs.doris ][main][e44d2a24f17d764647ce56f5fed24b9bbf08d3020c7fddcc3298800daface80a] doris stream load response:
  4. {
  5. "TxnId": 45464,
  6. "Label": "logstash_log_db_doris_log_20240708_223532_539_6c20a0d1-dcab-4b8e-9bc0-76b46a929bd1",
  7. "Comment": "",
  8. "TwoPhaseCommit": "false",
  9. "Status": "Success",
  10. "Message": "OK",
  11. "NumberTotalRows": 452,
  12. "NumberLoadedRows": 452,
  13. "NumberFilteredRows": 0,
  14. "NumberUnselectedRows": 0,
  15. "LoadBytes": 277230,
  16. "LoadTimeMs": 1797,
  17. "BeginTxnTimeMs": 0,
  18. "StreamLoadPutTimeMs": 18,
  19. "ReadDataTimeMs": 9,
  20. "WriteDataTimeMs": 1758,
  21. "CommitAndPublishTimeMs": 18
  22. }
  23. # 默认每隔 10s 会日志输出速度信息,包括自启动以来的数据量(MB 和 ROWS),总速度(MB/s 和 R/S),最近 10s 速度
  24. [2024-07-08T22:35:38,285][INFO ][logstash.outputs.doris ][main] total 11 MB 18978 ROWS, total speed 0 MB/s 632 R/s, last 10 seconds speed 1 MB/s 1897 R/s

JSON 日志采集示例

该样例以 github events archive 的数据为例展示 JSON 日志采集。

1. 数据

github events archive 是 github 用户操作事件的归档数据,格式是 JSON,可以从 https://www.gharchive.org/ 下载,比如下载 2024年1月1日15点的数据。

  1. wget https://data.gharchive.org/2024-01-01-15.json.gz


  1. {
  2. "id": "37066529221",
  3. "type": "PushEvent",
  4. "actor": {
  5. "id": 46139131,
  6. "login": "Bard89",
  7. "display_login": "Bard89",
  8. "gravatar_id": "",
  9. "url": "https://api.github.com/users/Bard89",
  10. "avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
  11. },
  12. "repo": {
  13. "id": 780125623,
  14. "name": "Bard89/talk-to-me",
  15. "url": "https://api.github.com/repos/Bard89/talk-to-me"
  16. },
  17. "payload": {
  18. "repository_id": 780125623,
  19. "push_id": 17799451992,
  20. "size": 1,
  21. "distinct_size": 1,
  22. "ref": "refs/heads/add_mvcs",
  23. "head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
  24. "before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
  25. },
  26. "public": true,
  27. "created_at": "2024-04-01T23:00:00Z"
  28. }

2. Doris 建表

  1. CREATE DATABASE log_db;
  2. USE log_db;
  3. CREATE TABLE github_events
  4. (
  5. `created_at` DATETIME,
  6. `id` BIGINT,
  7. `type` TEXT,
  8. `public` BOOLEAN,
  9. `actor.id` BIGINT,
  10. `actor.login` TEXT,
  11. `actor.display_login` TEXT,
  12. `actor.gravatar_id` TEXT,
  13. `actor.url` TEXT,
  14. `actor.avatar_url` TEXT,
  15. `repo.id` BIGINT,
  16. `repo.name` TEXT,
  17. `repo.url` TEXT,
  18. `payload` TEXT,
  19. `host` TEXT,
  20. `path` TEXT,
  21. INDEX `idx_id` (`id`) USING INVERTED,
  22. INDEX `idx_type` (`type`) USING INVERTED,
  23. INDEX `idx_actor.id` (`actor.id`) USING INVERTED,
  24. INDEX `idx_actor.login` (`actor.login`) USING INVERTED,
  25. INDEX `idx_repo.id` (`repo.id`) USING INVERTED,
  26. INDEX `idx_repo.name` (`repo.name`) USING INVERTED,
  27. INDEX `idx_host` (`host`) USING INVERTED,
  28. INDEX `idx_path` (`path`) USING INVERTED,
  29. INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
  30. )
  32. DUPLICATE KEY(`created_at`)
  33. PARTITION BY RANGE(`created_at`) ()
  36. "replication_num" = "1",
  37. "compaction_policy" = "time_series",
  38. "enable_single_replica_compaction" = "true",
  39. "dynamic_partition.enable" = "true",
  40. "dynamic_partition.create_history_partition" = "true",
  41. "dynamic_partition.time_unit" = "DAY",
  42. "dynamic_partition.start" = "-30",
  43. "dynamic_partition.end" = "1",
  44. "dynamic_partition.prefix" = "p",
  45. "dynamic_partition.buckets" = "10",
  46. "dynamic_partition.replication_num" = "1"
  47. );

3. Logstash 配置

这个配置文件和之前 TEXT 日志采集不同的有下面几点:

  1. file input 的 codec 参数是 json,Logstash 会将每一行文本当作 JSON 格式解析,解析出来的字段用于后续处理
  2. 没有用 filter plugin,因为不需要额外的处理转换
  1. input {
  2. file {
  3. path => "/tmp/github_events/2024-04-01-23.json"
  4. codec => json
  5. }
  6. }
  7. output {
  8. doris {
  9. http_hosts => ["http://fe1:8630", "http://fe2:8630", "http://fe3:8630"]
  10. user => "root"
  11. password => ""
  12. db => "log_db"
  13. table => "github_events"
  14. headers => {
  15. "format" => "json"
  16. "read_json_by_line" => "true"
  17. "load_to_single_tablet" => "true"
  18. }
  19. mapping => {
  20. "created_at" => "%{created_at}"
  21. "id" => "%{id}"
  22. "type" => "%{type}"
  23. "public" => "%{public}"
  24. "actor.id" => "%{[actor][id]}"
  25. "actor.login" => "%{[actor][login]}"
  26. "actor.display_login" => "%{[actor][display_login]}"
  27. "actor.gravatar_id" => "%{[actor][gravatar_id]}"
  28. "actor.url" => "%{[actor][url]}"
  29. "actor.avatar_url" => "%{[actor][avatar_url]}"
  30. "repo.id" => "%{[repo][id]}"
  31. "repo.name" => "%{[repo][name]}"
  32. "repo.url" => "%{[repo][url]}"
  33. "payload" => "%{[payload]}"
  34. "host" => "%{[host][name]}"
  35. "path" => "%{[log][file][path]}"
  36. }
  37. log_request => true
  38. }
  39. }

4. 运行 Logstash

  1. ${LOGSTASH_HOME}/bin/logstash -f logstash_github_events.conf