Logstash Doris output plugin

Introduction

Logstash is a log ETL framework (collect, preprocess, send to storage systems) that supports custom output plugins to write data into storage systems. The Logstash Doris output plugin is a plugin for outputting data to Doris.

The Logstash Doris output plugin calls the Doris Stream Load HTTP interface to write data into Doris in real time. It offers capabilities such as multi-threaded concurrency, retry on failure, customizable Stream Load formats and parameters, and reporting of write speed in the logs.
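For reference, the Stream Load HTTP interface that the plugin calls can also be invoked directly with curl. The request below is only an illustrative sketch: the FE host, credentials, database, table, and file name are placeholders, and 8030 is assumed to be the FE HTTP port.

# A hypothetical Stream Load request; replace host, credentials, database, table, and file
curl --location-trusted -u root:password \
    -H "format:json" -H "read_json_by_line:true" \
    -T example.json \
    http://fe_host:8030/api/example_db/example_table/_stream_load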

Using the Logstash Doris output plugin mainly involves three steps:

  1. Install the plugin into Logstash
  2. Configure the Doris output address and other parameters
  3. Start Logstash to write data into Doris in real-time

Installation

Obtaining the Plugin

You can download the plugin from the official website or compile it yourself from the source code. To compile it, run the following commands in the Doris source code directory:

cd extension/logstash/
gem build logstash-output-doris.gemspec

Installing the Plugin

  • Standard Installation

${LOGSTASH_HOME} is the installation directory of Logstash. Run the bin/logstash-plugin command under it to install the plugin.

${LOGSTASH_HOME}/bin/logstash-plugin install logstash-output-doris-1.0.0.gem
Validating logstash-output-doris-1.0.0.gem
Installing logstash-output-doris
Installation successful

The standard installation mode automatically installs the Ruby modules that the plugin depends on. If the network is unavailable, the installation will hang and cannot complete. In that case, you can download a zip installation package that bundles the dependencies and perform a fully offline installation, noting that you must use file:// to point to the local file system.

  • Offline Installation
${LOGSTASH_HOME}/bin/logstash-plugin install file:///tmp/logstash-output-doris-1.0.0.zip
Installing file: logstash-output-doris-1.0.0.zip
Resolving dependencies.........................
Install successful
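Either way, you can confirm that the plugin was registered by listing the plugins Logstash knows about; the command below is a quick check, assuming the same ${LOGSTASH_HOME} layout:

# List installed plugins and check that logstash-output-doris appears
${LOGSTASH_HOME}/bin/logstash-plugin list | grep doris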

Configuration

The configuration for the Logstash Doris output plugin is as follows:

Configuration | Description
http_hosts | Stream Load HTTP address, formatted as a string array with one or more elements, each in the form host:port. For example: ["http://fe1:8030", "http://fe2:8030"]
user | Doris username; this user needs import permissions for the target Doris database and table
password | Password of the Doris user
db | The Doris database name to write into
table | The Doris table name to write into
label_prefix | Doris Stream Load Label prefix; the final generated Label is {label_prefix}_{db}_{table}_{yyyymmdd_hhmmss}_{uuid}; the default value is logstash
headers | Doris Stream Load headers parameter; the syntax is a Ruby map, for example: headers => { "format" => "json", "read_json_by_line" => "true" }
mapping | Mapping from Logstash fields to Doris table fields; refer to the usage examples in the subsequent sections
message_only | A special form of mapping that outputs only the Logstash @message field to Doris; default is false
max_retries | Number of retries for failed Doris Stream Load requests; the default is -1, meaning infinite retries to ensure data reliability
log_request | Whether to log Doris Stream Load request and response metadata for troubleshooting; default is false
log_speed_interval | Time interval for logging write speed, in seconds; default is 10; setting it to 0 disables this logging
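As a quick reference, a minimal doris output block combining these options might look like the sketch below; the FE address, credentials, database, table, and field names are placeholders to adapt to your environment:

output {
    doris {
        # Doris FE Stream Load address (placeholder)
        http_hosts => ["http://fe1:8030"]
        user => "root"
        password => ""
        db => "example_db"
        table => "example_table"
        headers => {
            "format" => "json"
            "read_json_by_line" => "true"
        }
        # Logstash field -> Doris column (placeholders)
        mapping => {
            "log_time" => "%{@timestamp}"
            "message" => "%{message}"
        }
        log_request => true
    }
}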

Usage Example

TEXT Log Collection Example

This example demonstrates TEXT log collection, using Doris FE logs as the data source.

1. Data

FE log files are typically located at fe/log/fe.log under the Doris installation directory. They are typical Java program logs, with fields such as timestamp, log level, thread name, code location, and log content. Besides normal log entries, they also contain exception logs with multi-line stacktraces; log collection and storage need to merge the main log line and its stacktrace into a single log entry.

2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
        at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]

2. Table Creation

The table structure includes fields such as the log’s creation time, collection time, hostname, log file path, log type, log level, thread name, code location, and log content.

CREATE TABLE `doris_log` (
  `log_time` datetime NULL COMMENT 'log content time',
  `collect_time` datetime NULL COMMENT 'log agent collect time',
  `host` text NULL COMMENT 'hostname or ip',
  `path` text NULL COMMENT 'log file path',
  `type` text NULL COMMENT 'log type',
  `level` text NULL COMMENT 'log level',
  `thread` text NULL COMMENT 'log thread',
  `position` text NULL COMMENT 'log code position',
  `message` text NULL COMMENT 'log message',
  INDEX idx_host (`host`) USING INVERTED COMMENT '',
  INDEX idx_path (`path`) USING INVERTED COMMENT '',
  INDEX idx_type (`type`) USING INVERTED COMMENT '',
  INDEX idx_level (`level`) USING INVERTED COMMENT '',
  INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
  INDEX idx_position (`position`) USING INVERTED COMMENT '',
  INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
  "replication_num" = "1",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.time_unit" = "DAY",
  "dynamic_partition.start" = "-7",
  "dynamic_partition.end" = "1",
  "dynamic_partition.prefix" = "p",
  "dynamic_partition.buckets" = "10",
  "dynamic_partition.create_history_partition" = "true",
  "compaction_policy" = "time_series"
);

3. Logstash Configuration

Logstash mainly has two types of configuration files: one for the entire Logstash system and another for a specific log collection.

The configuration file for the entire Logstash system is usually located at config/logstash.yml. To improve performance when writing to Doris, you need to increase the batch size and batch delay. For logs averaging a few hundred bytes per line, a batch size of 1,000,000 lines and a batch delay of 10 seconds (10,000 ms, since pipeline.batch.delay is specified in milliseconds) are recommended.

pipeline.batch.size: 1000000
pipeline.batch.delay: 10000

The configuration file for a specific log collection, such as logstash_doris_log.conf, mainly consists of three parts corresponding to the various stages of ETL:

  1. Input is responsible for reading the raw data.
  2. Filter is responsible for data transformation.
  3. Output is responsible for sending the data to the output destination.
# 1. The input section is responsible for reading raw data.
# File input is an input plugin that reads the log file at the configured path. It uses the multiline codec to concatenate lines that do not start with a timestamp onto the end of the previous line, merging stacktraces with the main log line. File input saves the log content in the @message field and also adds some metadata fields such as host and log.file.path. Here, we manually add a field named type through add_field, with its value set to fe.log.
input {
    file {
        path => "/mnt/disk2/xiaokang/opt/doris_master/fe/log/fe.log"
        add_field => {"type" => "fe.log"}
        codec => multiline {
            # valid line starts with timestamp
            pattern => "^%{TIMESTAMP_ISO8601} "
            # any line not starting with a timestamp should be merged with the previous line
            negate => true
            what => "previous"
        }
    }
}

# 2. The filter section is responsible for data transformation.
# grok is a commonly used data transformation plugin with built-in patterns, such as TIMESTAMP_ISO8601 for parsing timestamps, and it also supports regular expressions for extracting fields.
filter {
    grok {
        match => {
            # parse log_time, level, thread, position fields from message
            "message" => "%{TIMESTAMP_ISO8601:log_time} (?<level>[A-Z]+) \((?<thread>[^\[]*)\) \[(?<position>[^\]]*)\]"
        }
    }
}

# 3. The output section is responsible for data output.
# Doris output sends data to Doris using the Stream Load HTTP interface. The data format for Stream Load is specified as JSON through the headers parameter, and the mapping parameter specifies the mapping from Logstash fields to JSON fields. Since headers specify "format" => "json", Stream Load automatically parses the JSON fields and writes them into the corresponding columns of the Doris table.
output {
    doris {
        http_hosts => ["http://localhost:8630"]
        user => "root"
        password => ""
        db => "log_db"
        table => "doris_log"
        headers => {
            "format" => "json"
            "read_json_by_line" => "true"
            "load_to_single_tablet" => "true"
        }
        mapping => {
            "log_time" => "%{log_time}"
            "collect_time" => "%{@timestamp}"
            "host" => "%{[host][name]}"
            "path" => "%{[log][file][path]}"
            "type" => "%{type}"
            "level" => "%{level}"
            "thread" => "%{thread}"
            "position" => "%{position}"
            "message" => "%{message}"
        }
        log_request => true
    }
}

4. Running Logstash

${LOGSTASH_HOME}/bin/logstash -f config/logstash_doris_log.conf

# When log_request is set to true, the log will output the request parameters and response results of each Stream Load.
[2024-07-08T22:35:34,772][INFO ][logstash.outputs.doris ][main][e44d2a24f17d764647ce56f5fed24b9bbf08d3020c7fddcc3298800daface80a] doris stream load response:
{
    "TxnId": 45464,
    "Label": "logstash_log_db_doris_log_20240708_223532_539_6c20a0d1-dcab-4b8e-9bc0-76b46a929bd1",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 452,
    "NumberLoadedRows": 452,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 277230,
    "LoadTimeMs": 1797,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 18,
    "ReadDataTimeMs": 9,
    "WriteDataTimeMs": 1758,
    "CommitAndPublishTimeMs": 18
}

# By default, speed information is logged every 10 seconds, including the amount of data since startup (in MB and ROWS), the total speed (in MB/s and R/s), and the speed in the last 10 seconds.
[2024-07-08T22:35:38,285][INFO ][logstash.outputs.doris ][main] total 11 MB 18978 ROWS, total speed 0 MB/s 632 R/s, last 10 seconds speed 1 MB/s 1897 R/s
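After the load starts, you can verify the data on the Doris side. The queries below are a sketch assuming the log_db.doris_log table created above, executed from any MySQL-protocol client connected to the Doris FE:

-- Confirm that rows are arriving
SELECT COUNT(*) FROM log_db.doris_log;

-- Inspect the most recent entries
SELECT log_time, level, position, message
FROM log_db.doris_log
ORDER BY log_time DESC
LIMIT 5;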

JSON Log Collection Example

This example demonstrates JSON log collection using data from the GitHub events archive.

1. Data

The GitHub events archive contains archived data of GitHub user actions, formatted as JSON. It can be downloaded from https://data.gharchive.org, for example the data for January 1, 2024, at 3 PM:

wget https://data.gharchive.org/2024-01-01-15.json.gz
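The archive is gzip-compressed, while the file input used below reads plain JSON text line by line, so decompress the file first; adjust the resulting path to match the path configured in the file input:

# Decompress so that Logstash's file input can read the JSON lines directly
gunzip 2024-01-01-15.json.gz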

Below is a sample of the data. Normally each event is on a single line, but it has been pretty-printed here for readability.

{
    "id": "37066529221",
    "type": "PushEvent",
    "actor": {
        "id": 46139131,
        "login": "Bard89",
        "display_login": "Bard89",
        "gravatar_id": "",
        "url": "https://api.github.com/users/Bard89",
        "avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
    },
    "repo": {
        "id": 780125623,
        "name": "Bard89/talk-to-me",
        "url": "https://api.github.com/repos/Bard89/talk-to-me"
    },
    "payload": {
        "repository_id": 780125623,
        "push_id": 17799451992,
        "size": 1,
        "distinct_size": 1,
        "ref": "refs/heads/add_mvcs",
        "head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
        "before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
    },
    "public": true,
    "created_at": "2024-04-01T23:00:00Z"
}

2. Table Creation

CREATE DATABASE log_db;
USE log_db;

CREATE TABLE github_events
(
  `created_at` DATETIME,
  `id` BIGINT,
  `type` TEXT,
  `public` BOOLEAN,
  `actor.id` BIGINT,
  `actor.login` TEXT,
  `actor.display_login` TEXT,
  `actor.gravatar_id` TEXT,
  `actor.url` TEXT,
  `actor.avatar_url` TEXT,
  `repo.id` BIGINT,
  `repo.name` TEXT,
  `repo.url` TEXT,
  `payload` TEXT,
  `host` TEXT,
  `path` TEXT,
  INDEX `idx_id` (`id`) USING INVERTED,
  INDEX `idx_type` (`type`) USING INVERTED,
  INDEX `idx_actor.id` (`actor.id`) USING INVERTED,
  INDEX `idx_actor.login` (`actor.login`) USING INVERTED,
  INDEX `idx_repo.id` (`repo.id`) USING INVERTED,
  INDEX `idx_repo.name` (`repo.name`) USING INVERTED,
  INDEX `idx_host` (`host`) USING INVERTED,
  INDEX `idx_path` (`path`) USING INVERTED,
  INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
  "replication_num" = "1",
  "compaction_policy" = "time_series",
  "enable_single_replica_compaction" = "true",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.create_history_partition" = "true",
  "dynamic_partition.time_unit" = "DAY",
  "dynamic_partition.start" = "-30",
  "dynamic_partition.end" = "1",
  "dynamic_partition.prefix" = "p",
  "dynamic_partition.buckets" = "10",
  "dynamic_partition.replication_num" = "1"
);

3. Logstash Configuration

The configuration file differs from the previous TEXT log collection in the following aspects:

  1. The codec parameter for file input is json. Logstash will parse each line of text as JSON format and use the parsed fields for subsequent processing.
  2. No filter plugin is used because no additional processing or transformation is needed.
input {
    file {
        path => "/tmp/github_events/2024-04-01-23.json"
        codec => json
    }
}
output {
    doris {
        http_hosts => ["http://fe1:8630", "http://fe2:8630", "http://fe3:8630"]
        user => "root"
        password => ""
        db => "log_db"
        table => "github_events"
        headers => {
            "format" => "json"
            "read_json_by_line" => "true"
            "load_to_single_tablet" => "true"
        }
        mapping => {
            "created_at" => "%{created_at}"
            "id" => "%{id}"
            "type" => "%{type}"
            "public" => "%{public}"
            "actor.id" => "%{[actor][id]}"
            "actor.login" => "%{[actor][login]}"
            "actor.display_login" => "%{[actor][display_login]}"
            "actor.gravatar_id" => "%{[actor][gravatar_id]}"
            "actor.url" => "%{[actor][url]}"
            "actor.avatar_url" => "%{[actor][avatar_url]}"
            "repo.id" => "%{[repo][id]}"
            "repo.name" => "%{[repo][name]}"
            "repo.url" => "%{[repo][url]}"
            "payload" => "%{[payload]}"
            "host" => "%{[host][name]}"
            "path" => "%{[log][file][path]}"
        }
        log_request => true
    }
}

4. Running Logstash

${LOGSTASH_HOME}/bin/logstash -f logstash_github_events.conf
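Once Logstash has finished loading, a couple of queries in Doris can confirm the result; this is a sketch assuming the log_db.github_events table created earlier:

-- Count all loaded events
SELECT COUNT(*) FROM log_db.github_events;

-- Break events down by type
SELECT `type`, COUNT(*) AS cnt
FROM log_db.github_events
GROUP BY `type`
ORDER BY cnt DESC
LIMIT 10;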