Flume Sources
Avro Source
Avro Source监听Avro端口接收从外部Avro客户端发送来的数据流。如果与上一层Agent的 Avro Sink 配合使用就组成了一个分层的拓扑结构。必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: avro |
bind | – | 监听的服务器名hostname或者ip |
port | – | 监听的端口 |
threads | – | 生成的最大工作线程数量 |
selector.type | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 | |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器的相关属性 | |
compression-type | none | 可选值: none 或 deflate 。这个类型必须跟Avro Source相匹配 |
ssl | false | 设置为 true 可启用SSL加密,如果为true必须指定下面的 keystore 和 keystore-password 。 |
keystore | – | SSL加密使用的Java keystore文件路径 |
keystore-password | – | Java keystore的密码 |
keystore-type | JKS | Java keystore的类型. 可选值有 JKS 、 PKCS12 。 |
exclude-protocols | SSLv3 | 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除 |
ipFilter | false | 设置为true可启用ip过滤(netty方式的avro) |
ipFilterRules | – | netty ipFilter的配置(参考下面的ipFilterRules详细介绍和例子) |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = avro
- a1.sources.r1.channels = c1
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 4141
ipFilterRules格式详解
ipFilterRules 可以配置一些允许或者禁止的ip规则,它的配置格式是:allow/deny:ip/name:pattern
第一部分只能是[allow]或[deny]两个词其中一个,第二部分是[ip]或[name]的其中一个,第三部分是正则,每个部分中间用“:”分隔。
比如可以配置成下面这样:
- ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
注意,最先匹配到的规则会优先生效,看下面关于localhost的两个配置的不同
- #只允许localhost的客户端连接,禁止其他所有的连接
- ipFilterRules=allow:name:localhost,deny:ip:
- #允许除了localhost以外的任意的客户端连接
- ipFilterRules=deny:name:localhost,allow:ip:
Thrift Source
监听Thrift 端口,从外部的Thrift客户端接收数据流。如果从上一层的Flume Agent的 Thrift Sink 串联后就创建了一个多层级的Flume架构(同 Avro Source 一样,只不过是协议不同而已)。Thrift Source可以通过配置让它以安全模式(kerberos authentication)运行,具体的配置看下表。必需的参数已用 粗体 标明。
提示
同Avro Source十分类似,不同的是支持了 kerberos 认证。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: thrift |
bind | – | 监听的 hostname 或 IP 地址 |
port | – | 监听的端口 |
threads | – | 生成的最大工作线程数量 |
selector.type | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 | |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器的相关属性 | |
ssl | false | 设置为true可启用SSL加密,如果为true必须指定下面的keystore和keystore-password。 |
keystore | – | SSL加密使用的Java keystore文件路径 |
keystore-password | – | Java keystore的密码 |
keystore-type | JKS | Java keystore的类型. 可选值有 JKS 、 PKCS12 |
exclude-protocols | SSLv3 | 排除支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除 |
kerberos | false | 设置为 true ,开启kerberos 身份验证。在kerberos模式下,成功进行身份验证需要 agent-principal 和 agent-keytab 。安全模式下的Thrift仅接受来自已启用kerberos且已成功通过kerberos KDC验证的Thrift客户端的连接。 |
agent-principal | – | 指定Thrift Source使用的kerberos主体用于从kerberos KDC进行身份验证。 |
agent-keytab | —- | Thrift Source与Agent主体结合使用的keytab文件位置,用于对kerberos KDC进行身份验证。 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = thrift
- a1.sources.r1.channels = c1
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 4141
Exec Source
这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据(stderr 信息会被丢弃,除非属性 logStdErr 设置为 true
)。 如果进程因任何原因退出,则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果,而date这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date这种命令)只会产生单个Event并退出。
提示
cat [named pipe]和tail -F [file]都能持续地输出内容,那些不能持续输出内容的命令不可以。这里注意一下cat命令后面接的参数是命名管道(named pipe)不是文件。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: exec |
command | – | 所使用的系统命令,一般是cat 或者tail |
shell | – | 设置用于运行命令的shell。 例如 / bin / sh -c。 仅适用于依赖shell功能的命令,如通配符、后退标记、管道等。 |
restartThrottle | 10000 | 尝试重新启动之前等待的时间(毫秒) |
restart | false | 如果执行命令线程挂掉,是否重启 |
logStdErr | false | 是否会记录命令的stderr内容 |
batchSize | 20 | 读取并向channel发送数据时单次发送的最大数量 |
batchTimeout | 3000 | 向下游推送数据时,单次批量发送Event的最大等待时间(毫秒),如果等待了batchTimeout毫秒后未达到一次批量发送数量,则仍然执行发送操作。 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配置 |
警告
ExecSource相比于其他异步source的问题在于,如果无法将Event放入Channel中,ExecSource无法保证客户端知道它。在这种情况下数据会丢失。例如,最常见的用法是用tail -F [file]这种,应用程序负责向磁盘写入日志文件,Flume 会用tail命令从日志文件尾部读取,将每行作为一个Event发送。这里有一个明显的问题:如果channel满了然后无法继续发送Event,会发生什么?由于种种原因,Flume无法向输出日志文件的应用程序指示它需要保留日志或某些Event尚未发送。总之你需要知道:当使用ExecSource等单向异步接口时,您的应用程序永远无法保证数据已经被成功接收!作为此警告的延伸,此source传递Event时没有交付保证。为了获得更强的可靠性保证,请考虑使用 Spooling Directory Source,Taildir Source 或通过SDK直接与Flume集成。
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /var/log/secure
- a1.sources.r1.channels = c1
shell 属性是用来配置执行命令的shell(比如Bash或者Powershell)。command 会作为参数传递给 shell 执行,这使得command可以使用shell中的特性,例如通配符、后退标记、管道、循环、条件等。如果没有 shell 配置,将直接调用 command 配置的命令。shell 通常配置的值有:“/bin/sh -c”、“/bin/ksh -c”、“cmd /c”、“powershell -Command”等。
- a1.sources.tailsource-1.type = exec
- a1.sources.tailsource-1.shell = /bin/bash -c
- a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS Source
JMS Source是一个可以从JMS的队列或者topic中读取消息的组件。按理说JMS Source作为一个JMS的应用应该是能够与任意的JMS消息队列无缝衔接工作的,可事实上目前仅在ActiveMQ上做了测试。JMS Source支持配置batch size、message selector、user/pass和Event数据的转换器(converter)。注意所使用的JMS队列的jar包需要在Flume实例的classpath中,建议放在专门的插件目录plugins.d下面,或者启动时候用-classpath指定,或者编辑flume-env.sh文件的FLUME_CLASSPATH来设置。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: jms |
initialContextFactory | – | 初始上下文工厂类,比如: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | 连接工厂应显示为的JNDI名称 |
providerURL | – | JMS 的连接URL |
destinationName | – | 目的地名称 |
destinationType | – | 目的地类型, queue 或 topic |
messageSelector | – | 创建消费者时使用的消息选择器 |
userName | – | 连接JMS队列时的用户名 |
passwordFile | – | 连接JMS队列时的密码文件,注意是文件名不是密码的明文 |
batchSize | 100 | 消费JMS消息时单次发送的Event数量 |
converter.type | DEFAULT | 用来转换JMS消息为Event的转换器类,参考下面参数。 |
converter.* | – | 转换器相关的属性 |
converter.charset | UTF-8 | 转换器把JMS的文本消息转换为byte arrays时候使用的编码,默认转换器的专属参数 |
createDurableSubscription | false | 是否创建持久化订阅。 持久化订阅只能在 destinationType = topic 时使用。 如果为 true ,则必须配置 clientId 和 durableSubscriptionName。 |
clientId | – | 连接创建后立即给JMS客户端设置标识符。持久化订阅必配参数。 |
durableSubscriptionName | – | 用于标识持久订阅的名称。持久化订阅必配参数。 |
关于转换器
JMS source可以配置插入式的转换器,尽管默认的转换器已经足够应付大多数场景了,默认的转换器可以把字节、文本、对象消息转换为Event。不管哪种类型消息中的属性都会作为headers被添加到Event中。
字节消息:JMS消息中的字节会被拷贝到Event的body中,注意转换器处理的单个消息大小不能超过2GB。
文本消息:JMS消息中的文本会被转为byte array拷贝到Event的body中。默认的编码是UTF-8,可自行配置编码。
对象消息:对象消息会被写出到封装在ObjectOutputStream中的ByteArrayOutputStream里面,得到的array被复制到Event的body。
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = jms
- a1.sources.r1.channels = c1
- a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
- a1.sources.r1.connectionFactory = GenericConnectionFactory
- a1.sources.r1.providerURL = tcp://mqserver:61616
- a1.sources.r1.destinationName = BUSINESS_DATA
- a1.sources.r1.destinationType = QUEUE
Spooling Directory Source
这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。
与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:
如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。
为了避免上述问题,生成新文件的时候文件名加上时间戳是个不错的办法。
尽管有这个Source的可靠性保证,但是仍然存在这样的情况,某些下游故障发生时会出现重复Event的情况。这与其他Flume组件提供的保证是一致的。
属性名 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: spooldir . |
spoolDir | – | Flume Source监控的文件夹目录,该目录下的文件会被Flume收集 |
fileSuffix | .COMPLETED | 被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED |
deletePolicy | never | 是否删除已完成收集的文件,可选值: never 或 immediate |
fileHeader | false | 是否添加文件的绝对路径名(绝对路径+文件名)到header中。 |
fileHeaderKey | file | 添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用) |
basenameHeader | false | 是否添加文件名(只是文件名,不包括路径)到header 中 |
basenameHeaderKey | basename | 添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用) |
includePattern | ^.$ | 指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高 |
ignorePattern | ^$ | 指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePattern 和 includePattern 两个正则都匹配到,这个文件会被忽略。 |
trackerDir | .flumespool | 用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建 |
consumeOrder | oldest | 设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldest 、 youngest 和 random 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集 |
pollDelay | 500 | Flume监视目录内新文件产生的时间间隔,单位:毫秒 |
recursiveDirectorySearch | false | 是否收集子目录下的日志文件 |
maxBackoff | 4000 | 等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。 |
batchSize | 100 | 每次批量传输到channel时的size大小 |
inputCharset | UTF-8 | 解析器读取文件时使用的编码(解析器会把所有文件当做文本读取) |
decodeErrorPolicy | FAIL | 当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败;REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD;IGNORE :忽略无法解析的字符。 |
deserializer | LINE | 指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口 |
deserializer. | 解析器的相关属性,根据解析器不同而不同 | |
bufferMaxLines | – | (已废弃) |
bufferMaxLineLength | 5000 | (已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配置 |
配置范例:
- a1.channels = ch-1
- a1.sources = src-1
- a1.sources.src-1.type = spooldir
- a1.sources.src-1.channels = ch-1
- a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
- a1.sources.src-1.fileHeader = true
Event反序列化器
下面是Flume内置的一些反序列化工具
LINE
这个反序列化器会把文本数据的每行解析成一个Event
属性 | 默认值 | 解释 |
---|---|---|
deserializer.maxLineLength | 2048 | 每个Event数据所包含的最大字符数,如果一行文本字符数超过这个配置就会被截断,剩下的字符会出现再后面的Event数据里 |
deserializer.outputCharset | UTF-8 | 解析Event所使用的编码 |
提示
deserializer.maxLineLength 的默认值是2048,这个数值对于日志行来说有点小,如果实际使用中日志每行字符数可能超过2048,超出的部分会被截断,千万记得根据自己的日志长度调大这个值。
AVRO
这个反序列化器能够读取avro容器文件,并在文件中为每个Avro记录生成一个Event。每个Event都会在header中记录它的模式。Event的body是二进制的avro记录内容,不包括模式和容器文件元素的其余部分。
注意如果Spooling Directory Source发生了重新把一个Event放入channel的情况(比如,通道已满导致重试),则它将重置并从最新的Avro容器文件同步点重试。 为了减少此类情况下的潜在Event重复,请在Avro输入文件中更频繁地写入同步标记。
属性名 | 默认值 | 解释 |
---|---|---|
deserializer.schemaType | HASH | 如何表示模式。 默认或者指定为 HASH 时,会对Avro模式进行哈希处理,并将哈希值存储在Event header中以“flume.avro.schema.hash”这个key。如果指定为 LITERAL ,则会以JSON格式的模式存储在Event header中以“flume.avro.schema.literal”这个key。 与HASH模式相比,使用LITERAL模式效率相对较低。 |
BlobDeserializer
这个反序列化器可以反序列化一些大的二进制文件,一个文件解析成一个Event,例如pdf或者jpg文件等。注意这个解析器不太适合解析太大的文件,因为被反序列化的操作是在内存里面进行的。
属性 | 默认值 | 解释 |
---|---|---|
deserializer | – | 这个解析器没有别名缩写,需要填类的全限定名: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder |
deserializer.maxBlobLength | 100000000 | 每次请求的最大读取和缓冲的字节数,默认这个值大概是95.36MB |
Taildir Source
注解
Taildir Source目前只是个预览版本,还不能运行在windows系统上。
Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。
Taildir Source是可靠的,即使发生文件轮换(译者注1)也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。
Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。
文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。
Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。
提示
译者注1:文件轮换 (file rotate)是英文直译。通常系统会自动丢弃日志文件中时间久远的日志,一般按照日志文件大小或时间来自动分割或丢弃的机制。参考来源:Log rotation
属性名 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: TAILDIR . |
filegroups | – | 被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔 |
filegroups.<filegroupName> | – | 被监控文件夹的绝对路径。正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名 |
positionFile | ~/.flume/taildirposition.json | 用来设定一个记录每个文件的绝对路径和最近一次读取位置inode的文件,这个文件是JSON格式。 |
headers.<filegroupName>.<headerKey> | – | 给某个文件组下的Event添加一个固定的键值对到header中,值就是value。一个文件组可以配置多个键值对。 |
byteOffsetHeader | false | 是否把读取数据行的字节偏移量记录到Event的header里面,这个header的key是byteoffset |
skipToEnd | false | 如果在 _positionFile 里面没有记录某个文件的读取位置,是否直接跳到文件末尾开始读取 |
idleTimeout | 120000 | 关闭非活动文件的超时时间(毫秒)。如果被关闭的文件重新写入了新的数据行,会被重新打开 |
writePosInterval | 3000 | 向 positionFile 记录文件的读取位置的间隔时间(毫秒) |
batchSize | 100 | 一次读取数据行和写入channel的最大数量,通常使用默认值就很好 |
backoffSleepIncrement | 1000 | 在最后一次尝试未发现任何新数据时,重新尝试轮询新数据之前的时间延迟增量(毫秒) |
maxBackoffSleep | 5000 | 每次重新尝试轮询新数据时的最大时间延迟(毫秒) |
cachePatternMatching | true | 对于包含数千个文件的目录,列出目录并应用文件名正则表达式模式可能非常耗时。 缓存匹配文件列表可以提高性能。消耗文件的顺序也将被缓存。 要求文件系统支持以至少秒级跟踪修改时间。 |
fileHeader | false | 是否在header里面存储文件的绝对路径 |
fileHeaderKey | file | 文件的绝对路径存储到header里面使用的key |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = TAILDIR
- a1.sources.r1.channels = c1
- a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
- a1.sources.r1.filegroups = f1 f2
- a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
- a1.sources.r1.headers.f1.headerKey1 = value1
- a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
- a1.sources.r1.headers.f2.headerKey1 = value2
- a1.sources.r1.headers.f2.headerKey2 = value2-2
- a1.sources.r1.fileHeader = true
Twitter 1% firehose Source (实验性的)
警告
这个source 纯粹是实验性的,之后的版本可能会有改动,使用中任何风险请自行承担。
提示
从Google上搜了一下twitter firehose到底是什么东西,找到了这个 What is Twitter firehose and who can use it?,类似于Twitter提供的实时的消息流服务的API,只有少数的一些合作商公司才能使用,对于我们普通的使用者来说没有任何意义。本节可以跳过不用看了。
这个Source通过流API连接到1%的样本twitter信息流并下载这些tweet,将它们转换为Avro格式,并将Avro Event发送到下游Flume。使用者需要有Twitter开发者账号、访问令牌和秘钥。必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: org.apache.flume.source.twitter.TwitterSource |
consumerKey | – | OAuth consumer key |
consumerSecret | – | OAuth consumer secret |
accessToken | – | OAuth access token |
accessTokenSecret | – | OAuth token secret |
maxBatchSize | 1000 | 每次获取twitter数据的数据集大小,简单说就是一次取多少 |
maxBatchDurationMillis | 1000 | 每次批量获取数据的最大等待时间(毫秒) |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
- a1.sources.r1.channels = c1
- a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
- a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
- a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
- a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
- a1.sources.r1.maxBatchSize = 10
- a1.sources.r1.maxBatchDurationMillis = 200
Kafka Source
Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。
属性名 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | Source使用的Kafka集群实例列表 |
kafka.consumer.group.id | flume | 消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组 |
kafka.topics | – | 将要读取消息的目标 Kafka topic 列表,多个用逗号分隔 |
kafka.topics.regex | – | 会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。 |
batchSize | 1000 | 一批写入 channel 的最大消息数 |
batchDurationMillis | 1000 | 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。 |
backoffSleepIncrement | 1000 | 当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 |
maxBackoffSleep | 5000 | Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 |
useFlumeEventFormat | false | 默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。 |
setTopicHeader | true | 当设置为true时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。 |
topicHeader | topic | 如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。 |
migrateZookeeperOffsets | true | 如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。 |
kafka.consumer.security.protocol | PLAINTEXT | 设置使用哪种安全协议写入Kafka。可选值:SASLPLAINTEXT、SASL_SSL 和 SSL有关安全设置的其他信息,请参见下文。 |
_more consumer security props | 如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置 | |
Other Kafka Consumer Properties | – | 其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset |
注解
Kafka Source 覆盖了两个Kafka 消费者的参数:auto.commit.enable 这个参数被设置成了false,Kafka Source 会提交每一个批处理。Kafka Source 保证至少一次消息恢复策略。Source 启动时可以存在重复项。Kafka Source 还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer) 和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值,不建议修改这些参数。
已经弃用的一些属性:
属性名 | 默认值 | 解释 |
---|---|---|
topic | – | 改用 kafka.topics |
groupId | flume | 改用 kafka.consumer.group.id |
zookeeperConnect | – | 自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接 |
通过逗号分隔的 topic 列表进行 topic 订阅的示例:
- tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- tier1.sources.source1.channels = channel1
- tier1.sources.source1.batchSize = 5000
- tier1.sources.source1.batchDurationMillis = 2000
- tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
- tier1.sources.source1.kafka.topics = test1, test2
- tier1.sources.source1.kafka.consumer.group.id = custom.g.id
正则表达式 topic 订阅的示例:
- tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- tier1.sources.source1.channels = channel1
- tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
- tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
- # the default kafka.consumer.group.id=flume is used
安全与加密:Flume 和 Kafka 之间通信渠道是支持安全认证和数据加密的。对于身份安全验证,可以使用 Kafka 0.9.0版本中的 SASL、GSSAPI (Kerberos V5) 或 SSL (虽然名字是SSL,实际是TLS实现)。
截至目前,数据加密仅由SSL / TLS提供。
当你把 kafka.consumer.security.protocol 设置下面任何一个值的时候意味着:
SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证
SASL_SSL - 有数据加密的 Kerberos 或明文认证
SSL - 基于TLS的加密,可选的身份验证。
警告
启用SSL时性能会下降,影响大小取决于 CPU 和 JVM 实现。参考 Kafka security overview 和 KAFKA-2561 。
使用TLS:
请阅读 Configuring Kafka Clients SSL SSL 中描述的步骤来了解用于微调的其他配置设置,例如下面的几个例子:启用安全策略、密码套件、启用协议、信任库或秘钥库类型。
服务端认证和数据加密的一个配置实例:
- a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.sources.source1.kafka.topics = mytopic
- a1.sources.source1.kafka.consumer.group.id = flume-consumer
- a1.sources.source1.kafka.consumer.security.protocol = SSL
- a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
- a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
注意,默认情况下 ssl.endpoint.identification.algorithm 这个参数没有被定义,因此不会执行主机名验证。如果要启用主机名验证,请加入以下配置:
- a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
开启后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
如果还需要客户端身份验证,则还应在 Flume 配置中添加以下内容。 每个Flume 实例都必须拥有其客户证书,来被Kafka 实例单独或通过其签名链来信任。 常见示例是由 Kafka 信任的单个根CA签署每个客户端证书。
- a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
- a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
如果密钥库和密钥使用不同的密码保护,则 ssl.key.password 属性将为消费者密钥库提供所需的额外密码:
- a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
Kerberos安全配置:
要将Kafka Source 与使用Kerberos保护的Kafka群集一起使用,请为消费者设置上面提到的consumer.security.protocol 属性。 与Kafka实例一起使用的Kerberos keytab和主体在JAAS文件的“KafkaClient”部分中指定。 “客户端”部分描述了Zookeeper连接信息(如果需要)。有关JAAS文件内容的信息,请参阅 Kafka doc 。 可以通过flume-env.sh中的JAVA_OPTS指定此JAAS文件的位置以及系统范围的 kerberos 配置:
- JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
- JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
使用 SASL_PLAINTEXT 的示例安全配置:
- a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.sources.source1.kafka.topics = mytopic
- a1.sources.source1.kafka.consumer.group.id = flume-consumer
- a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
- a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
- a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
使用 SASL_SSL 的安全配置范例:
- a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.sources.source1.kafka.topics = mytopic
- a1.sources.source1.kafka.consumer.group.id = flume-consumer
- a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
- a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
- a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
- a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
- a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
JAAS 文件配置示例。有关其内容的参考,请参阅Kafka文档 SASL configuration 中关于所需认证机制(GSSAPI/PLAIN)的客户端配置部分。由于Kafka Source 也可以连接 Zookeeper 以进行偏移迁移,因此“Client”部分也添加到此示例中。除非您需要偏移迁移,否则不必要这样做,或者您需要此部分用于其他安全组件。 另外,请确保Flume进程的操作系统用户对 JAAS 和 keytab 文件具有读权限。
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- keyTab="/path/to/keytabs/flume.keytab"
- principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
- };
- KafkaClient {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- keyTab="/path/to/keytabs/flume.keytab"
- principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
- };
NetCat TCP Source
这个source十分像nc -k -l [host] [port]这个命令,监听一个指定的端口,把从该端口收到的TCP协议的文本数据按行转换为Event,它能识别的是带换行符的文本数据,同其他Source一样,解析成功的Event数据会发送到channel中。
提示
常见的系统日志都是逐行输出的,Flume的各种Source接收数据也基本上以行为单位进行解析和处理。不论是 NetCat TCP Source ,还是其他的读取文本类型的Source比如:Spooling Directory Source 、 Taildir Source 、 Exec Source 等也都是一样的。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: netcat |
bind | – | 要监听的 hostname 或者IP地址 |
port | – | 监听的端口 |
max-line-length | 512 | 每行解析成Event 消息体的最大字节数 |
ack-every-event | true | 对收到的每一行数据用“OK”做出响应 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配置 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
- a1.sources.r1.channels = c1
NetCat UDP Source
看名字也看得出,跟 NetCat TCP Source 是一对亲兄弟,区别是监听的协议不同。这个source就像是 nc -u -k -l [host] [port]命令一样,监听一个端口然后接收来自于这个端口上UDP协议发送过来的文本内容,逐行转换为Event发送到channel。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: netcatudp |
bind | – | 要监听的 hostname 或者IP地址 |
port | – | 监听的端口 |
remoteAddressHeader | – | UDP消息源地址(或IP)被解析到Event的header里面时所使用的key名称 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = netcatudp
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
- a1.sources.r1.channels = c1
Sequence Generator Source
这个Source是一个序列式的Event生成器,从它启动就开始生成,总共会生成totalEvents个。它并不是一个日志收集器,它通常是用来测试用的。它在发送失败的时候会重新发送失败的Event到channel,保证最终发送到channel的唯一Event数量一定是 totalEvents 个。必需的参数已用 粗体 标明。
提示
记住Flume的设计原则之一就是传输过程的『可靠性』,上面说的失败重试以及最终的数量问题,这是毫无疑问的。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: seq |
selector.type | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 | |
selector. | replicating | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配 | |
batchSize | 1 | 每次请求向channel发送的 Event 数量 |
totalEvents | Long.MAX_VALUE | 这个Source会发出的Event总数,这些Event是唯一的 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = seq
- a1.sources.r1.channels = c1
Syslog Sources
这个Source是从syslog读取日志并解析为 Event,同样也分为TCP协议和UDP协议的,TCP协议的Source会按行(\n)来解析成 Event,UDP协议的Souce会把一个消息体解析为一个 Event。
Syslog TCP Source
提示
这个Syslog TCP Source在源码里面已经被@deprecated了,推荐使用 Multiport Syslog TCP Source 来代替。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: syslogtcp |
host | – | 要监听的hostname或者IP地址 |
port | – | 要监听的端口 |
eventSize | 2500 | 每行数据的最大字节数 |
keepFields | none | 是否保留syslog消息头中的一些属性到Event中,可选值 all 、none 或自定义指定保留的字段。如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 true 和 false ,建议改用 all 和 none 了。 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = syslogtcp
- a1.sources.r1.port = 5140
- a1.sources.r1.host = localhost
- a1.sources.r1.channels = c1
Multiport Syslog TCP Source
这是一个增强版的 Syslog TCP Source ,它更新、更快、支持监听多个端口。因为支持了多个端口,port参数已经改为了ports。这个Source使用了Apache mina(一个异步通信的框架,同netty类似)来实现。提供了对RFC-3164和许多常见的RFC-5424格式消息的支持。 支持每个端口配置不同字符集。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: multiportsyslogtcp |
host | – | 要监听的hostname或者IP地址 |
ports | – | 一个或多个要监听的端口,多个用空格分开 |
eventSize | 2500 | 解析成Event的每行数据的最大字节数 |
keepFields | none | 是否保留syslog消息头中的一些属性到Event中,可选值 all 、none 或自定义指定保留的字段,如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 true 和 false ,建议改用 all 和 none 了。 |
portHeader | – | 如果配置了这个属性值,端口号会被存到每个Event的header里面用这个属性配置的值当key。这样就可以在拦截器或者channel选择器里面根据端口号来自定义路由Event的逻辑。 |
charset.default | UTF-8 | 解析syslog使用的默认编码 |
charset.port.<port> | – | 针对具体某一个端口配置编码 |
batchSize | 100 | 每次请求尝试处理的最大Event数量,通常用这个默认值就很好。 |
readBufferSize | 1024 | 内部Mina通信的读取缓冲区大小,用于性能调优,通常用默认值就很好。 |
numProcessors | (自动分配) | 处理消息时系统使用的处理器数量。 默认是使用Java Runtime API自动检测CPU数量。 Mina将为每个检测到的CPU核心生成2个请求处理线程,这通常是合理的。 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector.* | – | channel选择器的相关属性,具体属性根据设定的 _selector.type 值不同而不同 |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors.* | 拦截器相关的属性配 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = multiport_syslogtcp
- a1.sources.r1.channels = c1
- a1.sources.r1.host = 0.0.0.0
- a1.sources.r1.ports = 10001 10002 10003
- a1.sources.r1.portHeader = port
Syslog UDP Source
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: syslogudp |
host | – | 要监听的hostname或者IP地址 |
port | – | 要监听的端口 |
keepFields | false | 设置为true后,解析syslog时会保留Priority, Timestamp and Hostname这些属性到Event的消息体中(查看源码发现,实际上保留了priority、version、timestamp、hostname这四个字段在消息体的前面) |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = syslogudp
- a1.sources.r1.port = 5140
- a1.sources.r1.host = localhost
- a1.sources.r1.channels = c1
HTTP Source
这个Source从HTTP POST 和 GET请求里面解析 Event,GET方式目前还只是实验性的。把HTTP请求解析成Event是通过配置一个“handler”来实现的,这个“handler”必须实现 HTTPSourceHandler 接口,这个接口其实就一个方法,收到一个HttpServletRequest后解析出一个 Event 的List。从一次请求解析出来的若干个Event会以一个事务提交到channel,从而在诸如『文件channel』的一些channel上提高效率。如果handler抛出异常,这个HTTP的响应状态码是400。如果channel满了或者无法发送Event到channel,此时会返回HTTP状态码503(服务暂时不可用)。
在一个POST请求中发送的所有 Event 视为一个批处理,并在一个事务中插入到 channel。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | 组件类型,这个是: http | |
port | – | 要监听的端口 |
bind | 0.0.0.0 | 要监听的hostname或者IP地址 |
handler | org.apache.flume.source.http.JSONHandler | 所使用的handler,需填写handler的全限定类名 |
handler. | – | handler的一些属性配置 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors.* | 拦截器相关的属性配 | |
enableSSL | false | 设置为true启用SSL,HTTP Source不支持SSLv3协议 |
excludeProtocols | SSLv3 | 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除 |
keystore | keystore 文件的位置 | |
keystorePassword | Keystore 的密码 |
提示
Flume里面很多组件都明确表示强制不支持SSLv3协议,是因为SSLv3协议的不安全,各大公司很早就表示不再支持了。
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = http
- a1.sources.r1.port = 5140
- a1.sources.r1.channels = c1
- a1.sources.r1.handler = org.example.rest.RestHandler
- a1.sources.r1.handler.nickname = random props
JSONHandler
这是HTTP Source的默认解析器(handler),根据请求所使用的编码把http请求中json格式的数据解析成Flume Event数组(不管是一个还是多个,都以数组格式进行存储),如果未指定编码,默认使用UTF-8编码。这个handler支持UTF-8、UTF-16和UTF-32编码。json数据格式如下:
- [{
- "headers" : {
- "timestamp" : "434324343",
- "host" : "random_host.example.com"
- },
- "body" : "random_body"
- },
- {
- "headers" : {
- "namenode" : "namenode.example.com",
- "datanode" : "random_datanode.example.com"
- },
- "body" : "really_random_body"
- }]
HTTP请求中设置编码必须是通过Content type来设置,application/json; charset=UTF-8(UTF-8 可以换成UTF-16 或者 UTF-32)。
一种创建这个handler使用的json格式对象 org.apache.flume.event.JSONEvent 的方法是使用Google Gson 库的Gson#fromJson(Object, Type) 方法创建json格式字符串,这个方法的第二个参数就是类型标记,用于指定Event列表的类型,像下面这样创建:
- Type type = new TypeToken<List<JSONEvent>>() {}.getType();
BlobHandler
默认情况下HTTPSource会把json处理成Event。作为一个补充的选项BlobHandler 不仅支持返回请求中的参数也包含其中的二进制数据,比如PDF文件、jpg文件等。这种可以接收附件的处理器不适合处理非常大的文件,因为这些文件都是缓冲在内存里面的。
属性 | 默认值 | 解释 |
---|---|---|
handler | – | 这里填BlobHandler的全限定类名: org.apache.flume.sink.solr.morphline.BlobHandler |
handler.maxBlobLength | 100000000 | 每次请求的最大缓冲字节数 |
Stress Source
StressSource 是一个内部负载生成Source的实现, 对于压力测试非常有用 。可以配置每个Event的大小(headers为空)、也可以配置总共发送Event数量以及发送成功的Event最大数量。
提示
它跟 Sequence Generator Source 差不多,都是用来测试用的。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.source.StressSource |
size | 500 | 每个Event的大小。单位:字节(byte) |
maxTotalEvents | -1 | 总共会发送的Event数量 |
maxSuccessfulEvents | -1 | 发送成功的Event最大数量 |
batchSize | 1 | 每次请求发送Event的数量 |
配置范例:
- a1.sources = stresssource-1
- a1.channels = memoryChannel-1
- a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
- a1.sources.stresssource-1.size = 10240
- a1.sources.stresssource-1.maxTotalEvents = 1000000
- a1.sources.stresssource-1.channels = memoryChannel-1
Legacy Sources
Legacy Sources可以让Flume1.x版本的Agent接收来自于Flume0.9.4版本的Agent发来的Event,可以理解为连接两个版本Flume的一个“桥”。接收到0.9.4版本的Event后转换为1.x版本的Event然后发送到channel。0.9.4版本的Event属性(timestamp, pri, host, nanos, etc)会被转换到1.xEvent的header中。Legacy Sources支持Avro和Thrift RPC两种方式连接。具体的用法是1.x的Agent可以使用 avroLegacy或者 thriftLegacy source,然后0.9.4的Agent需要指定sink的host和端口为1.x的 Agent。
注解
1.x和0.9.x的可靠性保证有所不同。Legacy Sources并不支持0.9.x的E2E和DFO模式。唯一支持的是BE(best effort,尽力而为),尽管1.x的可靠性保证对于从0.9.x传输过来并且已经存在channel里面的Events是有效的。
提示
虽然数据进入了Flume 1.x的channel之后是适用1.x的可靠性保证,但是从0.9.x到1.x的时候只是BE保证,既然只有BE的保证,也就是说Legacy Sources不算是可靠的传输。对于这种跨版本的部署使用行为要慎重。
必需的参数已用 粗体 标明。
Avro Legacy Source
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: org.apache.flume.source.avroLegacy.AvroLegacySource |
host | – | 要监听的hostname或者IP地址 |
port | – | 要监听的端口 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
- a1.sources.r1.host = 0.0.0.0
- a1.sources.r1.bind = 6666
- a1.sources.r1.channels = c1
Thrift Legacy Source
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | – | 要监听的hostname或者IP地址 |
port | – | 要监听的端口 |
selector.type | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 | |
selector. | replicating | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
- a1.sources.r1.host = 0.0.0.0
- a1.sources.r1.bind = 6666
- a1.sources.r1.channels = c1
Custom Source
你可以自己写一个Source接口的实现类。启动Flume时候必须把你自定义Source所依赖的其他类配置进Agent的classpath内。custom source在写配置文件的type时候填你的全限定类名。
提示
如果前面章节的那些Source都无法满足你的需求,你可以写一个自定义的Source,与你见过的其他框架的自定义组件写法如出一辙,实现个接口而已,然后把你写的类打成jar包,连同依赖的jar包一同配置进Flume的classpath。后面章节中的自定义Sink、自定义Channel等都是一样的步骤,不会再赘述。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个填你自己Source的全限定类名 |
selector.type | replicating | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector. | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors. | 拦截器相关的属性配 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.example.MySource
- a1.sources.r1.channels = c1
Scribe Source
提示
这里先说一句,Scribe是Facebook出的一个实时的日志聚合系统,我在之前没有听说过也没有使用过它,从Scribe项目的Github文档里面了解到它在2013年就已经停止更新和支持了,貌似现在已经没有新的用户选择使用它了,所以Scribe Source这一节了解一下就行了。
Scribe 是另外一个类似于Flume的数据收集系统。为了对接现有的Scribe可以使用ScribeSource ,它是基于Thrift 的兼容传输协议,如何部署Scribe请参考Facebook提供的文档。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.source.scribe.ScribeSource |
port | 1499 | Scribe 的端口 |
maxReadBufferBytes | 16384000 | Thrift 默认的FrameBuffer 大小 |
workerThreads | 5 | Thrift的线程数 |
selector.type | 可选值: replicating 或 multiplexing ,分别表示: 复制、多路复用 | |
selector.* | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 |
配置范例:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
- a1.sources.r1.port = 1463
- a1.sources.r1.workerThreads = 5
- a1.sources.r1.channels = c1