Doris output plugin

该插件用于logstash输出数据到Doris,使用 HTTP 协议与 Doris FE Http接口交互,并通过 Doris 的 stream load 的方式进行数据导入.

了解Doris Stream Load

了解更多关于Doris

安装和编译

1.下载插件源码

2.编译

在extension/logstash/ 目录下执行

gem build logstash-output-doris.gemspec

你将在同目录下得到 logstash-output-doris-{version}.gem 文件

3.插件安装

copy logstash-output-doris-{version}.gem 到 logstash 安装目录下

执行命令

./bin/logstash-plugin install logstash-output-doris-{version}.gem

安装 logstash-output-doris 插件

配置

示例:

在config目录下新建一个配置配置文件,命名为 logstash-doris.conf

具体配置如下:

  1. output {
  2. doris {
  3. http_hosts => [ "http://fehost:8030" ]
  4. user => user_name
  5. password => password
  6. db => "db_name"
  7. table => "table_name"
  8. label_prefix => "label_prefix"
  9. column_separator => ","
  10. }
  11. }

配置说明:

连接相关配置:

配置说明
httphostsFE的HTTP交互地址。 例如: [“http://fe1:8030“, “http://fe2:8030“]
user用户名,该用户需要有doris对应库表的导入权限
password密码
db数据库名
table表名
label_prefix导入标识前缀,最终生成的标识为 {label_prefix}{db}{table}{time_stamp}

导入相关配置:(参考文档

配置说明
column_separator列分割符,默认为\t。
columns用于指定导入文件中的列和 table 中的列的对应关系。
where导入任务指定的过滤条件。
max_filter_ratio导入任务的最大容忍率,默认零容忍。
partition待导入表的 Partition 信息。
timeout超时时间,默认为600s。
strict_mode严格模式,默认为false。
timezone指定本次导入所使用的时区,默认为东八区。
exec_mem_limit导入内存限制,默认为 2GB,单位为字节。
format指定导入数据格式,支持csv、json、 csv_with_names、csv_with_names_and_types、parquet、orc,默认是csv。
jsonpaths匹配模式需要通过jsonpaths参数匹配对应的value。
json_root用于指定json document的根节点,默认值为””
fuzzy_parse布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高 json 导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json 格式。
num_as_string为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。

其他配置

配置说明
save_on_failure如果导入失败是否在本地保存,默认为true
save_dir本地保存目录,默认为 /tmp
automatic_retries失败时重试最大次数,默认为3
batch_size每批次最多处理的event数量,默认为100000
idle_flush_time最大间隔时间,默认为20(秒)

启动

执行命令启动doris output plugin:

{logstash-home}/bin/logstash -f {logstash-home}/config/logstash-doris.conf --config.reload.automatic

完整使用示例

1.编译doris-output-plugin

1> 下载ruby压缩包,自行到ruby官网下载,这里使用的2.7.1版本

2> 编译安装,配置ruby的环境变量

3> 到doris源码 extension/logstash/ 目录下,执行

gem build logstash-output-doris.gemspec

得到文件 logstash-output-doris-0.1.0.gem,至此编译完成

2.安装配置filebeat(此处使用filebeat作为input)

1> es官网下载 filebeat tar压缩包并解压

2> 进入filebeat目录下,修改配置文件 filebeat.yml 如下:

  1. filebeat.inputs:
  2. - type: log
  3. paths:
  4. - /tmp/doris.data
  5. output.logstash:
  6. hosts: ["localhost:5044"]

/tmp/doris.data 为doris数据路径

3> 启动filebeat:

./filebeat -e -c filebeat.yml -d "publish"

3.安装logstash及doris-out-plugin

1> es官网下载 logstash tar压缩包并解压

2> 将步骤1中得到的 logstash-output-doris-0.1.0.gem copy到logstash安装目录下

3> 执行

./bin/logstash-plugin install logstash-output-doris-0.1.0.gem

安装插件

4> 在config 目录下新建配置文件 logstash-doris.conf 内容如下:

  1. input {
  2. beats {
  3. port => "5044"
  4. }
  5. }
  6. output {
  7. doris {
  8. http_hosts => [ "http://127.0.0.1:8030" ]
  9. user => doris
  10. password => doris
  11. db => "logstash_output_test"
  12. table => "output"
  13. label_prefix => "doris"
  14. column_separator => ","
  15. columns => "a,b,c,d,e"
  16. }
  17. }

这里的配置需按照配置说明自行配置

5> 启动logstash:

./bin/logstash -f ./config/logstash-doris.conf —config.reload.automatic

4.测试功能

向/tmp/doris.data追加写入数据

echo a,b,c,d,e >> /tmp/doris.data

观察logstash日志,若返回response的Status为 Success,则导入成功,此时可在 logstash_output_test.output 表中查看已导入的数据