Flume Sources
Avro Source
Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be avro |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector. | ||
interceptors | – | Space-separated list of interceptors |
interceptors. | ||
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of the matching AvroSink on the previous hop |
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, every supported protocol is included. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, every supported cipher suite is included. |
ipFilter | false | Set this to true to enable ipFiltering for netty |
ipFilterRules | – | Define N netty ipFilter pattern rules with this config. |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = avro
- a1.sources.r1.channels = c1
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 4141
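To enable SSL on this source, a minimal sketch could extend the example above (the keystore path and password below are placeholders):
- a1.sources.r1.ssl = true
- a1.sources.r1.keystore = /path/to/keystore.jks
- a1.sources.r1.keystore-password = password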
Example of ipFilterRules
ipFilterRules defines N netty ipFilter pattern rules separated by commas. A pattern rule must be in this format:
<’allow’ or ‘deny’>:<’ip’ or ‘name’ for computer name>:<pattern>, i.e. allow/deny:ip/name:pattern
example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
Note that the first rule to match will apply, as the example below shows for a client on the localhost.
This will allow the client on localhost and deny clients from any other ip: “allow:name:localhost,deny:ip:*”. This will deny the client on localhost and allow clients from any other ip: “deny:name:localhost,allow:ip:*”.
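As a configuration sketch, building on the example above, a source that accepts connections only from localhost might be set up like this:
- a1.sources.r1.ipFilter = true
- a1.sources.r1.ipFilterRules = allow:name:localhost,deny:ip:*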
Thrift Source
Listens on Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be thrift |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector. | ||
interceptors | – | Space separated list of interceptors |
interceptors. | ||
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section) |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, every supported protocol is included. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. |
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC. |
agent-principal | – | The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC. |
agent-keytab | – | The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC. |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = thrift
- a1.sources.r1.channels = c1
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 4141
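A sketch of the same source started in secure mode with kerberos authentication (the principal and keytab path are placeholders):
- a1.sources.r1.kerberos = true
- a1.sources.r1.agent-principal = flume/flumehost1.example.com@YOURKERBEROSREALM
- a1.sources.r1.agent-keytab = /path/to/keytabs/flume.keytab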
Exec Source
Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results whereas date will probably not - the former two commands produce streams of data whereas the latter produces a single event and exits.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
Warning
The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that, if there is a failure to put the event into the Channel, the client knows about it. In such cases, the data will be lost. For instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem; what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent, for some reason. If this doesn’t make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, Taildir Source or direct integration with Flume via the SDK.
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /var/log/secure
- a1.sources.r1.channels = c1
The ‘shell’ config is used to invoke the ‘command’ through a command shell (such as Bash or Powershell). The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’ : ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, etc.
- a1.sources.tailsource-1.type = exec
- a1.sources.tailsource-1.shell = /bin/bash -c
- a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
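As a further sketch, the restart and stderr logging options from the table above could be added to this example like so:
- a1.sources.tailsource-1.restart = true
- a1.sources.tailsource-1.restartThrottle = 10000
- a1.sources.tailsource-1.logStdErr = true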
JMS Source
JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message to flume event converter. Note that the vendor provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), --classpath on the command line, or via the FLUME_CLASSPATH variable in flume-env.sh.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be jms |
initialContextFactory | – | Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | The JNDI name the connection factory should appear as |
providerURL | – | The JMS provider URL |
destinationName | – | Destination name |
destinationType | – | Destination type (queue or topic) |
messageSelector | – | Message selector to use when creating the consumer |
userName | – | Username for the destination/provider |
passwordFile | – | File containing the password for the destination/provider |
batchSize | 100 | Number of messages to consume in one batch |
converter.type | DEFAULT | Class to use to convert messages to flume events. See below. |
converter.* | – | Converter properties. |
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
createDurableSubscription | false | Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified. |
clientId | – | JMS client identifier set on Connection right after it is created. Required for durable subscriptions. |
durableSubscriptionName | – | Name used to identify the durable subscription. Required for durable subscriptions. |
JMS message converter
The JMS source allows pluggable converters, though it’s likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.
- BytesMessage:
- Bytes of message are copied to body of the FlumeEvent. Cannot convert more than 2GB of data per message.
- TextMessage:
- Text of message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default but this is configurable.
- ObjectMessage:
- Object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream and the resulting array is copied to the body of the FlumeEvent.
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = jms
- a1.sources.r1.channels = c1
- a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
- a1.sources.r1.connectionFactory = GenericConnectionFactory
- a1.sources.r1.providerURL = tcp://mqserver:61616
- a1.sources.r1.destinationName = BUSINESS_DATA
- a1.sources.r1.destinationType = QUEUE
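As a sketch, a durable subscription on a topic destination would additionally set the following (assuming destinationName refers to a topic; the client id and subscription name are illustrative values):
- a1.sources.r1.destinationType = TOPIC
- a1.sources.r1.createDurableSubscription = true
- a1.sources.r1.clientId = flume-client-1
- a1.sources.r1.durableSubscriptionName = flume-subscription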
SSL and JMS Source
JMS client implementations typically support configuring SSL/TLS via some Java system properties defined by JSSE (Java Secure Socket Extension). Specifying these system properties for Flume’s JVM, the JMS Source (or more precisely the JMS client implementation used by the JMS Source) can connect to the JMS server through SSL (of course only when the JMS server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.
The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed descriptions about the server side setup of the different JMS providers and also full working configuration examples on Flume Wiki.
SSL transport / server authentication:
If the JMS server uses a self-signed certificate or its certificate is signed by a non-trusted CA (e.g. the company’s own CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. It can be done via the global SSL parameters. For more details about the global SSL setup, see the SSL/TLS support section.
Some JMS providers require SSL specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (e.g. ActiveMQ uses the ssl:// URL prefix instead of tcp://). In this case the source properties (initialContextFactory and/or providerURL) have to be adjusted in the agent config file.
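For instance, with ActiveMQ the provider URL in the agent config might change along these lines (the host and port are placeholders):
- a1.sources.r1.providerURL = ssl://mqserver:61617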
Client certificate authentication (two-way SSL):
JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).
The keystore containing Flume’s key used for the authentication needs to be configured via the global SSL parameters again. For more details about the global SSL setup, see the SSL/TLS support section.
The keystore should contain only one key (if multiple keys are present, then the first one will be used). The key password must be the same as the keystore password.
In case of client certificate authentication, it is not necessary to specify the userName / passwordFile properties for the JMS Source in the Flume agent config file.
Please note:
Unlike other components, there are no component level configuration parameters for the JMS Source, and there is no enable SSL flag either. SSL setup is controlled by the JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence of the truststore / keystore.
Spooling Directory Source
This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion is by default indicated by renaming the file; alternatively the file can be deleted, or the trackerDir can be used to keep track of processed files.
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:
- If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
- If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.
Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be spooldir. |
spoolDir | – | The directory from which to read files. |
fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
deletePolicy | never | When to delete completed files: never or immediate |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
basenameHeader | false | Whether to add a header storing the basename of the file. |
basenameHeaderKey | basename | Header Key to use when appending basename of file to event header. |
includePattern | ^.*$ | Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. |
trackingPolicy | rename | The tracking policy defines how file processing is tracked. It can be “rename” or “tracker_dir”. This parameter is only effective if the deletePolicy is “never”. “rename” - After processing files they get renamed according to the fileSuffix parameter. “tracker_dir” - Files are not renamed but a new empty file is created in the trackerDir. The new tracker file name is derived from the ingested one plus the fileSuffix. |
consumeOrder | oldest | The order in which files in the spooling directory will be consumed: oldest, youngest or random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with the smallest lexicographical order will be consumed first. In case of random any file will be picked randomly. When using oldest and youngest the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming in the spooling directory. |
pollDelay | 500 | Delay (in milliseconds) used when polling for new files. |
recursiveDirectorySearch | false | Whether to monitor sub directories for new files to read. |
maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter. |
batchSize | 100 | Granularity at which to batch transfer to the channel |
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence. |
deserializer | LINE | Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |
deserializer.* | | Varies per event deserializer. |
bufferMaxLines | – | (Obsolete) This option is now ignored. |
bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
Example for an agent named a1:
- a1.channels = ch-1
- a1.sources = src-1
- a1.sources.src-1.type = spooldir
- a1.sources.src-1.channels = ch-1
- a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
- a1.sources.src-1.fileHeader = true
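As a sketch, the same source could leave the ingested files unrenamed and track completion through the tracker directory instead:
- a1.sources.src-1.deletePolicy = never
- a1.sources.src-1.trackingPolicy = tracker_dir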
Event Deserializers
The following event deserializers ship with Flume.
LINE
This deserializer generates one event per line of text input.
Property Name | Default | Description |
---|---|---|
deserializer.maxLineLength | 2048 | Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event. |
deserializer.outputCharset | UTF-8 | Charset to use for encoding events put into the channel. |
AVRO
This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.
Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), then it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.
Property Name | Default | Description |
---|---|---|
deserializer.schemaType | HASH | How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode. |
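For example, a spooling directory source like the one above could be switched to this deserializer, storing the literal schema in each event header, with a sketch like:
- a1.sources.src-1.deserializer = AVRO
- a1.sources.src-1.deserializer.schemaType = LITERAL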
BlobDeserializer
This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because the entire BLOB is buffered in RAM.
Property Name | Default | Description |
---|---|---|
deserializer | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder |
deserializer.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
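A minimal sketch of enabling this deserializer on a spooling directory source:
- a1.sources.src-1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
- a1.sources.src-1.deserializer.maxBlobLength = 100000000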
Taildir Source
Note
This source is provided as a preview feature. It does not work on Windows.
Watch the specified files, and tail them in nearly real-time once new lines appended to each file are detected. If new lines are being written, this source will retry reading them while waiting for the completion of the write.
This source is reliable and will not miss data even when the tailed files rotate. It periodically writes the last read position of each file in the given position file in JSON format. If Flume is stopped or down for some reason, it can restart tailing from the position written in the existing position file.
In another use case, this source can also start tailing from an arbitrary position for each file using the given position file. When there is no position file on the specified path, it will start tailing from the first line of each file by default.
Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first.
This source does not rename, delete, or otherwise modify the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be TAILDIR. |
filegroups | – | Space-separated list of file groups. Each file group indicates a set of files to be tailed. |
filegroups.<filegroupName> | – | Absolute path of the file group. Regular expressions (not file system patterns) can be used for the filename only. |
positionFile | ~/.flume/taildir_position.json | File in JSON format to record the inode, the absolute path and the last position of each tailing file. |
headers.<filegroupName>.<headerKey> | – | Header value to set with the header key. Multiple headers can be specified for one file group. |
byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called ‘byteoffset’. |
skipToEnd | false | Whether to skip to EOF for files not recorded in the position file. |
idleTimeout | 120000 | Time (ms) to close inactive files. If new lines are appended to the closed file, this source will automatically re-open it. |
writePosInterval | 3000 | Interval time (ms) to write the last position of each file to the position file. |
batchSize | 100 | Max number of lines to read and send to the channel at a time. Using the default is usually fine. |
maxBatchCount | Long.MAX_VALUE | Controls the number of batches being read consecutively from the same file. If the source is tailing multiple files and one of them is written at a fast rate, it can prevent other files from being processed, because the busy file would be read in an endless loop. In this case lower this value. |
backoffSleepIncrement | 1000 | The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. |
maxBackoffSleep | 5000 | The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. |
cachePatternMatching | true | Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = TAILDIR
- a1.sources.r1.channels = c1
- a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
- a1.sources.r1.filegroups = f1 f2
- a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
- a1.sources.r1.headers.f1.headerKey1 = value1
- a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
- a1.sources.r1.headers.f2.headerKey1 = value2
- a1.sources.r1.headers.f2.headerKey2 = value2-2
- a1.sources.r1.fileHeader = true
- a1.sources.r1.maxBatchCount = 1000
Twitter 1% firehose Source (experimental)
Warning
This source is highly experimental and may change between minor versions of Flume. Use at your own risk.
Experimental source that connects via Streaming API to the 1% sample twitter firehose, continuously downloads tweets, converts them to Avro format and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.twitter.TwitterSource |
consumerKey | – | OAuth consumer key |
consumerSecret | – | OAuth consumer secret |
accessToken | – | OAuth access token |
accessTokenSecret | – | OAuth token secret |
maxBatchSize | 1000 | Maximum number of twitter messages to put in a single batch |
maxBatchDurationMillis | 1000 | Maximum number of milliseconds to wait before closing a batch |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
- a1.sources.r1.channels = c1
- a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
- a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
- a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
- a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
- a1.sources.r1.maxBatchSize = 10
- a1.sources.r1.maxBatchDurationMillis = 200
Kafka Source
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1, which was the highest available version at the time of the release.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the source |
kafka.consumer.group.id | flume | Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group |
kafka.topics | – | Comma-separated list of topics the kafka consumer will read messages from. |
kafka.topics.regex | – | Regex that defines the set of topics the source is subscribed to. This property has higher priority than kafka.topics and overrides kafka.topics if it exists. |
batchSize | 1000 | Maximum number of messages written to Channel in one batch |
batchDurationMillis | 1000 | Maximum time (in ms) before a batch will be written to the Channel. The batch will be written whenever the first of size or time limit is reached. |
backoffSleepIncrement | 1000 | Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. The wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
maxBackoffSleep | 5000 | Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
useFlumeEventFormat | false | By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers sent on the producing side. |
setTopicHeader | true | When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property. |
topicHeader | topic | Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop. |
kafka.consumer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if connecting to Kafka using some level of security. See below for additional info on secure setup. |
more consumer security props | | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on the consumer. |
Other Kafka Consumer Properties | – | These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset |
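For instance, a pass-through consumer property set with the kafka.consumer. prefix might look like this (the value is illustrative):
- tier1.sources.source1.kafka.consumer.auto.offset.reset = earliest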
Note
The Kafka Source overrides two Kafka consumer parameters: auto.commit.enable is set to “false” by the source and every batch is committed. The Kafka source guarantees an at-least-once strategy of message retrieval. Duplicates can be present when the source starts. The Kafka Source also provides defaults for the key.deserializer (org.apache.kafka.common.serialization.StringSerializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
Deprecated Properties
Property Name | Default | Description |
---|---|---|
topic | – | Use kafka.topics |
groupId | flume | Use kafka.consumer.group.id |
zookeeperConnect | – | Is no longer supported by the kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish a connection with the kafka cluster |
migrateZookeeperOffsets | true | When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check the Kafka documentation for details |
Example for topic subscription by comma-separated topic list.
- tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- tier1.sources.source1.channels = channel1
- tier1.sources.source1.batchSize = 5000
- tier1.sources.source1.batchDurationMillis = 2000
- tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
- tier1.sources.source1.kafka.topics = test1, test2
- tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex
- tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- tier1.sources.source1.channels = channel1
- tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
- tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
- # the default kafka.consumer.group.id=flume is used
Security and Kafka Source:
Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
As of now data encryption is solely provided by SSL/TLS.
Setting kafka.consumer.security.protocol to any of the following value means:
- SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
- SASL_SSL - Kerberos or plaintext authentication with data encryption
- SSL - TLS based encryption with optional authentication.
Warning
There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561
TLS and Kafka Source:
Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.
Example configuration with server side authentication and data encryption.
- a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.sources.source1.kafka.topics = mytopic
- a1.sources.source1.kafka.consumer.group.id = flume-consumer
- a1.sources.source1.kafka.consumer.security.protocol = SSL
- # optional, the global truststore can be used alternatively
- a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
- a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
Specifying the truststore is optional here; the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.
Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties
- a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:
- Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
- Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
If client side authentication is also required then additionally the following needs to be added to the Flume agent configuration, or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers.
- # optional, the global keystore can be used alternatively
- a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
- a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
If the keystore and key use different password protection then the ssl.key.password property will provide the required additional secret for the consumer keystore:
- a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
Kerberos and Kafka Source:
To use Kafka source with a Kafka cluster secured with Kerberos, set the consumer.security.protocol properties noted above for the consumer. The Kerberos keytab and principal to be used with Kafka brokers is specified in a JAAS file’s “KafkaClient” section. The “Client” section describes the Zookeeper connection if needed. See the Kafka doc for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
- JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
- JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
Example secure configuration using SASL_PLAINTEXT:
- a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.sources.source1.kafka.topics = mytopic
- a1.sources.source1.kafka.consumer.group.id = flume-consumer
- a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
- a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
- a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
Example secure configuration using SASL_SSL:
- a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
- a1.sources.source1.kafka.topics = mytopic
- a1.sources.source1.kafka.consumer.group.id = flume-consumer
- a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
- a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
- a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
- # optional, the global truststore can be used alternatively
- a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
- a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
Sample JAAS file. For reference of its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- keyTab="/path/to/keytabs/flume.keytab"
- principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
- };
- KafkaClient {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- keyTab="/path/to/keytabs/flume.keytab"
- principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
- };
NetCat TCP Source
A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcat |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
ack-every-event | true | Respond with an “OK” for every event received |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
- a1.sources.r1.channels = c1
NetCat UDP Source
As per the original Netcat (TCP) source, this source listens on a given port and turns each line of text into an event, which is sent via the connected channel. Acts like nc -u -k -l [host] [port].
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcatudp |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
remoteAddressHeader | – | |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = netcatudp
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 6666
- a1.sources.r1.channels = c1
Sequence Generator Source
A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. Retries when it can’t send events to the channel. Useful mainly for testing. During retries it keeps the body of the retried messages the same as before so that the number of unique events - after de-duplication at destination - is expected to be equal to the specified totalEvents. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be seq |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
batchSize | 1 | Number of events to attempt to process per request loop. |
totalEvents | Long.MAX_VALUE | Number of unique events sent by the source. |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = seq
- a1.sources.r1.channels = c1
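To bound a test run, the batch size and total event count from the table above could be set as a sketch (values are illustrative):
- a1.sources.r1.batchSize = 10
- a1.sources.r1.totalEvents = 10000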
Syslog Sources
Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline (‘\n’).
Required properties are in bold.
Syslog TCP Source
The original, tried-and-true syslog TCP source.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogtcp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
eventSize | 2500 | Maximum size of a single event line, in bytes |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case. |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, every supported protocol is included. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, every supported cipher suite is included. |
For example, a syslog TCP source for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = syslogtcp
- a1.sources.r1.port = 5140
- a1.sources.r1.host = localhost
- a1.sources.r1.channels = c1
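For instance, to keep only the timestamp and hostname fields in the event body, the keepFields property described above could be set like this:
- a1.sources.r1.keepFields = timestamp hostname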
Multiport Syslog TCP Source
This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. Provides support for RFC-3164 and many common RFC-5424 formatted messages. Also provides the capability to configure the character set used on a per-port basis.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be multiport_syslogtcp |
host | – | Host name or IP address to bind to. |
ports | – | Space-separated list (one or more) of ports to bind to. |
eventSize | 2500 | Maximum size of a single event line, in bytes. |
keepFields | none | Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
portHeader | – | If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case. |
charset.default | UTF-8 | Default character set used while parsing syslog events into strings. |
charset.port.<port> | – | Character set is configurable on a per-port basis. |
batchSize | 100 | Maximum number of events to attempt to process per request loop. Using the default is usually fine. |
readBufferSize | 1024 | Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine. |
numProcessors | (auto-detected) | Number of processors available on the system for use while processing messages. Default is to auto-detect # of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable. |
selector.type | replicating | replicating, multiplexing, or custom |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors. |
interceptors.* | | |
ssl | false | Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section). |
keystore | – | This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, every supported protocol is included. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, every supported cipher suite is included. |
For example, a multiport syslog TCP source for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = multiport_syslogtcp
- a1.sources.r1.channels = c1
- a1.sources.r1.host = 0.0.0.0
- a1.sources.r1.ports = 10001 10002 10003
- a1.sources.r1.portHeader = port
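Per-port character sets could be configured as a sketch like this (the charset chosen for port 10001 is illustrative):
- a1.sources.r1.charset.default = UTF-8
- a1.sources.r1.charset.port.10001 = ISO-8859-1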
Syslog UDP Source
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogudp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
keepFields | false | Setting this to true will preserve the Priority,Timestamp and Hostname in the body of the event. |
clientIPHeader | – | If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case. |
clientHostnameHeader | – | If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like host) because the event header will be overridden in that case. |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
For example, a syslog UDP source for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = syslogudp
- a1.sources.r1.port = 5140
- a1.sources.r1.host = localhost
- a1.sources.r1.channels = c1
HTTP Source
A source which accepts Flume Events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into flume events by a pluggable “handler” which must implement the HTTPSourceHandler interface. This handler takes a HttpServletRequest and returns a list of flume events. All events handled from one Http request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return a HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return a HTTP 503 - Temporarily unavailable status.
All events sent in one post request are considered to be one batch and inserted into the channel in one transaction.
This source is based on Jetty 9.4 and offers the ability to set additional Jetty-specific parameters which will be passed directly to the Jetty components.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be http |
port | – | The port the source should bind to. |
bind | 0.0.0.0 | The hostname or IP address to listen on |
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
handler.* | – | Config parameters for the handler |
selector.type | replicating | replicating or multiplexing |
selector.* | | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
ssl | false | Set the property true, to enable SSL. HTTP Source does not support SSLv3. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude.SSLv3 will always be excluded in addition to the protocols specified. |
include-protocols | – | Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, every supported protocol is included. |
exclude-cipher-suites | – | Space-separated list of cipher suites to exclude. |
include-cipher-suites | – | Space-separated list of cipher suites to include.The enabled cipher suites will be the included cipher suites without the excluded cipher suites. |
keystore | – | Location of the keystore including keystore file name. If SSL is enabled but the keystore is not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
keystore-password | – | Keystore password. If SSL is enabled but the keystore password is not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
keystore-type | JKS | Keystore type. This can be “JKS” or “PKCS12”. |
QueuedThreadPool.* | | Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool. N.B. QueuedThreadPool will only be used if at least one property of this class is set. |
HttpConfiguration.* | | Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration |
SslContextFactory.* | | Jetty specific settings to be set on org.eclipse.jetty.util.ssl.SslContextFactory (only applicable when ssl is set to true). |
ServerConnector.* | | Jetty specific settings to be set on org.eclipse.jetty.server.ServerConnector |
Deprecated Properties
Property Name | Default | Description |
---|---|---|
keystorePassword | – | Use keystore-password. Deprecated value will be overwritten with the new one. |
excludeProtocols | SSLv3 | Use exclude-protocols. Deprecated value will be overwritten with the new one. |
enableSSL | false | Use ssl. Deprecated value will be overwritten with the new one. |
N.B. Jetty-specific settings are set using the setter-methods on the objects listed above. For full details see the Javadoc for these classes (QueuedThreadPool, HttpConfiguration, SslContextFactory and ServerConnector).
When using Jetty-specific settings, the named properties above will take precedence (for example excludeProtocols will take precedence over SslContextFactory.ExcludeProtocols). All properties will be initial lower case.
An example http source for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = http
- a1.sources.r1.port = 5140
- a1.sources.r1.channels = c1
- a1.sources.r1.handler = org.example.rest.RestHandler
- a1.sources.r1.handler.nickname = random props
- a1.sources.r1.HttpConfiguration.sendServerVersion = false
- a1.sources.r1.ServerConnector.idleTimeout = 300
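The handler referenced above (org.example.rest.RestHandler) is not shipped with Flume; it simply stands in for a user-supplied class. As a rough illustration, a minimal sketch of such a handler might look like the following, assuming the Flume SDK and Servlet API are on the classpath; the class and its nickname property are hypothetical and only mirror the handler.nickname setting in the example:
- package org.example.rest;
- import java.io.BufferedReader;
- import java.nio.charset.StandardCharsets;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import javax.servlet.http.HttpServletRequest;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.event.EventBuilder;
- import org.apache.flume.source.http.HTTPBadRequestException;
- import org.apache.flume.source.http.HTTPSourceHandler;
- public class RestHandler implements HTTPSourceHandler {
-   private String nickname;
-   @Override
-   public void configure(Context context) {
-     // Populated from a1.sources.r1.handler.nickname in the example above
-     nickname = context.getString("nickname", "default");
-   }
-   @Override
-   public List<Event> getEvents(HttpServletRequest request) throws Exception {
-     // Read the raw request body and turn it into a single Flume event
-     StringBuilder body = new StringBuilder();
-     try (BufferedReader reader = request.getReader()) {
-       String line;
-       while ((line = reader.readLine()) != null) {
-         body.append(line);
-       }
-     }
-     if (body.length() == 0) {
-       // Any exception thrown here makes the source reply with HTTP 400, as described above
-       throw new HTTPBadRequestException("Empty request body");
-     }
-     Map<String, String> headers = new HashMap<>();
-     headers.put("nickname", nickname);
-     List<Event> events = new ArrayList<>();
-     events.add(EventBuilder.withBody(body.toString().getBytes(StandardCharsets.UTF_8), headers));
-     return events;
-   }
- }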
JSONHandler
A handler is provided out of the box which can handle events represented in JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, the event has to be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.
- [{
- "headers" : {
- "timestamp" : "434324343",
- "host" : "random_host.example.com"
- },
- "body" : "random_body"
- },
- {
- "headers" : {
- "namenode" : "namenode.example.com",
- "datanode" : "random_datanode.example.com"
- },
- "body" : "really_random_body"
- }]
To set the charset, the request must have the content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
One way to create an event in the format expected by this handler is to use the JSONEvent class provided in the Flume SDK and Google Gson to create the JSON string with the Gson#toJson(Object, Type) method. The type token to pass as the second argument of this method for a list of events can be created by:
- Type type = new TypeToken<List<JSONEvent>>() {}.getType();
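Putting this together, a small self-contained sketch that builds a payload in the expected format could look like the following; the class name JsonPayloadExample is hypothetical, and it assumes Gson and the flume-ng-sdk jar are on the classpath. The resulting string would then be POSTed to the source with Content-Type: application/json; charset=UTF-8.
- import com.google.gson.Gson;
- import com.google.gson.reflect.TypeToken;
- import java.lang.reflect.Type;
- import java.nio.charset.StandardCharsets;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import org.apache.flume.event.JSONEvent;
- public class JsonPayloadExample {
-   public static void main(String[] args) {
-     // Even a single event must be wrapped in a JSON array
-     JSONEvent event = new JSONEvent();
-     event.setHeaders(Collections.singletonMap("host", "random_host.example.com"));
-     event.setBody("random_body".getBytes(StandardCharsets.UTF_8));
-     List<JSONEvent> events = new ArrayList<>();
-     events.add(event);
-     // Serialize the list using the type token described above
-     Type type = new TypeToken<List<JSONEvent>>() {}.getType();
-     String json = new Gson().toJson(events, type);
-     System.out.println(json);
-   }
- }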
BlobHandler
By default, HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event containing the request parameters as well as the Binary Large Object (BLOB) uploaded with the request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers the entire BLOB in RAM.
Property Name | Default | Description |
---|---|---|
handler | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler |
handler.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
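For example, wiring BlobHandler into an HTTP source for an agent named a1 might look like the following; the port and maxBlobLength values are only illustrative:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = http
- a1.sources.r1.port = 5140
- a1.sources.r1.channels = c1
- a1.sources.r1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
- a1.sources.r1.handler.maxBlobLength = 50000000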
Stress Source
StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of each Event payload (with empty headers), the total number of Events to be sent, and the maximum number of successful Events to be delivered.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.StressSource |
size | 500 | Payload size of each Event. Unit:byte |
maxTotalEvents | -1 | Maximum number of Events to be sent |
maxSuccessfulEvents | -1 | Maximum number of Events successfully sent |
batchSize | 1 | Number of Events to be sent in one batch |
maxEventsPerSecond | 0 | When set to an integer greater than zero, enforces a rate limiter onto the source. |
Example for agent named a1:
- a1.sources = stresssource-1
- a1.channels = memoryChannel-1
- a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
- a1.sources.stresssource-1.size = 10240
- a1.sources.stresssource-1.maxTotalEvents = 1000000
- a1.sources.stresssource-1.channels = memoryChannel-1
Legacy Sources
The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. A legacy source accepts events in the Flume 0.9.4 format, converts them to the Flume 1.0 format, and stores them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc. get converted to 1.x event header attributes. The legacy sources support both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent Sink pointing to the host/port of the 1.x agent.
Note
The reliability semantics of Flume 1.x are different from that of Flume 0.9.x. The E2E or DFO mode of a Flume 0.9.x agent will not be supported by the legacy source. The only supported 0.9.x mode is best effort, though the reliability setting of the 1.x flow will be applicable to the events once they are saved into the Flume 1.x channel by the legacy source.
Required properties are in bold.
Avro Legacy Source
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating | replicating or multiplexing |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | – | |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
- a1.sources.r1.host = 0.0.0.0
- a1.sources.r1.port = 6666
- a1.sources.r1.channels = c1
Thrift Legacy Source
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating | replicating or multiplexing |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | – | |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
- a1.sources.r1.host = 0.0.0.0
- a1.sources.r1.port = 6666
- a1.sources.r1.channels = c1
Custom Source
A custom source is your own implementation of the Source interface. A customsource’s class and its dependencies must be included in the agent’s classpathwhen starting the Flume agent. The type of the custom source is its FQCN.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be your FQCN |
selector.type | replicating | replicating or multiplexing |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | – | |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.example.MySource
- a1.sources.r1.channels = c1
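For illustration, a minimal sketch of what a polling custom source such as org.example.MySource could look like follows; it is only a sketch, assumes a recent Flume 1.x SDK in which PollableSource declares the back-off getter methods, and the prefix property it reads is hypothetical:
- package org.example;
- import java.nio.charset.StandardCharsets;
- import org.apache.flume.Context;
- import org.apache.flume.EventDeliveryException;
- import org.apache.flume.PollableSource;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.event.EventBuilder;
- import org.apache.flume.source.AbstractSource;
- public class MySource extends AbstractSource implements Configurable, PollableSource {
-   private String prefix;
-   @Override
-   public void configure(Context context) {
-     // Hypothetical property: a1.sources.r1.prefix = ...
-     prefix = context.getString("prefix", "my-source");
-   }
-   @Override
-   public Status process() throws EventDeliveryException {
-     try {
-       // Produce one event per poll and hand it to the configured channel(s)
-       String body = prefix + ": " + System.currentTimeMillis();
-       getChannelProcessor().processEvent(EventBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)));
-       return Status.READY;
-     } catch (Exception e) {
-       // Tell the source runner to back off before the next poll
-       return Status.BACKOFF;
-     }
-   }
-   @Override
-   public long getBackOffSleepIncrement() {
-     return 1000L;
-   }
-   @Override
-   public long getMaxBackOffSleepInterval() {
-     return 5000L;
-   }
- }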
Scribe Source
Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use the ScribeSource, which is based on Thrift with a compatible transfer protocol. For deployment of Scribe please follow the guide from Facebook. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.scribe.ScribeSource |
port | 1499 | Port that Scribe clients should connect to |
maxReadBufferBytes | 16384000 | Thrift default FrameBuffer size |
workerThreads | 5 | Number of handler threads in Thrift |
selector.type | ||
selector.* |
Example for agent named a1:
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
- a1.sources.r1.port = 1463
- a1.sources.r1.workerThreads = 5
- a1.sources.r1.channels = c1