拦截器
Flume支持在运行时对Event进行修改或丢弃,可以通过拦截器来实现。Flume里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor 接口的类。拦截器可以根据开发者的意图随意修改甚至丢弃Event,Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了。拦截器的顺序取决于它们被初始化的顺序(实际也就是配置的顺序),Event就这样按照顺序经过每一个拦截器,如果想在拦截器里面丢弃Event,在传递给下一级拦截器的list里面把它移除就行了。如果想丢弃所有的Event,返回一个空集合就行了。拦截器也是通过命名配置的组件,下面就是通过配置文件来创建拦截器的例子。
提示
Event在拦截器之间流动的时候是以集合的形式,并不是逐个Event传输的,这样就能理解上面所说的“从list里面移除”、“返回一个空集合”了。
做过Java web开发的同学应该很容易理解拦截器,Flume拦截器与spring MVC、struts2等框架里面的拦截器思路十分相似。
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- a1.sources.r1.interceptors = i1 i2
- a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
- a1.sources.r1.interceptors.i1.preserveExisting = false
- a1.sources.r1.interceptors.i1.hostHeader = hostname
- a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
- a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
- a1.sinks.k1.channel = c1
拦截器构建器配置在type参数上。 拦截器是可配置的,就像其他可配置的组件一样。 在上面的示例中,Event首先传递给HostInterceptor,然后HostInterceptor返回的Event传递给TimestampInterceptor。 配置拦截器时你可以指定完全限定的类名(FQCN)或别名(timestamp)。如果你有多个收集器写入相同的HDFS路径下,那么HostInterceptor是很有用的。
时间戳添加拦截器
这个拦截器会向每个Event的header中添加一个时间戳属性进去,key默认是“timestamp ”(也可以通过下面表格中的header来自定义),value就是当前的毫秒值(其实就是用System.currentTimeMillis()方法得到的)。如果Event已经存在同名的属性,可以选择是否保留原始的值。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: timestamp |
header | timestamp | 向Event header中添加时间戳键值对的key |
preserveExisting | false | 是否保留Event header中已经存在的同名(上面header设置的key,默认是timestamp)时间戳 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.channels = c1
- a1.sources.r1.type = seq
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = timestamp
Host添加拦截器
这个拦截器会把当前Agent的hostname或者IP地址写入到Event的header中,key默认是“host”(也可以通过配置自定义key),value可以选择使用hostname或者IP地址。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: host |
preserveExisting | false | 如果header中已经存在同名的属性是否保留 |
useIP | true | true:使用IP地址;false:使用hostname |
hostHeader | host | 向Event header中添加host键值对的key |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = host
静态属性写入拦截器
静态拦截器可以向Event header中写入一个固定的键值对属性。
这个拦截器目前不支持写入多个属性,但是你可以通过配置多个静态属性写入拦截器来实现。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: static |
preserveExisting | true | 如果header中已经存在同名的属性是否保留 |
key | key | 写入header的key |
value | value | 写入header的值 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.channels = c1
- a1.sources.r1.type = seq
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = static
- a1.sources.r1.interceptors.i1.key = datacenter
- a1.sources.r1.interceptors.i1.value = NEW_YORK
删除属性拦截器
这个拦截器可以删除Event header里面的属性,可以是一个或多个。支持删除固定的header、固定分隔符分隔的多个header列表,也支持用正则表达式匹配的方式匹配删除。如果这三种方式都没有配置,那么这个拦截器不会对Event做任何修改处理。
如果只有一个header要删除,尽量使用withName方式,它要比另外两种在性能上要好一些。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: removeheader |
withName | – | 要删除的header属性名 |
fromList | – | 要删除的header名列表,用下面 _fromListSeparator 指定的分隔符分开 |
fromListSeparator | \s,\s | 用来分隔 fromList 里面设置的header名的正则表达式,默认是由任意多个空白字符包围的逗号分隔 |
matching | – | 要删除的header名的正则表达式,符合正则的将被全部删除 |
添加唯一ID拦截器
此拦截器在所有截获的Event上设置通用唯一标识符。 比如UUID可以是b5755073-77a9-43c1-8fad-b7a586f89757,它是一个128-bit的值。
Event如果没有可用的应用级唯一ID,就可以考虑使用添加唯一ID拦截器自动为Event分配UUID。 Event数据只要进入Flume网络中就给其分配一个UUID是非常重要的,Event进入Flume网络的第一个节点通常就是Flume的第一个source。这样可以在Flume网络中进行复制和重新传输以及Event的后续重复数据删除可以实现高可用性和高性能。 如果在应用层有唯一ID的话要比这种自动生成UUID要好一些,因为应用层分配的ID能方便我们在后续的数据存储中心对Event进行集中的更新和删除等操作。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
headerName | id | 将要添加或者修改的id名称 |
preserveExisting | true | 如果header中已经存在同名的属性是否保留 |
prefix | “” | UUID值的固定前缀(每个生成的uuid会在前面拼上这个固定前缀) |
Morphline 实时清洗拦截器
此拦截器通过 morphline配置文件 过滤Event,配置文件定义了一系列转换命令,用于将记录从一个命令传递到另一个命令。 例如,morphline可以忽略某些Event或通过基于正则表达式的模式匹配来更改或插入某些Event header,或者它可以通过Apache Tika在截获的Event上自动检测和设置MIME类型。 例如,这种数据包嗅探可用于Flume拓扑中基于内容的动态路由。 Morphline 实时清洗拦截器还可以帮助实现到多个Apache Solr集合的动态路由(例如,用于multi-tenancy)。
目前存在一个限制,这个拦截器不能输入一个Event然后产生多个Event出来,它不适用于重型的ETL处理,如果有需要,请考虑将ETL操作从Flume source转移到Flume sink中,比如:MorphlineSolrSink 。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder |
morphlineFile | – | morphline配置文件在本地文件系统的绝对目录。比如:/etc/flume-ng/conf/morphline.conf |
morphlineId | null | 如果在morphline 配置文件里有多个morphline ,可以配置这个名字来加以区分 |
配置范例:
- a1.sources.avroSrc.interceptors = morphlineinterceptor
- a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
- a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
- a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1
查找-替换拦截器
此拦截器基于Java正则表达式提供对Event消息体简单的基于字符串的搜索和替换功能。 还可以进行Backtracking / group。 此拦截器使用与Java Matcher.replaceAll()方法中的规则相同。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: search_replace |
searchPattern | – | 被替换的字符串的正则表达式 |
replaceString | – | 上面正则找到的内容会使用这个字段进行替换 |
charset | UTF-8 | Event body的字符编码,默认是:UTF-8 |
配置范例:
- a1.sources.avroSrc.interceptors = search-replace
- a1.sources.avroSrc.interceptors.search-replace.type = search_replace
- # Remove leading alphanumeric characters in an event body.
- a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
- a1.sources.avroSrc.interceptors.search-replace.replaceString =
再来一个例子:
- a1.sources.avroSrc.interceptors = search-replace
- a1.sources.avroSrc.interceptors.search-replace.type = search_replace
- # Use grouping operators to reorder and munge words on a line.
- a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
- a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1
正则过滤拦截器
这个拦截器会把Event的body当做字符串来处理,并用配置的正则表达式来匹配。可以配置指定被匹配到的Event丢弃还是没被匹配到的Event丢弃。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: regex_filter |
regex | “.*” | 用于匹配Event内容的正则表达式 |
excludeEvents | false | 如果为true,被正则匹配到的Event会被丢弃;如果为false,不被正则匹配到的Event会被丢弃 |
正则提取拦截器
这个拦截器会使用正则表达式从Event内容体中获取一组值并与配置的key组成n个键值对,然后放入Event的header中,Event的body不会有任何更改。它还支持插件化的方式配置序列化器来格式化从Event body中提取到的值。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: regexextractor |
regex | – | 用于匹配Event内容的正则表达式 |
serializers | – | 被正则匹配到的一组值被逐个添加到header中所使用的key的名字列表,多个用空格分隔Flume提供了两个内置的序列化器,分别是:_org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer__org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer |
serializers.<s1>.type | default | 可选值:1: default (default其实就是这个:org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer);2:org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer ;3:自定义序列化器的全限定类名(自定义序列化器需要实现 org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 接口) |
serializers.<s1>.name | – | 指定即将放入header的key,也就是最终写入到header中键值对的key |
serializers.* | – | 序列化器的一些属性 |
序列化器是用来格式化匹配到的那些字符串后再与配置的key组装成键值对放入header,默认情况下你只需要制定这些key就行了,Flume默认会使用 org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
这个序列化器,这个序列化器只是简单地将提取到的字符串与配置的key映射组装起来。当然也可以配置一个自定义的序列化器,以任意你需要的格式来格式化这些值。
例子 1:
假设Event body中包含这个字符串“1:2:3.4foobar5”
- a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
- a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
- a1.sources.r1.interceptors.i1.serializers.s1.name = one
- a1.sources.r1.interceptors.i1.serializers.s2.name = two
- a1.sources.r1.interceptors.i1.serializers.s3.name = three
经过这个拦截器后,此时Event:
- body: 不变 header增加3个属性: one=>1, two=>2, three=3
将上面的例子变动一下
- a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
- a1.sources.r1.interceptors.i1.serializers = s1 s2
- a1.sources.r1.interceptors.i1.serializers.s1.name = one
- a1.sources.r1.interceptors.i1.serializers.s2.name = two
执行这个拦截器后,此时Event:
- body: 不变 header增加3个属性: one=>1, two=>2
例子 2:
假设Event body中的某些行包含2012-10-18 18:47:57,614格式的时间戳,运行下面的拦截器
- a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
- a1.sources.r1.interceptors.i1.serializers = s1
- a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
- a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
- a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
运行拦截器后,此时Event:
- body不变,header中增加一个新属性:timestamp=>1350611220000