Flume Sources

Avro Source

Avro Source监听Avro端口接收从外部Avro客户端发送来的数据流。如果与上一层Agent的 Avro Sink 配合使用就组成了一个分层的拓扑结构。必需的参数已用 粗体 标明。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: avro

bind



监听的服务器名hostname或者ip

port



监听的端口

threads



生成的最大工作线程数量

selector.type

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器的相关属性

compression-type

none

可选值: nonedeflate 。这个类型必须跟Avro Source相匹配

ssl

false

设置为 true 可启用SSL加密,如果为true必须指定下面的 keystorekeystore-password

keystore



SSL加密使用的Java keystore文件路径

keystore-password



Java keystore的密码

keystore-type

JKS

Java keystore的类型. 可选值有 JKSPKCS12

exclude-protocols

SSLv3

指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除

ipFilter

false

设置为true可启用ip过滤(netty方式的avro)

ipFilterRules



netty ipFilter的配置(参考下面的ipFilterRules详细介绍和例子)

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = avro
  4. a1.sources.r1.channels = c1
  5. a1.sources.r1.bind = 0.0.0.0
  6. a1.sources.r1.port = 4141

ipFilterRules格式详解

ipFilterRules 可以配置一些允许或者禁止的ip规则,它的配置格式是:allow/deny:ip/name:pattern

第一部分只能是[allow]或[deny]两个词其中一个,第二部分是[ip]或[name]的其中一个,第三部分是正则,每个部分中间用“:”分隔。

比如可以配置成下面这样:

  1. ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

注意,最先匹配到的规则会优先生效,看下面关于localhost的两个配置的不同

  1. #只允许localhost的客户端连接,禁止其他所有的连接
  2. ipFilterRules=allow:name:localhost,deny:ip:
  3.  
  4. #允许除了localhost以外的任意的客户端连接
  5. ipFilterRules=deny:name:localhost,allow:ip:

Thrift Source

监听Thrift 端口,从外部的Thrift客户端接收数据流。如果从上一层的Flume Agent的 Thrift Sink 串联后就创建了一个多层级的Flume架构(同 Avro Source 一样,只不过是协议不同而已)。Thrift Source可以通过配置让它以安全模式(kerberos authentication)运行,具体的配置看下表。必需的参数已用 粗体 标明。

提示

同Avro Source十分类似,不同的是支持了 kerberos 认证。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: thrift

bind



监听的 hostname 或 IP 地址

port



监听的端口

threads



生成的最大工作线程数量

selector.type

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器的相关属性

ssl

false

设置为true可启用SSL加密,如果为true必须指定下面的keystore和keystore-password。

keystore



SSL加密使用的Java keystore文件路径

keystore-password



Java keystore的密码

keystore-type

JKS

Java keystore的类型. 可选值有 JKSPKCS12

exclude-protocols

SSLv3

排除支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除

kerberos

false

设置为 true ,开启kerberos 身份验证。在kerberos模式下,成功进行身份验证需要 agent-principalagent-keytab 。安全模式下的Thrift仅接受来自已启用kerberos且已成功通过kerberos KDC验证的Thrift客户端的连接。

agent-principal



指定Thrift Source使用的kerberos主体用于从kerberos KDC进行身份验证。

agent-keytab

—-

Thrift Source与Agent主体结合使用的keytab文件位置,用于对kerberos KDC进行身份验证。

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = thrift
  4. a1.sources.r1.channels = c1
  5. a1.sources.r1.bind = 0.0.0.0
  6. a1.sources.r1.port = 4141

Exec Source

这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据(stderr 信息会被丢弃,除非属性 logStdErr 设置为 true )。 如果进程因任何原因退出,则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果,而date这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date这种命令)只会产生单个Event并退出。

提示

cat [named pipe]和tail -F [file]都能持续地输出内容,那些不能持续输出内容的命令不可以。这里注意一下cat命令后面接的参数是命名管道(named pipe)不是文件。

必需的参数已用 粗体 标明。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: exec

command



所使用的系统命令,一般是cat 或者tail

shell



设置用于运行命令的shell。 例如 / bin / sh -c。 仅适用于依赖shell功能的命令,如通配符、后退标记、管道等。

restartThrottle

10000

尝试重新启动之前等待的时间(毫秒)

restart

false

如果执行命令线程挂掉,是否重启

logStdErr

false

是否会记录命令的stderr内容

batchSize

20

读取并向channel发送数据时单次发送的最大数量

batchTimeout

3000

向下游推送数据时,单次批量发送Event的最大等待时间(毫秒),如果等待了batchTimeout毫秒后未达到一次批量发送数量,则仍然执行发送操作。

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配置

