Embedded agent

Flume has an embedded agent API that allows users to embed an agent in their application. This agent is meant to be lightweight, and as such not all sources, sinks, and channels are allowed. Specifically, the source used is a special embedded source, and events should be sent to the source via the put and putAll methods on the EmbeddedAgent object. Only File Channel and Memory Channel are allowed as channels, while Avro Sink is the only supported sink. Interceptors are also supported by the embedded agent.

Note: The embedded agent has a dependency on hadoop-core.jar.

Configuration of an embedded agent is similar to configuration of a full agent. The following is an exhaustive list of configuration options:

Required properties are in bold.

Property Name           Default    Description
source.type             embedded   The only available source is the embedded source.
channel.type            -          Either memory or file, which correspond to MemoryChannel and FileChannel respectively.
channel.*               -          Configuration options for the channel type requested; see the MemoryChannel or FileChannel user guide for an exhaustive list.
sinks                   -          List of sink names.
sink.type               -          Property name must match a name in the list of sinks. Value must be avro.
sink.*                  -          Configuration options for the sink. See the AvroSink user guide for an exhaustive list; note, however, that AvroSink requires at least hostname and port.
processor.type          -          Either failover or load_balance, which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
processor.*             -          Configuration options for the sink processor selected. See the FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list.
source.interceptors     -          Space-separated list of interceptors.
source.interceptors.*   -          Configuration options for individual interceptors specified in the source.interceptors property.
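
The value constraints listed above can be checked before a properties map is handed to the agent. The following is a minimal sketch of such a pre-flight check in plain Java; the class EmbeddedAgentConfigCheck and its check method are hypothetical helpers, not part of the Flume API. It assumes that channel.type, sinks, each sink's type, and processor.type must all be present, and it only covers the rules stated in the table (allowed values, plus hostname and port for each Avro sink).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for embedded agent properties; not part of Flume. */
public class EmbeddedAgentConfigCheck {

  /** Returns the problems found; an empty list means the map passed these checks. */
  public static List<String> check(Map<String, String> props) {
    List<String> problems = new ArrayList<>();

    // source.type defaults to "embedded", so it may be omitted.
    String sourceType = props.getOrDefault("source.type", "embedded");
    if (!"embedded".equals(sourceType)) {
      problems.add("source.type must be embedded");
    }

    // Assumption: channel.type is required and must be memory or file.
    String channelType = props.get("channel.type");
    if (!"memory".equals(channelType) && !"file".equals(channelType)) {
      problems.add("channel.type must be memory or file");
    }

    // Every name in the space-separated sinks list needs <name>.type = avro,
    // and an Avro sink needs at least a hostname and a port.
    String sinks = props.get("sinks");
    if (sinks == null || sinks.trim().isEmpty()) {
      problems.add("sinks must list at least one sink name");
    } else {
      for (String sink : sinks.trim().split("\\s+")) {
        if (!"avro".equals(props.get(sink + ".type"))) {
          problems.add(sink + ".type must be avro");
        }
        if (!props.containsKey(sink + ".hostname") || !props.containsKey(sink + ".port")) {
          problems.add(sink + " needs hostname and port");
        }
      }
    }

    // Assumption: processor.type is required and must be failover or load_balance.
    String processorType = props.get("processor.type");
    if (!"failover".equals(processorType) && !"load_balance".equals(processorType)) {
      problems.add("processor.type must be failover or load_balance");
    }
    return problems;
  }
}
```

A caller could log the returned list and skip configuring the agent when it is not empty. Channel-specific and processor-specific options are not checked here.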

Below is an example of how to use the agent:

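  import java.nio.charset.StandardCharsets;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  import com.google.common.collect.Lists;
  import org.apache.flume.Event;
  import org.apache.flume.agent.embedded.EmbeddedAgent;
  import org.apache.flume.event.EventBuilder;

  public class EmbeddedAgentExample {
    public static void main(String[] args) throws Exception {
      Map<String, String> properties = new HashMap<String, String>();
      // Memory channel holding up to 200 events
      properties.put("channel.type", "memory");
      properties.put("channel.capacity", "200");
      // Two Avro sinks, load-balanced across two collectors
      properties.put("sinks", "sink1 sink2");
      properties.put("sink1.type", "avro");
      properties.put("sink2.type", "avro");
      properties.put("sink1.hostname", "collector1.apache.org");
      properties.put("sink1.port", "5564");
      properties.put("sink2.hostname", "collector2.apache.org");
      properties.put("sink2.port", "5565");
      properties.put("processor.type", "load_balance");
      // Static interceptor that adds the header key1=value1 to every event
      properties.put("source.interceptors", "i1");
      properties.put("source.interceptors.i1.type", "static");
      properties.put("source.interceptors.i1.key", "key1");
      properties.put("source.interceptors.i1.value", "value1");

      EmbeddedAgent agent = new EmbeddedAgent("myagent");

      agent.configure(properties);
      agent.start();

      // Placeholder event; the body used here is only an illustration
      Event event = EventBuilder.withBody("example", StandardCharsets.UTF_8);

      List<Event> events = Lists.newArrayList();

      events.add(event);
      events.add(event);
      events.add(event);
      events.add(event);

      // Hand the whole batch to the embedded source
      agent.putAll(events);

      // ...

      agent.stop();
    }
  }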
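
When the number of collectors varies, the per-sink properties in the example above can be generated rather than written out by hand. The sketch below is one way to do that in plain Java, under stated assumptions: the class AvroSinkProperties, the method forCollectors, the "host:port" input format, and the generated sink names (sink1, sink2, ...) are all hypothetical and not part of Flume. It always selects the memory channel, as the example does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that builds embedded agent properties; not part of Flume. */
public class AvroSinkProperties {

  /**
   * Builds a properties map with one Avro sink per "host:port" entry.
   * processorType is expected to be "failover" or "load_balance".
   */
  public static Map<String, String> forCollectors(List<String> hostPorts, String processorType) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("channel.type", "memory");

    StringJoiner sinkNames = new StringJoiner(" ");
    for (int i = 0; i < hostPorts.size(); i++) {
      String hostPort = hostPorts.get(i);
      int colon = hostPort.lastIndexOf(':');
      // Reject entries with a missing host or a missing port.
      if (colon <= 0 || colon == hostPort.length() - 1) {
        throw new IllegalArgumentException("expected host:port but got " + hostPort);
      }
      String name = "sink" + (i + 1); // generated name, used as the property prefix
      props.put(name + ".type", "avro");
      props.put(name + ".hostname", hostPort.substring(0, colon));
      props.put(name + ".port", hostPort.substring(colon + 1));
      sinkNames.add(name);
    }

    // The sinks property is a space-separated list of the sink names.
    props.put("sinks", sinkNames.toString());
    props.put("processor.type", processorType);
    return props;
  }
}
```

The returned map can be passed to agent.configure in place of the hand-written one; interceptor and channel options would still need to be added separately.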