Flume Channels

channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。

Memory Channel

内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: memory

capacity

100

内存中存储 Event 的最大数

transactionCapacity

100

source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)

keep-alive

3

添加或删除一个 Event 的超时时间(秒)

byteCapacityBufferPercentage

20

指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比

byteCapacity

Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候,而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event),这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

提示

举2个例子来帮助理解最后两个参数吧:

两个例子都有共同的前提,假设JVM最大的可用内存是100M(或者说JVM启动时指定了-Xmx=100m)。

例子1: byteCapacityBufferPercentage 设置为20, byteCapacity 设置为52428800(就是50M),此时内存中所有 Event body 的总大小就被限制为50M *(1-20%)=40M,内存channel可用内存是50M。

例子2: byteCapacityBufferPercentage 设置为10, byteCapacity 不设置,此时内存中所有 Event body 的总大小就被限制为100M 80% (1-10%)=72M,内存channel可用内存是80M。

配置范例:

  1. a1.channels = c1
  2. a1.channels.c1.type = memory
  3. a1.channels.c1.capacity = 10000
  4. a1.channels.c1.transactionCapacity = 10000
  5. a1.channels.c1.byteCapacityBufferPercentage = 20
  6. a1.channels.c1.byteCapacity = 800000

JDBC Channel

JDBC Channel会通过一个数据库把Event持久化存储。目前只支持Derby。这是一个可靠的channel,非常适合那些注重可恢复性的流使用。必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: jdbc

db.type

DERBY

使用的数据库类型,目前只支持 DERBY.

driver.class

org.apache.derby.jdbc.EmbeddedDriver

所使用数据库的 JDBC 驱动类

driver.url

(constructed from other properties)

JDBC 连接的 URL

db.username

“sa”

连接数据库使用的用户名

db.password



连接数据库使用的密码

connection.properties.file



JDBC连接属性的配置文件

create.schema

true

如果设置为 true ,没有数据表的时候会自动创建

create.index

true

是否创建索引来加快查询速度

create.foreignkey

true

是否创建外键

transaction.isolation

“READ_COMMITTED”

面向连接的隔离级别,可选值: READ_UNCOMMITTEDREAD_COMMITTEDSERIALIZABLEREPEATABLE_READ

maximum.connections

10

数据库的最大连接数

maximum.capacity

0 (unlimited)

channel 中存储 Event 的最大数

sysprop.*

针对不同DB的特定属性

sysprop.user.home

Derby 的存储主路径

配置范例:

  1. a1.channels = c1
  2. a1.channels.c1.type = jdbc

Kafka Channel

将 Event 存储到Kafka集群(必须单独安装)。Kafka提供了高可用性和复制机制,因此如果Flume实例或者 Kafka 的实例挂掉,能保证Event数据随时可用。Kafka channel可以用于多种场景:

  • 与source和sink一起:给所有Event提供一个可靠、高可用的channel。

  • 与source、interceptor一起,但是没有sink:可以把所有Event写入到Kafka的topic中,来给其他的应用使用。

  • 与sink一起,但是没有source:提供了一种低延迟、容错高的方式将Event发送的各种Sink上,比如:HDFS、HBase、Solr。

由于依赖于该版本附带的Kafka客户端,Flume1.8需要Kafka 0.9或更高版本。 与之前的Flume版本相比,channel的配置发生了一些变化。

配置参数组织如下:

  • 通常与channel相关的配置值应用于channel配置级别,比如:a1.channel.k1.type =

  • 与Kafka相关的配置值或Channel运行的以“kafka.”为前缀(这与CommonClient Configs类似),例如:a1.channels.k1.kafka.topica1.channels.k1.kafka.bootstrap.servers。 这与hdfs sink的运行方式没有什么不同

  • 特定于生产者/消费者的属性以kafka.producer或kafka.consumer为前缀

  • 可能的话,使用Kafka的参数名称,例如:bootstrap.servers 和 acks

