Flume Channels
channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。
Memory Channel
内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: memory |
capacity | 100 | 内存中存储 Event 的最大数 |
transactionCapacity | 100 | source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大) |
keep-alive | 3 | 添加或删除一个 Event 的超时时间(秒) |
byteCapacityBufferPercentage | 20 | 指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比 |
byteCapacity | Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候,而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event),这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。 |
提示
举2个例子来帮助理解最后两个参数吧:
两个例子都有共同的前提,假设JVM最大的可用内存是100M(或者说JVM启动时指定了-Xmx=100m)。
例子1: byteCapacityBufferPercentage 设置为20, byteCapacity 设置为52428800(就是50M),此时内存中所有 Event body 的总大小就被限制为50M *(1-20%)=40M,内存channel可用内存是50M。
例子2: byteCapacityBufferPercentage 设置为10, byteCapacity 不设置,此时内存中所有 Event body 的总大小就被限制为100M 80% (1-10%)=72M,内存channel可用内存是80M。
配置范例:
- a1.channels = c1
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 10000
- a1.channels.c1.transactionCapacity = 10000
- a1.channels.c1.byteCapacityBufferPercentage = 20
- a1.channels.c1.byteCapacity = 800000
JDBC Channel
JDBC Channel会通过一个数据库把Event持久化存储。目前只支持Derby。这是一个可靠的channel,非常适合那些注重可恢复性的流使用。必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: jdbc |
db.type | DERBY | 使用的数据库类型,目前只支持 DERBY. |
driver.class | org.apache.derby.jdbc.EmbeddedDriver | 所使用数据库的 JDBC 驱动类 |
driver.url | (constructed from other properties) | JDBC 连接的 URL |
db.username | “sa” | 连接数据库使用的用户名 |
db.password | – | 连接数据库使用的密码 |
connection.properties.file | – | JDBC连接属性的配置文件 |
create.schema | true | 如果设置为 true ,没有数据表的时候会自动创建 |
create.index | true | 是否创建索引来加快查询速度 |
create.foreignkey | true | 是否创建外键 |
transaction.isolation | “READ_COMMITTED” | 面向连接的隔离级别,可选值: READ_UNCOMMITTED , READ_COMMITTED , SERIALIZABLE , REPEATABLE_READ |
maximum.connections | 10 | 数据库的最大连接数 |
maximum.capacity | 0 (unlimited) | channel 中存储 Event 的最大数 |
sysprop.* | 针对不同DB的特定属性 | |
sysprop.user.home | Derby 的存储主路径 |
配置范例:
- a1.channels = c1
- a1.channels.c1.type = jdbc
Kafka Channel
将 Event 存储到Kafka集群(必须单独安装)。Kafka提供了高可用性和复制机制,因此如果Flume实例或者 Kafka 的实例挂掉,能保证Event数据随时可用。Kafka channel可以用于多种场景:
与source和sink一起:给所有Event提供一个可靠、高可用的channel。
与source、interceptor一起,但是没有sink:可以把所有Event写入到Kafka的topic中,来给其他的应用使用。
与sink一起,但是没有source:提供了一种低延迟、容错高的方式将Event发送的各种Sink上,比如:HDFS、HBase、Solr。
由于依赖于该版本附带的Kafka客户端,Flume1.8需要Kafka 0.9或更高版本。 与之前的Flume版本相比,channel的配置发生了一些变化。
配置参数组织如下:
通常与channel相关的配置值应用于channel配置级别,比如:a1.channel.k1.type =
与Kafka相关的配置值或Channel运行的以“kafka.”为前缀(这与CommonClient Configs类似),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。 这与hdfs sink的运行方式没有什么不同
特定于生产者/消费者的属性以kafka.producer或kafka.consumer为前缀
可能的话,使用Kafka的参数名称,例如:bootstrap.servers 和 acks
当前Flume版本是向下兼容的,但是第二个表中列出了一些不推荐使用的属性,并且当它们出现在配置文件中时,会在启动时打印警告日志。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.channel.kafka.KafkaChannel |
kafka.bootstrap.servers | – | channel使用的Kafka集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用支持。格式为hostname:port,多个用逗号分隔 |
kafka.topic | flume-channel | channel使用的Kafka topic |
kafka.consumer.group.id | flume | channel 用于向 Kafka 注册的消费者群组ID。 多个 channel 必须使用相同的 topic 和 group,以确保当一个Flume实例发生故障时,另一个实例可以获取数据。请注意,使用相同组ID的非channel消费者可能会导致数据丢失。 |
parseAsFlumeEvent | true | 是否以avro基准的 Flume Event 格式在channel中存储Event。如果是Flume的Source向channel的topic写入Event则应设置为true;如果其他生产者也在向channel的topic写入Event则应设置为false。通过使用 flume-ng-sdk 中的 org.apache.flume.source.avro.AvroFlumeEvent 可以在Kafka之外解析出Flume source的信息。 |
migrateZookeeperOffsets | true | 如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过 kafka.consumer.auto.offset.reset 配置如何处理偏移量。 |
pollTimeout | 500 | 消费者调用poll()方法时的超时时间(毫秒)https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) |
defaultPartitionId | – | 指定channel中所有Event将要存储的分区ID,除非被 partitionIdHeader 参数的配置覆盖。默认情况下,如果没有设置此参数,Event 会被Kafka生产者的分发程序分发,包括key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发。 |
partitionIdHeader | – | 从Event header中读取要存储Event到目标Kafka的分区的属性名。如果设置了,生产者会从Event header中获取次属性的值,并将消息发送到topic的指定分区。如果该值表示的分区无效,则Event不会存入channel。如果该值有效,则会覆盖 defaultPartitionId 配置的分区ID。 |
kafka.consumer.auto.offset.reset | latest | 当Kafka中没有初始偏移量或者当前偏移量已经不在当前服务器上时(比如数据已经被删除)该怎么办。earliest:自动重置偏移量到最早的位置;latest:自动重置偏移量到最新的位置;none:如果没有为消费者的组找到任何先前的偏移量,则向消费者抛出异常;else:向消费者抛出异常。 |
kafka.producer.security.protocol | PLAINTEXT | 设置使用哪种安全协议写入Kafka。可选值: SASLPLAINTEXT 、 SASL_SSL 和 SSL 有关安全设置的其他信息,请参见下文。 |
kafka.consumer.security.protocol | PLAINTEXT | 与上面的相同,只不过是用于消费者。 |
_more producer/consumer security props | 如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者、消费者增加安全相关的参数配置 |
下表是弃用的一些参数
属性 | 默认值 | 解释 |
---|---|---|
brokerList | – | 改用 kafka.bootstrap.servers |
topic | flume-channel | 改用 kafka.topic |
groupId | flume | 改用 kafka.consumer.group.id |
readSmallestOffset | false | 改用 kafka.consumer.auto.offset.reset |
注解
由于channel是负载均衡的,第一次启动时可能会有重复的Event出现。
配置范例:
- a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
- a1.channels.channel1.kafka.topic = channel1
- a1.channels.channel1.kafka.consumer.group.id = flume-consumer
安全与加密:
Flume 和 Kafka 之间通信渠道是支持安全认证和数据加密的。对于身份安全验证,可以使用 Kafka 0.9.0版本中的 SASL、GSSAPI (Kerberos V5) 或 SSL (虽然名字是SSL,实际是TLS实现)。
截至目前,数据加密仅由SSL / TLS提供。
当你把 kafka.producer(consumer).security.protocol 设置下面任何一个值的时候意味着:
SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证
SASL_SSL - 有数据加密的 Kerberos 或明文认证
SSL - 基于TLS的加密,可选的身份验证。
警告
启用 SSL 时性能会下降,影响大小取决于 CPU 和 JVM 实现。参考 Kafka security overview 和 KAFKA-2561 。
使用TLS:
请阅读 Configuring Kafka Clients SSL 中描述的步骤来了解用于微调的其他配置设置,例如下面的几个例子:启用安全策略、密码套件、启用协议、信任库或秘钥库类型。
服务端认证和数据加密的一个配置实例:
- a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.channels.channel1.kafka.topic = channel1
- a1.channels.channel1.kafka.consumer.group.id = flume-consumer
- a1.channels.channel1.kafka.producer.security.protocol = SSL
- a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
- a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
- a1.channels.channel1.kafka.consumer.security.protocol = SSL
- a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
- a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
注意,默认情况下 ssl.endpoint.identification.algorithm 这个参数没有被定义,因此不会执行主机名验证。如果要启用主机名验证,请加入以下配置:
- a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
- a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
开启后,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
如果还需要客户端身份验证,则还应在 Flume 配置中添加以下内容。 每个Flume 实例都必须拥有其客户证书,来被Kafka 实例单独或通过其签名链来信任。 常见示例是由 Kafka 信任的单个根CA签署每个客户端证书。
- a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
- a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
- a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
- a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
如果密钥库和密钥使用不同的密码保护,则ssl.key.password 属性将为消费者和生产者密钥库提供所需的额外密码:
- a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
- a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>
Kerberos安全配置:
要将Kafka channel 与使用Kerberos保护的Kafka群集一起使用,请为生产者或消费者设置上面提到的producer(consumer).security.protocol属性。 与Kafka实例一起使用的Kerberos keytab和主体在JAAS文件的“KafkaClient”部分中指定。“客户端”部分描述了Zookeeper连接信息(如果需要)。 有关JAAS文件内容的信息,请参阅 Kafka doc。 可以通过flume-env.sh中的JAVA_OPTS指定此JAAS文件的位置以及系统范围的 kerberos 配置:
- JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
- JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
使用 SASL_PLAINTEXT 的示例安全配置:
- a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.channels.channel1.kafka.topic = channel1
- a1.channels.channel1.kafka.consumer.group.id = flume-consumer
- a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
- a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
- a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
- a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
- a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
- a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
使用 SASL_SSL 的安全配置范例:
- a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.channels.channel1.kafka.topic = channel1
- a1.channels.channel1.kafka.consumer.group.id = flume-consumer
- a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
- a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
- a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
- a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
- a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
- a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
- a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
- a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
- a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
- a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
JAAS 文件配置示例。有关其内容的参考,请参阅Kafka文档 SASL configuration 中关于所需认证机制(GSSAPI/PLAIN)的客户端配置部分。由于Kafka Source 也可以连接 Zookeeper 以进行偏移迁移,因此“Client”部分也添加到此示例中。除非您需要偏移迁移,否则不必要这样做,或者您需要此部分用于其他安全组件。 另外,请确保Flume进程的操作系统用户对 JAAS 和 keytab 文件具有读权限。
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- keyTab="/path/to/keytabs/flume.keytab"
- principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
- };
- KafkaClient {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- keyTab="/path/to/keytabs/flume.keytab"
- principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
- };
File Channel
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: file . |
checkpointDir | ~/.flume/file-channel/checkpoint | 记录检查点的文件的存储目录 |
useDualCheckpoints | false | 是否备份检查点文件。如果设置为 true , backupCheckpointDir 参数必须设置。 |
backupCheckpointDir | – | 备份检查点的目录。 此目录不能与数据目录或检查点目录 checkpointDir 相同 |
dataDirs | ~/.flume/file-channel/data | 逗号分隔的目录列表,用于存储日志文件。 在不同物理磁盘上使用多个目录可以提高文件channel的性能 |
transactionCapacity | 10000 | channel支持的单个事务最大容量 |
checkpointInterval | 30000 | 检查点的时间间隔(毫秒) |
maxFileSize | 2146435071 | 单个日志文件的最大字节数。这个默认值约等于2047MB |
minimumRequiredSpace | 524288000 | 最小空闲空间的字节数。为了避免数据损坏,当空闲空间低于这个值的时候,文件channel将拒绝一切存取请求 |
capacity | 1000000 | channel的最大容量 |
keep-alive | 3 | 存入Event的最大等待时间(秒) |
use-log-replay-v1 | false | (专家)是否使用老的回放逻辑(Flume默认是使用v2版本的回放方法,但是如果v2版本不能正常工作可以考虑通过这个参数改为使用v1版本,v1版本是从Flume1.2开始启用的,回放是指系统关闭或者崩溃前执行的校验检查点文件和文件channel记录是否一致程序) |
use-fast-replay | false | (专家)是否开启快速回放(不适用队列) |
checkpointOnClose | true | channel关闭时是否创建检查点文件。开启次功能可以避免回放提高下次文件channel启动的速度 |
encryption.activeKey | – | 加密数据所使用的key名称 |
encryption.cipherProvider | – | 加密类型,目前只支持: AESCTRNOPADDING |
encryption.keyProvider | – | key类型,目前只支持: JCEKSFILE |
encryption.keyProvider.keyStoreFile | – | keystore 文件路径 |
encrpytion.keyProvider.keyStorePasswordFile | – | keystore 密码文件路径 |
encryption.keyProvider.keys | – | 所有key的列表,包含所有使用过的加密key名称 |
encyption.keyProvider.keys.*.passwordFile | – | 可选的秘钥密码文件路径 |
注解
默认情况下,文件channel使用默认的用户主目录内的检查点和数据目录的路径(说的就是上面的checkpointDir参数的默认值)。 如果一个Agent中有多个活动的文件channel实例,而且都是用了默认的检查点文件,则只有一个实例可以锁定目录并导致其他channel初始化失败。 因此,这时候有必要为所有已配置的channel显式配置不同的检查点文件目录,最好是在不同的磁盘上。此外,由于文件channel将在每次提交后会同步到磁盘,因此将其与将Event一起批处理的sink/source耦合可能是必要的,以便在多个磁盘不可用于检查点和数据目录时提供良好的性能。
配置范例:
- a1.channels = c1
- a1.channels.c1.type = file
- a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
- a1.channels.c1.dataDirs = /mnt/flume/data
Encryption
下面是几个加密的例子:
用给定的秘钥库密码生成秘钥key-0:
- keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
- -keysize 128 -validity 9000 -keystore test.keystore \
- -storetype jceks -storepass keyStorePassword
使用相同的秘钥库密码生成秘钥key-1:
- keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
- -keystore src/test/resources/test.keystore -storetype jceks \
- -storepass keyStorePassword
- a1.channels.c1.encryption.activeKey = key-0
- a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
- a1.channels.c1.encryption.keyProvider = key-provider-0
- a1.channels.c1.encryption.keyProvider = JCEKSFILE
- a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
- a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
- a1.channels.c1.encryption.keyProvider.keys = key-0
假设你已不再使用key-0,并且已经使用key-1加密新文件:
- a1.channels.c1.encryption.activeKey = key-1
- a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
- a1.channels.c1.encryption.keyProvider = JCEKSFILE
- a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
- a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
- a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
跟上面一样的场景,只不过key-0有自己单独的密码:
- a1.channels.c1.encryption.activeKey = key-1
- a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
- a1.channels.c1.encryption.keyProvider = JCEKSFILE
- a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
- a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
- a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
- a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
Spillable Memory Channel
这个channel会将Event存储在内存队列和磁盘上。 内存队列充当主存储,内存装满之后会存到磁盘。 磁盘存储使用嵌入的文件channel进行管理。 当内存队列已满时,其他传入Event将存储在文件channel中。这个channel非常适用于需要高吞吐量存储器channel的流,但同时需要更大容量的文件channel,以便更好地容忍间歇性目的地侧(sink)中断或消费速率降低。在这种异常情况下,吞吐量将大致降低到文件channel速度。 如果Agent程序崩溃或重新启动,只有存储在磁盘上的Event能恢复。 这个channel目前是实验性的,不建议用于生产环境 。
提示
这个channel的机制十分像Windows系统里面的「虚拟内存」。兼顾了内存channel的高吞吐量和文件channel的可靠、大容量优势。
必需的参数已用 粗体 标明。有关其他必需属性,请参阅文件channel。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: SPILLABLEMEMORY |
memoryCapacity | 10000 | 内存队列存储的Event最大数量。如果设置为0,则会禁用内存队列。 |
overflowCapacity | 100000000 | 磁盘(比如文件channel)上存储Event的最大数量,如果设置为0,则会禁用磁盘存储 |
overflowTimeout | 3 | 当内存占满时启用磁盘存储之前等待的最大秒数 |
byteCapacityBufferPercentage | 20 | 指定Event header所占空间大小与channel中所有Event的总大小之间的百分比 |
byteCapacity | 内存中最大允许存储Event的总字节数。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算Event的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个Agent里面有多个内存channel的时候,而且碰巧这些channel存储相同的物理Event(例如:这些channel通过复制机制(复制选择器)接收同一个source中的Event),这时候这些Event占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。 | |
avgEventSize | 500 | 估计进入channel的Event的平均大小(单位:字节) |
<file channel properties> | see file channel | 可以使用除“keep-alive”和“capacity”之外的任何文件channel属性。 文件channel的“keep-alive”由Spillable Memory Channel管理,而channel容量则是通过使用 overflowCapacity 来设置。 |
如果达到 memoryCapacity 或 byteCapacity 限制,则内存队列被视为已满。
配置范例:
- a1.channels = c1
- a1.channels.c1.type = SPILLABLEMEMORY
- a1.channels.c1.memoryCapacity = 10000
- a1.channels.c1.overflowCapacity = 1000000
- a1.channels.c1.byteCapacity = 800000
- a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
- a1.channels.c1.dataDirs = /mnt/flume/data
禁用内存channel,只使用磁盘存储(就像文件channel那样)的例子:
- a1.channels = c1
- a1.channels.c1.type = SPILLABLEMEMORY
- a1.channels.c1.memoryCapacity = 0
- a1.channels.c1.overflowCapacity = 1000000
- a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
- a1.channels.c1.dataDirs = /mnt/flume/data
禁用掉磁盘存储,只使用内存channel的例子:
- a1.channels = c1
- a1.channels.c1.type = SPILLABLEMEMORY
- a1.channels.c1.memoryCapacity = 100000
- a1.channels.c1.overflowCapacity = 0
Pseudo Transaction Channel
警告
这个伪事务 channel 仅用于单元测试目的,不适用于生产用途。
必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.channel.PseudoTxnMemoryChannel |
capacity | 50 | channel中存储的最大Event数 |
keep-alive | 3 | 添加或删除Event的超时时间(秒) |
Custom Channel
可以自己实现Channel接口来自定义一个channel,启动时这个自定义channel类以及依赖必须都放在flume Agent的classpath中。必需的参数已用 粗体 标明。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 你自己实现的channel类的全限定类名,比如:org.example.myCustomChannel |
配置范例:
- a1.channels = c1
- a1.channels.c1.type = org.example.MyChannel