Beats output plugin

这是 elastic beats 的输出实现,支持 Filebeat, Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Heartbeat 到 Apache Doris。

该插件用于 beats 输出数据到 Doris,使用 HTTP 协议与 Doris FE Http 接口交互,并通过 Doris 的 stream load 的方式进行数据导入.

了解Doris Stream Load

了解更多关于Doris

兼容性

此插件是使用 Beats 7.3.1 开发和测试的

安装

下载源码

  1. mkdir -p $GOPATH/src/github.com/apache/
  2. cd $GOPATH/src/github.com/apache/
  3. git clone https://github.com/apache/doris
  4. cd doris/extension/beats

编译

在 extension/beats/ 目录下执行

  1. go build -o filebeat filebeat/filebeat.go
  2. go build -o metricbeat metricbeat/metricbeat.go
  3. go build -o winlogbeat winlogbeat/winlogbeat.go
  4. go build -o packetbeat packetbeat/packetbeat.go
  5. go build -o auditbeat auditbeat/auditbeat.go
  6. go build -o heartbeat heartbeat/heartbeat.go

您将在各个子目录目录下得到可执行文件

使用

您可以使用目录 [./example/] 中的示例配置文件,也可以按照以下步骤创建它。

配置 Beat

添加以下配置到 *beat.yml

  1. output.doris:
  2. fenodes: ["http://localhost:8030"] # your doris fe address
  3. user: root # your doris user
  4. password: root # your doris password
  5. database: example_db # your doris database
  6. table: example_table # your doris table
  7. codec_format_string: "%{[message]}" # beat-event format expression to row data
  8. headers:
  9. column_separator: ","

启动 Beat

使用 filebeat 作为示例

  1. ./filebeat/filebeat -c filebeat.yml -e

配置说明

连接 doris 配置:

NameDescriptionDefault
fenodesFE 的 HTTP交互地址。 例如: [“http://fe1:8030“, “http://fe2:8030“]
user用户名,该用户需要有 Doris 对应库表的导入权限
password密码
database数据库名
table表名
labelprefix导入标识前缀,最终生成的标识为 {label_prefix}{db}{table}{time_stamp}doris_beats
line_delimiter用于指定导入数据中的换行符,可以使用做多个字符的组合作为换行符。\n
headers用户可以通过 headers 传入 stream-load 导入参数

Beats 配置:

NameDescriptionDefault
codec_format_string设置格式化 beats 事件的表达式,格式化结果会作为行数据添加到 http body 中
codecbeats 输出编解码器,格式结果将作为一行添加到 http body 中,优先使用 codec_format_string
timeout设置 http 客户端超时时间
bulk_max_size批处理的最大事件数100000
max_retries发送失败时的最大重试次数,Filebeat 忽略 max_retries 设置并无限期重试。3
backoff.init网络错误后尝试重新连接之前等待的秒数1
backoff.max网络错误后尝试连接之前等待的最大秒数60

完整使用示例(Filebeat)

初始化 Doris

  1. CREATE DATABASE example_db;
  2. CREATE TABLE example_db.example_table (
  3. id BIGINT,
  4. name VARCHAR(100)
  5. )
  6. UNIQUE KEY(`id`)
  7. DISTRIBUTED BY HASH(`id`) BUCKETS 1
  8. PROPERTIES (
  9. "replication_num"="1"
  10. );

配置 Filebeat

创建 /tmp/beats/filebeat.yml 文件并添加以下配置:

  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - /tmp/beats/example.log
  6. output.doris:
  7. fenodes: ["http://localhost:8030"] # your doris fe address
  8. user: root # your doris user
  9. password: root # your doris password
  10. database: example_db # your doris database
  11. table: example_table # your doris table
  12. codec_format_string: "%{[message]}"
  13. headers:
  14. column_separator: ","

启动 Filebeat

  1. ./filebeat/filebeat -c /tmp/beats/filebeat.yml -e

验证数据导入

添加数据到文件 /tmp/beats/example.log

  1. echo -e "1,A\n2,B\n3,C\n4,D" >> /tmp/beats/example.log

观察 filebeat 日志,如果没有打印错误日志,则导入成功。 这时可以在 example_db.example_table 表中查看导入的数据

更多配置示例

指定导入的 columns

创建 /tmp/beats/example.log 文件并添加以下内容:

  1. 1,A
  2. 2,B

配置 columns

  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - /tmp/beats/example.log
  6. output.doris:
  7. ...
  8. codec_format_string: "%{[message]}"
  9. headers:
  10. columns: "id,name"

采集 json 文件

创建 /tmp/beats/example.json 文件并添加以下内容:

  1. {"id": 1, "name": "A"}
  2. {"id": 2, "name": "B"}

配置 headers

  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - /tmp/beats/example.json
  6. output.doris:
  7. ...
  8. codec_format_string: "%{[message]}"
  9. headers:
  10. format: json
  11. read_json_by_line: true

编码输出字段

创建 /tmp/beats/example.log 文件并添加以下内容:

  1. 1,A
  2. 2,B

配置 codec_format_string

  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - /tmp/beats/example.log
  6. output.doris:
  7. ...
  8. codec_format_string: "%{[message]},%{[@timestamp]},%{[@metadata.type]}"
  9. headers:
  10. columns: "id,name,beat_timestamp,beat_metadata_type"

常见问题

如何配置批处理提交大小

添加以下内容到您的 *beat.yml 文件中

它表示,如果有 10000 个事件可用或最旧的可用事件已在内存队列中等待 5 秒,此示例配置会将事件批量转发给 doris:

  1. queue.mem:
  2. events: 10000
  3. flush.min_events: 10000
  4. flush.timeout: 5s

如何使用其他的 beats(例如 metricbeat)

Doris beats 支持所有的 beats 模块,使用方式参见 安装使用

如何构建 docker 镜像

可以使用 安装 输出的可执行文件打包 docker 镜像