当前Flume版本是向下兼容的,但是第二个表中列出了一些不推荐使用的属性,并且当它们出现在配置文件中时,会在启动时打印警告日志。

必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: org.apache.flume.channel.kafka.KafkaChannel

kafka.bootstrap.servers



channel使用的Kafka集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用支持。格式为hostname:port,多个用逗号分隔

kafka.topic

flume-channel

channel使用的Kafka topic

kafka.consumer.group.id

flume

channel 用于向 Kafka 注册的消费者群组ID。 多个 channel 必须使用相同的 topic 和 group,以确保当一个Flume实例发生故障时,另一个实例可以获取数据。请注意,使用相同组ID的非channel消费者可能会导致数据丢失。

parseAsFlumeEvent

true

是否以avro基准的 Flume Event 格式在channel中存储Event。如果是Flume的Source向channel的topic写入Event则应设置为true;如果其他生产者也在向channel的topic写入Event则应设置为false。通过使用 flume-ng-sdk 中的 org.apache.flume.source.avro.AvroFlumeEvent 可以在Kafka之外解析出Flume source的信息。

migrateZookeeperOffsets

true

如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过 kafka.consumer.auto.offset.reset 配置如何处理偏移量。

pollTimeout

500

消费者调用poll()方法时的超时时间(毫秒)https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)

defaultPartitionId



指定channel中所有Event将要存储的分区ID,除非被 partitionIdHeader 参数的配置覆盖。默认情况下,如果没有设置此参数,Event 会被Kafka生产者的分发程序分发,包括key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发。

partitionIdHeader



从Event header中读取要存储Event到目标Kafka的分区的属性名。如果设置了,生产者会从Event header中获取次属性的值,并将消息发送到topic的指定分区。如果该值表示的分区无效,则Event不会存入channel。如果该值有效,则会覆盖 defaultPartitionId 配置的分区ID。

kafka.consumer.auto.offset.reset

latest

当Kafka中没有初始偏移量或者当前偏移量已经不在当前服务器上时(比如数据已经被删除)该怎么办。earliest:自动重置偏移量到最早的位置;latest:自动重置偏移量到最新的位置;none:如果没有为消费者的组找到任何先前的偏移量,则向消费者抛出异常;else:向消费者抛出异常。

kafka.producer.security.protocol

PLAINTEXT

设置使用哪种安全协议写入Kafka。可选值: SASLPLAINTEXTSASL_SSLSSL 有关安全设置的其他信息,请参见下文。

kafka.consumer.security.protocol

PLAINTEXT

与上面的相同,只不过是用于消费者。

_more producer/consumer security props

如果使用了 SASL_PLAINTEXTSASL_SSLSSL 等安全协议,参考 Kafka security 来为生产者、消费者增加安全相关的参数配置

下表是弃用的一些参数


属性

默认值

解释

brokerList



改用 kafka.bootstrap.servers

topic

flume-channel

改用 kafka.topic

groupId

flume

改用 kafka.consumer.group.id

readSmallestOffset

false

改用 kafka.consumer.auto.offset.reset

注解

由于channel是负载均衡的,第一次启动时可能会有重复的Event出现。

配置范例:

  1. a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  2. a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
  3. a1.channels.channel1.kafka.topic = channel1
  4. a1.channels.channel1.kafka.consumer.group.id = flume-consumer

安全与加密:

Flume 和 Kafka 之间通信渠道是支持安全认证和数据加密的。对于身份安全验证,可以使用 Kafka 0.9.0版本中的 SASL、GSSAPI (Kerberos V5) 或 SSL (虽然名字是SSL,实际是TLS实现)。

截至目前,数据加密仅由SSL / TLS提供。

当你把 kafka.producer(consumer).security.protocol 设置下面任何一个值的时候意味着:

  • SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证

  • SASL_SSL - 有数据加密的 Kerberos 或明文认证

  • SSL - 基于TLS的加密,可选的身份验证。