警告

ExecSource相比于其他异步source的问题在于,如果无法将Event放入Channel中,ExecSource无法保证客户端知道它。在这种情况下数据会丢失。例如,最常见的用法是用tail -F [file]这种,应用程序负责向磁盘写入日志文件,Flume 会用tail命令从日志文件尾部读取,将每行作为一个Event发送。这里有一个明显的问题:如果channel满了然后无法继续发送Event,会发生什么?由于种种原因,Flume无法向输出日志文件的应用程序指示它需要保留日志或某些Event尚未发送。总之你需要知道:当使用ExecSource等单向异步接口时,您的应用程序永远无法保证数据已经被成功接收!作为此警告的延伸,此source传递Event时没有交付保证。为了获得更强的可靠性保证,请考虑使用 Spooling Directory SourceTaildir Source 或通过SDK直接与Flume集成。

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = exec
  4. a1.sources.r1.command = tail -F /var/log/secure
  5. a1.sources.r1.channels = c1

shell 属性是用来配置执行命令的shell(比如Bash或者Powershell)。command 会作为参数传递给 shell 执行,这使得command可以使用shell中的特性,例如通配符、后退标记、管道、循环、条件等。如果没有 shell 配置,将直接调用 command 配置的命令。shell 通常配置的值有:“/bin/sh -c”、“/bin/ksh -c”、“cmd /c”、“powershell -Command”等。

  1. a1.sources.tailsource-1.type = exec
  2. a1.sources.tailsource-1.shell = /bin/bash -c
  3. a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

JMS Source

JMS Source是一个可以从JMS的队列或者topic中读取消息的组件。按理说JMS Source作为一个JMS的应用应该是能够与任意的JMS消息队列无缝衔接工作的,可事实上目前仅在ActiveMQ上做了测试。JMS Source支持配置batch size、message selector、user/pass和Event数据的转换器(converter)。注意所使用的JMS队列的jar包需要在Flume实例的classpath中,建议放在专门的插件目录plugins.d下面,或者启动时候用-classpath指定,或者编辑flume-env.sh文件的FLUME_CLASSPATH来设置。

必需的参数已用 粗体 标明。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: jms

initialContextFactory



初始上下文工厂类,比如: org.apache.activemq.jndi.ActiveMQInitialContextFactory

connectionFactory



连接工厂应显示为的JNDI名称

providerURL



JMS 的连接URL

destinationName



目的地名称

destinationType



目的地类型, queuetopic

messageSelector



创建消费者时使用的消息选择器

userName



连接JMS队列时的用户名

passwordFile



连接JMS队列时的密码文件,注意是文件名不是密码的明文

batchSize

100

消费JMS消息时单次发送的Event数量

converter.type

DEFAULT

用来转换JMS消息为Event的转换器类,参考下面参数。

converter.*



转换器相关的属性

converter.charset

UTF-8

转换器把JMS的文本消息转换为byte arrays时候使用的编码,默认转换器的专属参数

createDurableSubscription

false

是否创建持久化订阅。 持久化订阅只能在 destinationType = topic 时使用。 如果为 true ,则必须配置 clientIddurableSubscriptionName

clientId



连接创建后立即给JMS客户端设置标识符。持久化订阅必配参数。

durableSubscriptionName



用于标识持久订阅的名称。持久化订阅必配参数。
关于转换器

JMS source可以配置插入式的转换器,尽管默认的转换器已经足够应付大多数场景了,默认的转换器可以把字节、文本、对象消息转换为Event。不管哪种类型消息中的属性都会作为headers被添加到Event中。

字节消息:JMS消息中的字节会被拷贝到Event的body中,注意转换器处理的单个消息大小不能超过2GB。

文本消息:JMS消息中的文本会被转为byte array拷贝到Event的body中。默认的编码是UTF-8,可自行配置编码。

对象消息:对象消息会被写出到封装在ObjectOutputStream中的ByteArrayOutputStream里面,得到的array被复制到Event的body。

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = jms
  4. a1.sources.r1.channels = c1
  5. a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
  6. a1.sources.r1.connectionFactory = GenericConnectionFactory
  7. a1.sources.r1.providerURL = tcp://mqserver:61616
  8. a1.sources.r1.destinationName = BUSINESS_DATA
  9. a1.sources.r1.destinationType = QUEUE

Spooling Directory Source

这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。

与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:

  • 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。

  • 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。

为了避免上述问题,生成新文件的时候文件名加上时间戳是个不错的办法。

