拦截器

Flume支持在运行时对Event进行修改或丢弃,可以通过拦截器来实现。Flume里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor 接口的类。拦截器可以根据开发者的意图随意修改甚至丢弃Event,Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了。拦截器的顺序取决于它们被初始化的顺序(实际也就是配置的顺序),Event就这样按照顺序经过每一个拦截器,如果想在拦截器里面丢弃Event,在传递给下一级拦截器的list里面把它移除就行了。如果想丢弃所有的Event,返回一个空集合就行了。拦截器也是通过命名配置的组件,下面就是通过配置文件来创建拦截器的例子。

提示

Event在拦截器之间流动的时候是以集合的形式,并不是逐个Event传输的,这样就能理解上面所说的“从list里面移除”、“返回一个空集合”了。

做过Java web开发的同学应该很容易理解拦截器,Flume拦截器与spring MVC、struts2等框架里面的拦截器思路十分相似。

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. a1.sources.r1.interceptors = i1 i2
  5. a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
  6. a1.sources.r1.interceptors.i1.preserveExisting = false
  7. a1.sources.r1.interceptors.i1.hostHeader = hostname
  8. a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
  9. a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
  10. a1.sinks.k1.channel = c1

拦截器构建器配置在type参数上。 拦截器是可配置的,就像其他可配置的组件一样。 在上面的示例中,Event首先传递给HostInterceptor,然后HostInterceptor返回的Event传递给TimestampInterceptor。 配置拦截器时你可以指定完全限定的类名(FQCN)或别名(timestamp)。如果你有多个收集器写入相同的HDFS路径下,那么HostInterceptor是很有用的。

时间戳添加拦截器

这个拦截器会向每个Event的header中添加一个时间戳属性进去,key默认是“timestamp ”(也可以通过下面表格中的header来自定义),value就是当前的毫秒值(其实就是用System.currentTimeMillis()方法得到的)。如果Event已经存在同名的属性,可以选择是否保留原始的值。


属性

默认值

解释

type



组件类型,这个是: timestamp

header

timestamp

向Event header中添加时间戳键值对的key

preserveExisting

false

是否保留Event header中已经存在的同名(上面header设置的key,默认是timestamp)时间戳

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.channels = c1
  4. a1.sources.r1.type = seq
  5. a1.sources.r1.interceptors = i1
  6. a1.sources.r1.interceptors.i1.type = timestamp

Host添加拦截器

这个拦截器会把当前Agent的hostname或者IP地址写入到Event的header中,key默认是“host”(也可以通过配置自定义key),value可以选择使用hostname或者IP地址。


属性

默认值

解释

type



组件类型,这个是: host

preserveExisting

false

如果header中已经存在同名的属性是否保留

useIP

true

true:使用IP地址;false:使用hostname

hostHeader

host

向Event header中添加host键值对的key

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.interceptors = i1
  4. a1.sources.r1.interceptors.i1.type = host

静态属性写入拦截器

静态拦截器可以向Event header中写入一个固定的键值对属性。

这个拦截器目前不支持写入多个属性,但是你可以通过配置多个静态属性写入拦截器来实现。


属性

默认值

解释

type



组件类型,这个是: static

preserveExisting

true

如果header中已经存在同名的属性是否保留

key

key

写入header的key

value

value

写入header的值

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.channels = c1
  4. a1.sources.r1.type = seq
  5. a1.sources.r1.interceptors = i1
  6. a1.sources.r1.interceptors.i1.type = static
  7. a1.sources.r1.interceptors.i1.key = datacenter
  8. a1.sources.r1.interceptors.i1.value = NEW_YORK

删除属性拦截器

这个拦截器可以删除Event header里面的属性,可以是一个或多个。支持删除固定的header、固定分隔符分隔的多个header列表,也支持用正则表达式匹配的方式匹配删除。如果这三种方式都没有配置,那么这个拦截器不会对Event做任何修改处理。

如果只有一个header要删除,尽量使用withName方式,它要比另外两种在性能上要好一些。


属性

默认值

解释

type



组件类型,这个是:removeheader

withName



要删除的header属性名

fromList



要删除的header名列表,用下面 _fromListSeparator 指定的分隔符分开

fromListSeparator

\s,\s

用来分隔 fromList 里面设置的header名的正则表达式,默认是由任意多个空白字符包围的逗号分隔

matching



要删除的header名的正则表达式,符合正则的将被全部删除

添加唯一ID拦截器

此拦截器在所有截获的Event上设置通用唯一标识符。 比如UUID可以是b5755073-77a9-43c1-8fad-b7a586f89757,它是一个128-bit的值。

Event如果没有可用的应用级唯一ID,就可以考虑使用添加唯一ID拦截器自动为Event分配UUID。 Event数据只要进入Flume网络中就给其分配一个UUID是非常重要的,Event进入Flume网络的第一个节点通常就是Flume的第一个source。这样可以在Flume网络中进行复制和重新传输以及Event的后续重复数据删除可以实现高可用性和高性能。 如果在应用层有唯一ID的话要比这种自动生成UUID要好一些,因为应用层分配的ID能方便我们在后续的数据存储中心对Event进行集中的更新和删除等操作。


属性

默认值

解释

type



组件类型,这个是:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

headerName

id

将要添加或者修改的id名称

preserveExisting

true