警告

启用 SSL 时性能会下降,影响大小取决于 CPU 和 JVM 实现。参考 Kafka security overviewKAFKA-2561

使用TLS:

请阅读 Configuring Kafka Clients SSL 中描述的步骤来了解用于微调的其他配置设置,例如下面的几个例子:启用安全策略、密码套件、启用协议、信任库或秘钥库类型。

服务端认证和数据加密的一个配置实例:

  1. a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  2. a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.channels.channel1.kafka.topic = channel1
  4. a1.channels.channel1.kafka.consumer.group.id = flume-consumer
  5. a1.channels.channel1.kafka.producer.security.protocol = SSL
  6. a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  7. a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
  8. a1.channels.channel1.kafka.consumer.security.protocol = SSL
  9. a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
  10. a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

注意,默认情况下 ssl.endpoint.identification.algorithm 这个参数没有被定义,因此不会执行主机名验证。如果要启用主机名验证,请加入以下配置:

  1. a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
  2. a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS

开启后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

如果还需要客户端身份验证,则还应在 Flume 配置中添加以下内容。 每个Flume 实例都必须拥有其客户证书,来被Kafka 实例单独或通过其签名链来信任。 常见示例是由 Kafka 信任的单个根CA签署每个客户端证书。

  1. a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
  2. a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
  3. a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
  4. a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则ssl.key.password 属性将为消费者和生产者密钥库提供所需的额外密码:

  1. a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
  2. a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>

Kerberos安全配置:

要将Kafka channel 与使用Kerberos保护的Kafka群集一起使用,请为生产者或消费者设置上面提到的producer(consumer).security.protocol属性。 与Kafka实例一起使用的Kerberos keytab和主体在JAAS文件的“KafkaClient”部分中指定。“客户端”部分描述了Zookeeper连接信息(如果需要)。 有关JAAS文件内容的信息,请参阅 Kafka doc。 可以通过flume-env.sh中的JAVA_OPTS指定此JAAS文件的位置以及系统范围的 kerberos 配置:

  1. JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
  2. JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

  1. a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  2. a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.channels.channel1.kafka.topic = channel1
  4. a1.channels.channel1.kafka.consumer.group.id = flume-consumer
  5. a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
  6. a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
  7. a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
  8. a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
  9. a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
  10. a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的安全配置范例:

  1. a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
  2. a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
  3. a1.channels.channel1.kafka.topic = channel1
  4. a1.channels.channel1.kafka.consumer.group.id = flume-consumer
  5. a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
  6. a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
  7. a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
  8. a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
  9. a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
  10. a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
  11. a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
  12. a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
  13. a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
  14. a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

JAAS 文件配置示例。有关其内容的参考,请参阅Kafka文档 SASL configuration 中关于所需认证机制(GSSAPI/PLAIN)的客户端配置部分。由于Kafka Source 也可以连接 Zookeeper 以进行偏移迁移,因此“Client”部分也添加到此示例中。除非您需要偏移迁移,否则不必要这样做,或者您需要此部分用于其他安全组件。 另外,请确保Flume进程的操作系统用户对 JAAS 和 keytab 文件具有读权限。

  1. Client {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. storeKey=true
  5. keyTab="/path/to/keytabs/flume.keytab"
  6. principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  7. };
  8.  
  9. KafkaClient {
  10. com.sun.security.auth.module.Krb5LoginModule required
  11. useKeyTab=true
  12. storeKey=true
  13. keyTab="/path/to/keytabs/flume.keytab"
  14. principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
  15. };

File Channel

必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: file.

checkpointDir

~/.flume/file-channel/checkpoint

记录检查点的文件的存储目录

useDualCheckpoints

false

是否备份检查点文件。如果设置为 truebackupCheckpointDir 参数必须设置。

backupCheckpointDir



备份检查点的目录。 此目录不能与数据目录或检查点目录 checkpointDir 相同

dataDirs