尽管有这个Source的可靠性保证,但是仍然存在这样的情况,某些下游故障发生时会出现重复Event的情况。这与其他Flume组件提供的保证是一致的。


属性名

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: spooldir.

spoolDir



Flume Source监控的文件夹目录,该目录下的文件会被Flume收集

fileSuffix

.COMPLETED

被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED

deletePolicy

never

是否删除已完成收集的文件,可选值: neverimmediate

fileHeader

false

是否添加文件的绝对路径名(绝对路径+文件名)到header中。

fileHeaderKey

file

添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用)

basenameHeader

false

是否添加文件名(只是文件名,不包括路径)到header 中

basenameHeaderKey

basename

添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用)

includePattern

^.$

指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高

ignorePattern

^$

指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePatternincludePattern 两个正则都匹配到,这个文件会被忽略。

trackerDir

.flumespool

用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建

consumeOrder

oldest

设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldestyoungestrandom 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集

pollDelay

500

Flume监视目录内新文件产生的时间间隔,单位:毫秒

recursiveDirectorySearch

false

是否收集子目录下的日志文件

maxBackoff

4000

等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。

batchSize

100

每次批量传输到channel时的size大小

inputCharset

UTF-8

解析器读取文件时使用的编码(解析器会把所有文件当做文本读取)

decodeErrorPolicy

FAIL

当从文件读取时遇到不可解析的字符时如何处理。FAIL :抛出异常,解析文件失败;REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD;IGNORE :忽略无法解析的字符。

deserializer

LINE

指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口

deserializer.

解析器的相关属性,根据解析器不同而不同

bufferMaxLines



(已废弃)

bufferMaxLineLength

5000

