Embedded agent
Flume has an embedded agent API that lets users embed an agent in their application. The agent is meant to be lightweight, so not all sources, sinks, and channels are allowed. Specifically, the source is a special embedded source, and events should be sent to it via the put and putAll methods on the EmbeddedAgent object. Only File Channel and Memory Channel are allowed as channels, and Avro Sink is the only supported sink. Interceptors are also supported by the embedded agent.
Note: The embedded agent has a dependency on hadoop-core.jar.
Configuration of an embedded agent is similar to configuration of a full agent. The following is an exhaustive list of configuration options:
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
source.type | embedded | The only available source is the embedded source. |
channel.type | – | Either memory or file, which correspond to MemoryChannel and FileChannel respectively. |
channel.* | – | Configuration options for the channel type requested; see the MemoryChannel or FileChannel user guide for an exhaustive list. |
sinks | – | List of sink names |
sink.type | – | Property name must match a name in the list of sinks. Value must be avro. |
sink.* | – | Configuration options for the sink. See the AvroSink user guide for an exhaustive list; note, however, that AvroSink requires at least hostname and port. |
processor.type | – | Either failover or load_balance, which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively. |
processor.* | – | Configuration options for the sink processor selected. See the FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list. |
source.interceptors | – | Space-separated list of interceptors |
source.interceptors.* | – | Configuration options for individual interceptors specified in the source.interceptors property |
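
The constraints in the table can be checked before a property map is handed to the agent. The following is a minimal sketch that uses only the Java standard library. The class `EmbeddedAgentConfigCheck` and its `validate` method are hypothetical names invented for illustration and are not part of Flume. The sketch assumes sink names are separated by whitespace, and it only checks `processor.type` when the key is present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for an embedded agent property map; not part of Flume. */
final class EmbeddedAgentConfigCheck {

    /** Returns human-readable problems; an empty list means the map passed these checks. */
    static List<String> validate(Map<String, String> props) {
        List<String> problems = new ArrayList<>();

        // source.type defaults to "embedded", the only available source.
        String sourceType = props.getOrDefault("source.type", "embedded");
        if (!"embedded".equals(sourceType)) {
            problems.add("source.type must be embedded, got: " + sourceType);
        }

        // channel.type must be memory or file.
        String channelType = props.get("channel.type");
        if (!"memory".equals(channelType) && !"file".equals(channelType)) {
            problems.add("channel.type must be memory or file, got: " + channelType);
        }

        // Assumption: sink names are whitespace-separated.
        String sinks = props.getOrDefault("sinks", "").trim();
        if (sinks.isEmpty()) {
            problems.add("sinks must name at least one sink");
        } else {
            for (String sink : sinks.split("\\s+")) {
                // Avro is the only supported sink type.
                if (!"avro".equals(props.get(sink + ".type"))) {
                    problems.add(sink + ".type must be avro");
                }
                // AvroSink requires at least hostname and port.
                for (String key : Arrays.asList("hostname", "port")) {
                    if (!props.containsKey(sink + "." + key)) {
                        problems.add(sink + "." + key + " is missing");
                    }
                }
            }
        }

        // Only checked when present; this sketch does not decide whether it is required.
        String processorType = props.get("processor.type");
        if (processorType != null
                && !"failover".equals(processorType)
                && !"load_balance".equals(processorType)) {
            problems.add("processor.type must be failover or load_balance, got: " + processorType);
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("channel.type", "memory");
        props.put("sinks", "sink1");
        props.put("sink1.type", "avro");
        props.put("sink1.hostname", "collector1.apache.org");
        // sink1.port is left out on purpose, so one problem is reported.
        System.out.println(validate(props));
    }
}
```

Returning a list of problems rather than throwing on the first one lets a caller report every misconfiguration at once. The agent performs its own validation when it is configured, so a check like this is only a convenience.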
Below is an example of how to use the agent:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import com.google.common.collect.Lists;
    import org.apache.flume.Event;
    import org.apache.flume.agent.embedded.EmbeddedAgent;

    // Memory channel holding up to 200 events
    Map<String, String> properties = new HashMap<String, String>();
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "200");
    // Two Avro sinks, load-balanced across two collectors
    properties.put("sinks", "sink1 sink2");
    properties.put("sink1.type", "avro");
    properties.put("sink2.type", "avro");
    properties.put("sink1.hostname", "collector1.apache.org");
    properties.put("sink1.port", "5564");
    properties.put("sink2.hostname", "collector2.apache.org");
    properties.put("sink2.port", "5565");
    properties.put("processor.type", "load_balance");
    // Static interceptor that adds the header key1=value1 to every event
    properties.put("source.interceptors", "i1");
    properties.put("source.interceptors.i1.type", "static");
    properties.put("source.interceptors.i1.key", "key1");
    properties.put("source.interceptors.i1.value", "value1");

    EmbeddedAgent agent = new EmbeddedAgent("myagent");
    agent.configure(properties);
    agent.start();

    // event is an org.apache.flume.Event created elsewhere in the application
    List<Event> events = Lists.newArrayList();
    events.add(event);
    events.add(event);
    events.add(event);
    events.add(event);
    agent.putAll(events);

    ...

    agent.stop();
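
The example repeats the same three keys (type, hostname, port) for each sink. As a rough sketch, a small helper could generate those entries from a list of collectors. The class `AvroSinkProperties` and the method `forCollectors` are hypothetical names used only for illustration, not Flume APIs, and the generated sink names (`sink1`, `sink2`, and so on) are an arbitrary choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that builds the per-sink entries of an embedded agent property map. */
final class AvroSinkProperties {

    /**
     * Builds "sinks" plus type, hostname and port entries for each collector.
     * The input maps hostname to port; pass an ordered map to get stable sink numbering.
     */
    static Map<String, String> forCollectors(Map<String, Integer> collectors) {
        Map<String, String> props = new LinkedHashMap<>();
        StringBuilder names = new StringBuilder();
        int index = 1;
        for (Map.Entry<String, Integer> collector : collectors.entrySet()) {
            String name = "sink" + index++;
            if (names.length() > 0) {
                names.append(' '); // sink names are space-separated, as in the example
            }
            names.append(name);
            props.put(name + ".type", "avro"); // Avro is the only supported sink
            props.put(name + ".hostname", collector.getKey());
            props.put(name + ".port", String.valueOf(collector.getValue()));
        }
        props.put("sinks", names.toString());
        return props;
    }

    public static void main(String[] args) {
        Map<String, Integer> collectors = new LinkedHashMap<>();
        collectors.put("collector1.apache.org", 5564);
        collectors.put("collector2.apache.org", 5565);
        // Print the generated entries; channel and processor settings still need to be added.
        forCollectors(collectors).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The returned map can be merged into the full property map with `putAll` before calling `agent.configure(properties)`.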