~/.flume/file-channel/data

逗号分隔的目录列表,用于存储日志文件。 在不同物理磁盘上使用多个目录可以提高文件channel的性能

transactionCapacity

10000

channel支持的单个事务最大容量

checkpointInterval

30000

检查点的时间间隔(毫秒)

maxFileSize

2146435071

单个日志文件的最大字节数。这个默认值约等于2047MB

minimumRequiredSpace

524288000

最小空闲空间的字节数。为了避免数据损坏,当空闲空间低于这个值的时候,文件channel将拒绝一切存取请求

capacity

1000000

channel的最大容量

keep-alive

3

存入Event的最大等待时间(秒)

use-log-replay-v1

false

(专家)是否使用老的回放逻辑(Flume默认是使用v2版本的回放方法,但是如果v2版本不能正常工作可以考虑通过这个参数改为使用v1版本,v1版本是从Flume1.2开始启用的,回放是指系统关闭或者崩溃前执行的校验检查点文件和文件channel记录是否一致程序)

use-fast-replay

false

(专家)是否开启快速回放(不适用队列)

checkpointOnClose

true

channel关闭时是否创建检查点文件。开启次功能可以避免回放提高下次文件channel启动的速度

encryption.activeKey



加密数据所使用的key名称

encryption.cipherProvider



加密类型,目前只支持:AESCTRNOPADDING

encryption.keyProvider



key类型,目前只支持:JCEKSFILE

encryption.keyProvider.keyStoreFile



keystore 文件路径

encrpytion.keyProvider.keyStorePasswordFile



keystore 密码文件路径

encryption.keyProvider.keys



所有key的列表,包含所有使用过的加密key名称

encyption.keyProvider.keys.*.passwordFile



可选的秘钥密码文件路径

注解

默认情况下,文件channel使用默认的用户主目录内的检查点和数据目录的路径(说的就是上面的checkpointDir参数的默认值)。 如果一个Agent中有多个活动的文件channel实例,而且都是用了默认的检查点文件,则只有一个实例可以锁定目录并导致其他channel初始化失败。 因此,这时候有必要为所有已配置的channel显式配置不同的检查点文件目录,最好是在不同的磁盘上。此外,由于文件channel将在每次提交后会同步到磁盘,因此将其与将Event一起批处理的sink/source耦合可能是必要的,以便在多个磁盘不可用于检查点和数据目录时提供良好的性能。

配置范例:

  1. a1.channels = c1
  2. a1.channels.c1.type = file
  3. a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
  4. a1.channels.c1.dataDirs = /mnt/flume/data

Encryption

下面是几个加密的例子:

用给定的秘钥库密码生成秘钥key-0:

  1. keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  2. -keysize 128 -validity 9000 -keystore test.keystore \
  3. -storetype jceks -storepass keyStorePassword

使用相同的秘钥库密码生成秘钥key-1:

  1. keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  2. -keystore src/test/resources/test.keystore -storetype jceks \
  3. -storepass keyStorePassword
  1. a1.channels.c1.encryption.activeKey = key-0
  2. a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
  3. a1.channels.c1.encryption.keyProvider = key-provider-0
  4. a1.channels.c1.encryption.keyProvider = JCEKSFILE
  5. a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
  6. a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
  7. a1.channels.c1.encryption.keyProvider.keys = key-0

假设你已不再使用key-0,并且已经使用key-1加密新文件:

  1. a1.channels.c1.encryption.activeKey = key-1
  2. a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
  3. a1.channels.c1.encryption.keyProvider = JCEKSFILE
  4. a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
  5. a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
  6. a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

跟上面一样的场景,只不过key-0有自己单独的密码:

  1. a1.channels.c1.encryption.activeKey = key-1
  2. a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
  3. a1.channels.c1.encryption.keyProvider = JCEKSFILE
  4. a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
  5. a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
  6. a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
  7. a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

Spillable Memory Channel