(已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配置

配置范例:

  1. a1.channels = ch-1
  2. a1.sources = src-1
  3.  
  4. a1.sources.src-1.type = spooldir
  5. a1.sources.src-1.channels = ch-1
  6. a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
  7. a1.sources.src-1.fileHeader = true
Event反序列化器

下面是Flume内置的一些反序列化工具

LINE

这个反序列化器会把文本数据的每行解析成一个Event


属性

默认值

解释

deserializer.maxLineLength

2048

每个Event数据所包含的最大字符数,如果一行文本字符数超过这个配置就会被截断,剩下的字符会出现再后面的Event数据里

deserializer.outputCharset

UTF-8

解析Event所使用的编码

提示

deserializer.maxLineLength 的默认值是2048,这个数值对于日志行来说有点小,如果实际使用中日志每行字符数可能超过2048,超出的部分会被截断,千万记得根据自己的日志长度调大这个值。

AVRO

这个反序列化器能够读取avro容器文件,并在文件中为每个Avro记录生成一个Event。每个Event都会在header中记录它的模式。Event的body是二进制的avro记录内容,不包括模式和容器文件元素的其余部分。

注意如果Spooling Directory Source发生了重新把一个Event放入channel的情况(比如,通道已满导致重试),则它将重置并从最新的Avro容器文件同步点重试。 为了减少此类情况下的潜在Event重复,请在Avro输入文件中更频繁地写入同步标记。


属性名

默认值

解释

deserializer.schemaType

HASH

如何表示模式。 默认或者指定为 HASH 时,会对Avro模式进行哈希处理,并将哈希值存储在Event header中以“flume.avro.schema.hash”这个key。如果指定为 LITERAL ,则会以JSON格式的模式存储在Event header中以“flume.avro.schema.literal”这个key。 与HASH模式相比,使用LITERAL模式效率相对较低。
BlobDeserializer

这个反序列化器可以反序列化一些大的二进制文件,一个文件解析成一个Event,例如pdf或者jpg文件等。注意这个解析器不太适合解析太大的文件,因为被反序列化的操作是在内存里面进行的


属性

默认值

解释

deserializer



这个解析器没有别名缩写,需要填类的全限定名: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder

deserializer.maxBlobLength

100000000

每次请求的最大读取和缓冲的字节数,默认这个值大概是95.36MB

Taildir Source

注解

Taildir Source目前只是个预览版本,还不能运行在windows系统上。

Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。

Taildir Source是可靠的,即使发生文件轮换(译者注1)也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。

Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。

文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。

Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。

提示

译者注1:文件轮换 (file rotate)是英文直译。通常系统会自动丢弃日志文件中时间久远的日志,一般按照日志文件大小或时间来自动分割或丢弃的机制。参考来源:Log rotation


属性名

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: TAILDIR.

filegroups



被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔

filegroups.<filegroupName>



被监控文件夹的绝对路径。正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名

positionFile

~/.flume/taildirposition.json

用来设定一个记录每个文件的绝对路径和最近一次读取位置inode的文件,这个文件是JSON格式。

headers.<filegroupName>.<headerKey>



给某个文件组下的Event添加一个固定的键值对到header中,值就是value。一个文件组可以配置多个键值对。

byteOffsetHeader

false

是否把读取数据行的字节偏移量记录到Event的header里面,这个header的key是byteoffset

skipToEnd

false

如果在 _positionFile 里面没有记录某个文件的读取位置,是否直接跳到文件末尾开始读取

idleTimeout

120000

关闭非活动文件的超时时间(毫秒)。如果被关闭的文件重新写入了新的数据行,会被重新打开

writePosInterval

3000

positionFile 记录文件的读取位置的间隔时间(毫秒)

batchSize

100

一次读取数据行和写入channel的最大数量,通常使用默认值就很好

backoffSleepIncrement

1000

在最后一次尝试未发现任何新数据时,重新尝试轮询新数据之前的时间延迟增量(毫秒)

maxBackoffSleep

5000

每次重新尝试轮询新数据时的最大时间延迟(毫秒)

cachePatternMatching

true

对于包含数千个文件的目录,列出目录并应用文件名正则表达式模式可能非常耗时。 缓存匹配文件列表可以提高性能。消耗文件的顺序也将被缓存。 要求文件系统支持以至少秒级跟踪修改时间。

fileHeader

false

是否在header里面存储文件的绝对路径

fileHeaderKey

file

文件的绝对路径存储到header里面使用的key

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = TAILDIR
  4. a1.sources.r1.channels = c1
  5. a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
  6. a1.sources.r1.filegroups = f1 f2
  7. a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
  8. a1.sources.r1.headers.f1.headerKey1 = value1
  9. a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
  10. a1.sources.r1.headers.f2.headerKey1 = value2
  11. a1.sources.r1.headers.f2.headerKey2 = value2-2
  12. a1.sources.r1.fileHeader = true

Twitter 1% firehose Source (实验性的)

警告

这个source 纯粹是实验性的,之后的版本可能会有改动,使用中任何风险请自行承担。

提示

从Google上搜了一下twitter firehose到底是什么东西,找到了这个 What is Twitter firehose and who can use it?,类似于Twitter提供的实时的消息流服务的API,只有少数的一些合作商公司才能使用,对于我们普通的使用者来说没有任何意义。本节可以跳过不用看了。

这个Source通过流API连接到1%的样本twitter信息流并下载这些tweet,将它们转换为Avro格式,并将Avro Event发送到下游Flume。使用者需要有Twitter开发者账号、访问令牌和秘钥。必需的参数已用 粗体 标明。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: org.apache.flume.source.twitter.TwitterSource

consumerKey



OAuth consumer key

consumerSecret



OAuth consumer secret

accessToken



OAuth access token

accessTokenSecret



OAuth token secret

maxBatchSize

1000

每次获取twitter数据的数据集大小,简单说就是一次取多少

maxBatchDurationMillis

1000

每次批量获取数据的最大等待时间(毫秒)

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
  4. a1.sources.r1.channels = c1
  5. a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
  6. a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
  7. a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
  8. a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
  9. a1.sources.r1.maxBatchSize = 10
  10. a1.sources.r1.maxBatchDurationMillis = 200

Kafka Source

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。


属性名

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers



Source使用的Kafka集群实例列表

kafka.consumer.group.id

flume

消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组

kafka.topics



将要读取消息的目标 Kafka topic 列表,多个用逗号分隔

kafka.topics.regex



会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。

batchSize

1000

一批写入 channel 的最大消息数

batchDurationMillis

1000

一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。

backoffSleepIncrement

1000

当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

maxBackoffSleep

5000

Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

useFlumeEventFormat

false

默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。

setTopicHeader

true

当设置为true时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。

topicHeader

topic

如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。

migrateZookeeperOffsets

true

如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。

kafka.consumer.security.protocol

PLAINTEXT

设置使用哪种安全协议写入Kafka。可选值:SASLPLAINTEXT、SASL_SSL 和 SSL有关安全设置的其他信息,请参见下文。

_more consumer security props

如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置

Other Kafka Consumer Properties



其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset

注解

Kafka Source 覆盖了两个Kafka 消费者的参数:auto.commit.enable 这个参数被设置成了false,Kafka Source 会提交每一个批处理。Kafka Source 保证至少一次消息恢复策略。Source 启动时可以存在重复项。Kafka Source 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。

已经弃用的一些属性:


属性名

默认值

解释

topic



改用 kafka.topics

groupId

flume

改用 kafka.consumer.group.id

zookeeperConnect



自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接

通过逗号分隔的 topic 列表进行 topic 订阅的示例:

  1. tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  2. tier1.sources.source1.channels = channel1
  3. tier1.sources.source1.batchSize = 5000
  4. tier1.sources.source1.batchDurationMillis = 2000
  5. tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
  6. tier1.sources.source1.kafka.topics = test1, test2
  7. tier1.sources.source1.kafka.consumer.group.id = custom.g.id

正则表达式 topic 订阅的示例:

  1. tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  2. tier1.sources.source1.channels = channel1
  3. tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
  4. tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
  5. # the default kafka.consumer.group.id=flume is used

安全与加密:Flume 和 Kafka 之间通信渠道是支持安全认证和数据加密的。对于身份安全验证,可以使用 Kafka 0.9.0版本中的 SASL、GSSAPI (Kerberos V5) 或 SSL (虽然名字是SSL,实际是TLS实现)。

截至目前,数据加密仅由SSL / TLS提供。

当你把 kafka.consumer.security.protocol 设置下面任何一个值的时候意味着:

  • SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证

  • SASL_SSL - 有数据加密的 Kerberos 或明文认证

  • SSL - 基于TLS的加密,可选的身份验证。

警告

启用SSL时性能会下降,影响大小取决于 CPU 和 JVM 实现。参考 Kafka security overviewKAFKA-2561

使用TLS:

请阅读 Configuring Kafka Clients SSL SSL 中描述的步骤来了解用于微调的其他配置设置,例如下面的几个例子:启用安全策略、密码套件、启用协议、信任库或秘钥库类型。

服务端认证和数据加密的一个配置实例:

  1. a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  2. a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sources.source1.kafka.topics = mytopic
  4. a1.sources.source1.kafka.consumer.group.id = flume-consumer
  5. a1.sources.source1.kafka.consumer.security.protocol = SSL
  6. a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
  7. a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

注意,默认情况下 ssl.endpoint.identification.algorithm 这个参数没有被定义,因此不会执行主机名验证。如果要启用主机名验证,请加入以下配置:

  1. a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

开启后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

如果还需要客户端身份验证,则还应在 Flume 配置中添加以下内容。 每个Flume 实例都必须拥有其客户证书,来被Kafka 实例单独或通过其签名链来信任。 常见示例是由 Kafka 信任的单个根CA签署每个客户端证书。

  1. a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
  2. a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则 ssl.key.password 属性将为消费者密钥库提供所需的额外密码:

  1. a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos安全配置:

要将Kafka Source 与使用Kerberos保护的Kafka群集一起使用,请为消费者设置上面提到的consumer.security.protocol 属性。 与Kafka实例一起使用的Kerberos keytab和主体在JAAS文件的“KafkaClient”部分中指定。 “客户端”部分描述了Zookeeper连接信息(如果需要)。有关JAAS文件内容的信息,请参阅 Kafka doc 。 可以通过flume-env.sh中的JAVA_OPTS指定此JAAS文件的位置以及系统范围的 kerberos 配置:

  1. JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
  2. JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

  1. a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  2. a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sources.source1.kafka.topics = mytopic
  4. a1.sources.source1.kafka.consumer.group.id = flume-consumer
  5. a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
  6. a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
  7. a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的安全配置范例:

  1. a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  2. a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.sources.source1.kafka.topics = mytopic
  4. a1.sources.source1.kafka.consumer.group.id = flume-consumer
  5. a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
  6. a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
  7. a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
  8. a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
  9. a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

JAAS 文件配置示例。有关其内容的参考,请参阅Kafka文档 SASL configuration 中关于所需认证机制(GSSAPI/PLAIN)的客户端配置部分。由于Kafka Source 也可以连接 Zookeeper 以进行偏移迁移,因此“Client”部分也添加到此示例中。除非您需要偏移迁移,否则不必要这样做,或者您需要此部分用于其他安全组件。 另外,请确保Flume进程的操作系统用户对 JAAS 和 keytab 文件具有读权限。

  1. Client {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. keyTab="/path/to/keytabs/flume.keytab"
  6. principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  7. };
  8.  
  9. KafkaClient {
  10. com.sun.security.auth.module.Krb5LoginModule required
  11. useKeyTab=true
  12. storeKey=true
  13. keyTab="/path/to/keytabs/flume.keytab"
  14. principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  15. };

NetCat TCP Source

这个source十分像nc -k -l [host] [port]这个命令,监听一个指定的端口,把从该端口收到的TCP协议的文本数据按行转换为Event,它能识别的是带换行符的文本数据,同其他Source一样,解析成功的Event数据会发送到channel中。

提示

常见的系统日志都是逐行输出的,Flume的各种Source接收数据也基本上以行为单位进行解析和处理。不论是 NetCat TCP Source ,还是其他的读取文本类型的Source比如:Spooling Directory SourceTaildir SourceExec Source 等也都是一样的。

必需的参数已用 粗体 标明。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: netcat

bind



要监听的 hostname 或者IP地址

port



监听的端口

max-line-length

512

每行解析成Event 消息体的最大字节数

ack-every-event

true

对收到的每一行数据用“OK”做出响应

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配置

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = netcat
  4. a1.sources.r1.bind = 0.0.0.0
  5. a1.sources.r1.port = 6666
  6. a1.sources.r1.channels = c1

NetCat UDP Source

看名字也看得出,跟 NetCat TCP Source 是一对亲兄弟,区别是监听的协议不同。这个source就像是 nc -u -k -l [host] [port]命令一样,监听一个端口然后接收来自于这个端口上UDP协议发送过来的文本内容,逐行转换为Event发送到channel。

必需的参数已用 粗体 标明。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是:netcatudp

bind



要监听的 hostname 或者IP地址

port



监听的端口

remoteAddressHeader



UDP消息源地址(或IP)被解析到Event的header里面时所使用的key名称

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = netcatudp
  4. a1.sources.r1.bind = 0.0.0.0
  5. a1.sources.r1.port = 6666
  6. a1.sources.r1.channels = c1

Sequence Generator Source

这个Source是一个序列式的Event生成器,从它启动就开始生成,总共会生成totalEvents个。它并不是一个日志收集器,它通常是用来测试用的。它在发送失败的时候会重新发送失败的Event到channel,保证最终发送到channel的唯一Event数量一定是 totalEvents 个。必需的参数已用 粗体 标明。

提示

记住Flume的设计原则之一就是传输过程的『可靠性』,上面说的失败重试以及最终的数量问题,这是毫无疑问的。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是:seq

selector.type

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

replicating

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配

batchSize

1

每次请求向channel发送的 Event 数量

totalEvents

Long.MAX_VALUE

这个Source会发出的Event总数,这些Event是唯一的

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = seq
  4. a1.sources.r1.channels = c1

Syslog Sources

这个Source是从syslog读取日志并解析为 Event,同样也分为TCP协议和UDP协议的,TCP协议的Source会按行(\n)来解析成 Event,UDP协议的Souce会把一个消息体解析为一个 Event。

Syslog TCP Source

提示

这个Syslog TCP Source在源码里面已经被@deprecated了,推荐使用 Multiport Syslog TCP Source 来代替。

必需的参数已用 粗体 标明。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: syslogtcp

host



要监听的hostname或者IP地址

port



要监听的端口

eventSize

2500

每行数据的最大字节数

keepFields

none

是否保留syslog消息头中的一些属性到Event中,可选值 allnone 或自定义指定保留的字段。如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 truefalse,建议改用 allnone 了。

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = syslogtcp
  4. a1.sources.r1.port = 5140
  5. a1.sources.r1.host = localhost
  6. a1.sources.r1.channels = c1
Multiport Syslog TCP Source

这是一个增强版的 Syslog TCP Source ,它更新、更快、支持监听多个端口。因为支持了多个端口,port参数已经改为了ports。这个Source使用了Apache mina(一个异步通信的框架,同netty类似)来实现。提供了对RFC-3164和许多常见的RFC-5424格式消息的支持。 支持每个端口配置不同字符集。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是:multiportsyslogtcp

host



要监听的hostname或者IP地址

ports



一个或多个要监听的端口,多个用空格分开

eventSize

2500

解析成Event的每行数据的最大字节数

keepFields

none

是否保留syslog消息头中的一些属性到Event中,可选值 allnone 或自定义指定保留的字段,如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 truefalse ,建议改用 allnone 了。

portHeader



如果配置了这个属性值,端口号会被存到每个Event的header里面用这个属性配置的值当key。这样就可以在拦截器或者channel选择器里面根据端口号来自定义路由Event的逻辑。

charset.default

UTF-8

解析syslog使用的默认编码

charset.port.<port>



针对具体某一个端口配置编码

batchSize

100

每次请求尝试处理的最大Event数量,通常用这个默认值就很好。

readBufferSize

1024

内部Mina通信的读取缓冲区大小,用于性能调优,通常用默认值就很好。

numProcessors

(自动分配)

处理消息时系统使用的处理器数量。 默认是使用Java Runtime API自动检测CPU数量。 Mina将为每个检测到的CPU核心生成2个请求处理线程,这通常是合理的。

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.*



channel选择器的相关属性,具体属性根据设定的 _selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.*

拦截器相关的属性配

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = multiport_syslogtcp
  4. a1.sources.r1.channels = c1
  5. a1.sources.r1.host = 0.0.0.0
  6. a1.sources.r1.ports = 10001 10002 10003
  7. a1.sources.r1.portHeader = port
Syslog UDP Source

属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: syslogudp

host



要监听的hostname或者IP地址

port



要监听的端口

keepFields

false

设置为true后,解析syslog时会保留Priority, Timestamp and Hostname这些属性到Event的消息体中(查看源码发现,实际上保留了priority、version、timestamp、hostname这四个字段在消息体的前面)

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = syslogudp
  4. a1.sources.r1.port = 5140
  5. a1.sources.r1.host = localhost
  6. a1.sources.r1.channels = c1

HTTP Source

这个Source从HTTP POST 和 GET请求里面解析 Event,GET方式目前还只是实验性的。把HTTP请求解析成Event是通过配置一个“handler”来实现的,这个“handler”必须实现 HTTPSourceHandler 接口,这个接口其实就一个方法,收到一个HttpServletRequest后解析出一个 Event 的List。从一次请求解析出来的若干个Event会以一个事务提交到channel,从而在诸如『文件channel』的一些channel上提高效率。如果handler抛出异常,这个HTTP的响应状态码是400。如果channel满了或者无法发送Event到channel,此时会返回HTTP状态码503(服务暂时不可用)。

在一个POST请求中发送的所有 Event 视为一个批处理,并在一个事务中插入到 channel。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type

组件类型,这个是: http

port



要监听的端口

bind

0.0.0.0

要监听的hostname或者IP地址

handler

org.apache.flume.source.http.JSONHandler

所使用的handler,需填写handler的全限定类名

handler.



handler的一些属性配置

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.*

拦截器相关的属性配

enableSSL

false

设置为true启用SSL,HTTP Source不支持SSLv3协议

excludeProtocols

SSLv3

指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除

keystore

keystore 文件的位置

keystorePassword

Keystore 的密码

提示

Flume里面很多组件都明确表示强制不支持SSLv3协议,是因为SSLv3协议的不安全,各大公司很早就表示不再支持了。

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = http
  4. a1.sources.r1.port = 5140
  5. a1.sources.r1.channels = c1
  6. a1.sources.r1.handler = org.example.rest.RestHandler
  7. a1.sources.r1.handler.nickname = random props
JSONHandler

这是HTTP Source的默认解析器(handler),根据请求所使用的编码把http请求中json格式的数据解析成Flume Event数组(不管是一个还是多个,都以数组格式进行存储),如果未指定编码,默认使用UTF-8编码。这个handler支持UTF-8、UTF-16和UTF-32编码。json数据格式如下:

  1. [{
  2. "headers" : {
  3. "timestamp" : "434324343",
  4. "host" : "random_host.example.com"
  5. },
  6. "body" : "random_body"
  7. },
  8. {
  9. "headers" : {
  10. "namenode" : "namenode.example.com",
  11. "datanode" : "random_datanode.example.com"
  12. },
  13. "body" : "really_random_body"
  14. }]

HTTP请求中设置编码必须是通过Content type来设置,application/json; charset=UTF-8(UTF-8 可以换成UTF-16 或者 UTF-32)。

一种创建这个handler使用的json格式对象 org.apache.flume.event.JSONEvent 的方法是使用Google Gson 库的Gson#fromJson(Object, Type) 方法创建json格式字符串,这个方法的第二个参数就是类型标记,用于指定Event列表的类型,像下面这样创建:

  1. Type type = new TypeToken<List<JSONEvent>>() {}.getType();
BlobHandler

默认情况下HTTPSource会把json处理成Event。作为一个补充的选项BlobHandler 不仅支持返回请求中的参数也包含其中的二进制数据,比如PDF文件、jpg文件等。这种可以接收附件的处理器不适合处理非常大的文件,因为这些文件都是缓冲在内存里面的。


属性

默认值

解释

handler



这里填BlobHandler的全限定类名: org.apache.flume.sink.solr.morphline.BlobHandler

handler.maxBlobLength

100000000

每次请求的最大缓冲字节数

Stress Source

StressSource 是一个内部负载生成Source的实现, 对于压力测试非常有用 。可以配置每个Event的大小(headers为空)、也可以配置总共发送Event数量以及发送成功的Event最大数量。

提示

它跟 Sequence Generator Source 差不多,都是用来测试用的。

必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: org.apache.flume.source.StressSource

size

500

每个Event的大小。单位:字节(byte)

maxTotalEvents

-1

总共会发送的Event数量

maxSuccessfulEvents

-1

发送成功的Event最大数量

batchSize

1

每次请求发送Event的数量

配置范例:

  1. a1.sources = stresssource-1
  2. a1.channels = memoryChannel-1
  3. a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
  4. a1.sources.stresssource-1.size = 10240
  5. a1.sources.stresssource-1.maxTotalEvents = 1000000
  6. a1.sources.stresssource-1.channels = memoryChannel-1

Legacy Sources

Legacy Sources可以让Flume1.x版本的Agent接收来自于Flume0.9.4版本的Agent发来的Event,可以理解为连接两个版本Flume的一个“桥”。接收到0.9.4版本的Event后转换为1.x版本的Event然后发送到channel。0.9.4版本的Event属性(timestamp, pri, host, nanos, etc)会被转换到1.xEvent的header中。Legacy Sources支持Avro和Thrift RPC两种方式连接。具体的用法是1.x的Agent可以使用 avroLegacy或者 thriftLegacy source,然后0.9.4的Agent需要指定sink的host和端口为1.x的 Agent。

注解

1.x和0.9.x的可靠性保证有所不同。Legacy Sources并不支持0.9.x的E2E和DFO模式。唯一支持的是BE(best effort,尽力而为),尽管1.x的可靠性保证对于从0.9.x传输过来并且已经存在channel里面的Events是有效的。

提示

虽然数据进入了Flume 1.x的channel之后是适用1.x的可靠性保证,但是从0.9.x到1.x的时候只是BE保证,既然只有BE的保证,也就是说Legacy Sources不算是可靠的传输。对于这种跨版本的部署使用行为要慎重。

必需的参数已用 粗体 标明。

Avro Legacy Source

属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: org.apache.flume.source.avroLegacy.AvroLegacySource

host



要监听的hostname或者IP地址

port



要监听的端口

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
  4. a1.sources.r1.host = 0.0.0.0
  5. a1.sources.r1.bind = 6666
  6. a1.sources.r1.channels = c1
Thrift Legacy Source

属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个是: org.apache.flume.source.thriftLegacy.ThriftLegacySource

host



要监听的hostname或者IP地址

port



要监听的端口

selector.type

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

replicating

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
  4. a1.sources.r1.host = 0.0.0.0
  5. a1.sources.r1.bind = 6666
  6. a1.sources.r1.channels = c1

Custom Source

你可以自己写一个Source接口的实现类。启动Flume时候必须把你自定义Source所依赖的其他类配置进Agent的classpath内。custom source在写配置文件的type时候填你的全限定类名。

提示

如果前面章节的那些Source都无法满足你的需求,你可以写一个自定义的Source,与你见过的其他框架的自定义组件写法如出一辙,实现个接口而已,然后把你写的类打成jar包,连同依赖的jar包一同配置进Flume的classpath。后面章节中的自定义Sink、自定义Channel等都是一样的步骤,不会再赘述。


属性

默认值

解释

channels



与Source绑定的channel,多个用空格分开

type



组件类型,这个填你自己Source的全限定类名

selector.type

replicating

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

interceptors



该source所使用的拦截器,多个用空格分开

interceptors.

拦截器相关的属性配

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = org.example.MySource
  4. a1.sources.r1.channels = c1

Scribe Source

提示

这里先说一句,Scribe是Facebook出的一个实时的日志聚合系统,我在之前没有听说过也没有使用过它,从Scribe项目的Github文档里面了解到它在2013年就已经停止更新和支持了,貌似现在已经没有新的用户选择使用它了,所以Scribe Source这一节了解一下就行了。

Scribe 是另外一个类似于Flume的数据收集系统。为了对接现有的Scribe可以使用ScribeSource ,它是基于Thrift 的兼容传输协议,如何部署Scribe请参考Facebook提供的文档。

必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: org.apache.flume.source.scribe.ScribeSource

port

1499

Scribe 的端口

maxReadBufferBytes

16384000

Thrift 默认的FrameBuffer 大小

workerThreads

5

Thrift的线程数

selector.type

可选值:replicatingmultiplexing ,分别表示: 复制、多路复用

selector.*

channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同

配置范例:

  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
  4. a1.sources.r1.port = 1463
  5. a1.sources.r1.workerThreads = 5
  6. a1.sources.r1.channels = c1