如果header中已经存在同名的属性是否保留

prefix

“”

UUID值的固定前缀(每个生成的uuid会在前面拼上这个固定前缀)

Morphline 实时清洗拦截器

此拦截器通过 morphline配置文件 过滤Event,配置文件定义了一系列转换命令,用于将记录从一个命令传递到另一个命令。 例如,morphline可以忽略某些Event或通过基于正则表达式的模式匹配来更改或插入某些Event header,或者它可以通过Apache Tika在截获的Event上自动检测和设置MIME类型。 例如,这种数据包嗅探可用于Flume拓扑中基于内容的动态路由。 Morphline 实时清洗拦截器还可以帮助实现到多个Apache Solr集合的动态路由(例如,用于multi-tenancy)。

目前存在一个限制,这个拦截器不能输入一个Event然后产生多个Event出来,它不适用于重型的ETL处理,如果有需要,请考虑将ETL操作从Flume source转移到Flume sink中,比如:MorphlineSolrSink

必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder

morphlineFile



morphline配置文件在本地文件系统的绝对目录。比如:/etc/flume-ng/conf/morphline.conf

morphlineId

null

如果在morphline 配置文件里有多个morphline ,可以配置这个名字来加以区分

配置范例:

  1. a1.sources.avroSrc.interceptors = morphlineinterceptor
  2. a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
  3. a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
  4. a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

查找-替换拦截器

此拦截器基于Java正则表达式提供对Event消息体简单的基于字符串的搜索和替换功能。 还可以进行Backtracking / group。 此拦截器使用与Java Matcher.replaceAll()方法中的规则相同。


属性

默认值

解释

type



组件类型,这个是: search_replace

searchPattern



被替换的字符串的正则表达式

replaceString



上面正则找到的内容会使用这个字段进行替换

charset

UTF-8

Event body的字符编码,默认是:UTF-8

配置范例:

  1. a1.sources.avroSrc.interceptors = search-replace
  2. a1.sources.avroSrc.interceptors.search-replace.type = search_replace
  3.  
  4. # Remove leading alphanumeric characters in an event body.
  5. a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
  6. a1.sources.avroSrc.interceptors.search-replace.replaceString =

再来一个例子:

  1. a1.sources.avroSrc.interceptors = search-replace
  2. a1.sources.avroSrc.interceptors.search-replace.type = search_replace
  3.  
  4. # Use grouping operators to reorder and munge words on a line.
  5. a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
  6. a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

正则过滤拦截器

这个拦截器会把Event的body当做字符串来处理,并用配置的正则表达式来匹配。可以配置指定被匹配到的Event丢弃还是没被匹配到的Event丢弃。


属性

默认值

解释

type



组件类型,这个是: regex_filter

regex

“.*”

用于匹配Event内容的正则表达式

excludeEvents

false

如果为true,被正则匹配到的Event会被丢弃;如果为false,不被正则匹配到的Event会被丢弃

正则提取拦截器

这个拦截器会使用正则表达式从Event内容体中获取一组值并与配置的key组成n个键值对,然后放入Event的header中,Event的body不会有任何更改。它还支持插件化的方式配置序列化器来格式化从Event body中提取到的值。


属性

默认值

解释

type



组件类型,这个是: regexextractor

regex



用于匹配Event内容的正则表达式

serializers



被正则匹配到的一组值被逐个添加到header中所使用的key的名字列表,多个用空格分隔Flume提供了两个内置的序列化器,分别是:_org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer__org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer

serializers.<s1>.type

default

可选值:1:default (default其实就是这个:org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer);2:org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer;3:自定义序列化器的全限定类名(自定义序列化器需要实现 org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 接口)

serializers.<s1>.name



指定即将放入header的key,也就是最终写入到header中键值对的key

serializers.*



序列化器的一些属性

序列化器是用来格式化匹配到的那些字符串后再与配置的key组装成键值对放入header,默认情况下你只需要制定这些key就行了,Flume默认会使用 org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer这个序列化器,这个序列化器只是简单地将提取到的字符串与配置的key映射组装起来。当然也可以配置一个自定义的序列化器,以任意你需要的格式来格式化这些值。

例子 1:

假设Event body中包含这个字符串“1:2:3.4foobar5”

  1. a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
  2. a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
  3. a1.sources.r1.interceptors.i1.serializers.s1.name = one
  4. a1.sources.r1.interceptors.i1.serializers.s2.name = two
  5. a1.sources.r1.interceptors.i1.serializers.s3.name = three

经过这个拦截器后,此时Event:

  1. body: 不变 header增加3个属性: one=>1, two=>2, three=3

将上面的例子变动一下

  1. a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
  2. a1.sources.r1.interceptors.i1.serializers = s1 s2
  3. a1.sources.r1.interceptors.i1.serializers.s1.name = one
  4. a1.sources.r1.interceptors.i1.serializers.s2.name = two

执行这个拦截器后,此时Event:

  1. body: 不变 header增加3个属性: one=>1, two=>2

例子 2:

假设Event body中的某些行包含2012-10-18 18:47:57,614格式的时间戳,运行下面的拦截器

  1. a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
  2. a1.sources.r1.interceptors.i1.serializers = s1
  3. a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
  4. a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
  5. a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

运行拦截器后,此时Event:

  1. body不变,header中增加一个新属性:timestamp=>1350611220000