这个channel会将Event存储在内存队列和磁盘上。 内存队列充当主存储,内存装满之后会存到磁盘。 磁盘存储使用嵌入的文件channel进行管理。 当内存队列已满时,其他传入Event将存储在文件channel中。这个channel非常适用于需要高吞吐量存储器channel的流,但同时需要更大容量的文件channel,以便更好地容忍间歇性目的地侧(sink)中断或消费速率降低。在这种异常情况下,吞吐量将大致降低到文件channel速度。 如果Agent程序崩溃或重新启动,只有存储在磁盘上的Event能恢复。 这个channel目前是实验性的,不建议用于生产环境

提示

这个channel的机制十分像Windows系统里面的「虚拟内存」。兼顾了内存channel的高吞吐量和文件channel的可靠、大容量优势。

必需的参数已用 粗体 标明。有关其他必需属性,请参阅文件channel。


属性

默认值

解释

type



组件类型,这个是: SPILLABLEMEMORY

memoryCapacity

10000

内存队列存储的Event最大数量。如果设置为0,则会禁用内存队列。

overflowCapacity

100000000

磁盘(比如文件channel)上存储Event的最大数量,如果设置为0,则会禁用磁盘存储

overflowTimeout

3

当内存占满时启用磁盘存储之前等待的最大秒数

byteCapacityBufferPercentage

20

指定Event header所占空间大小与channel中所有Event的总大小之间的百分比

byteCapacity

内存中最大允许存储Event的总字节数。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算Event的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个Agent里面有多个内存channel的时候,而且碰巧这些channel存储相同的物理Event(例如:这些channel通过复制机制(复制选择器)接收同一个source中的Event),这时候这些Event占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

avgEventSize

500

估计进入channel的Event的平均大小(单位:字节)

<file channel properties>

see file channel

可以使用除“keep-alive”和“capacity”之外的任何文件channel属性。 文件channel的“keep-alive”由Spillable Memory Channel管理,而channel容量则是通过使用 overflowCapacity 来设置。

如果达到 memoryCapacitybyteCapacity 限制,则内存队列被视为已满。

配置范例:

  1. a1.channels = c1
  2. a1.channels.c1.type = SPILLABLEMEMORY
  3. a1.channels.c1.memoryCapacity = 10000
  4. a1.channels.c1.overflowCapacity = 1000000
  5. a1.channels.c1.byteCapacity = 800000
  6. a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
  7. a1.channels.c1.dataDirs = /mnt/flume/data

禁用内存channel,只使用磁盘存储(就像文件channel那样)的例子:

  1. a1.channels = c1
  2. a1.channels.c1.type = SPILLABLEMEMORY
  3. a1.channels.c1.memoryCapacity = 0
  4. a1.channels.c1.overflowCapacity = 1000000
  5. a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
  6. a1.channels.c1.dataDirs = /mnt/flume/data

禁用掉磁盘存储,只使用内存channel的例子:

  1. a1.channels = c1
  2. a1.channels.c1.type = SPILLABLEMEMORY
  3. a1.channels.c1.memoryCapacity = 100000
  4. a1.channels.c1.overflowCapacity = 0

Pseudo Transaction Channel

警告

这个伪事务 channel 仅用于单元测试目的,不适用于生产用途。

必需的参数已用 粗体 标明。


属性

默认值

解释

type



组件类型,这个是: org.apache.flume.channel.PseudoTxnMemoryChannel

capacity

50

channel中存储的最大Event数

keep-alive

3

添加或删除Event的超时时间(秒)

Custom Channel

可以自己实现Channel接口来自定义一个channel,启动时这个自定义channel类以及依赖必须都放在flume Agent的classpath中。必需的参数已用 粗体 标明。


属性

默认值

解释

type



你自己实现的channel类的全限定类名,比如:org.example.myCustomChannel

配置范例:

  1. a1.channels = c1
  2. a1.channels.c1.type = org.example.MyChannel