Client
The client operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from. Flume currently supports Avro, log4j, syslog, and HTTP POST (with a JSON body) as ways to transfer data from an external source. Additionally, there's an ExecSource that can consume the output of a local process as input to Flume.
It's quite possible to have a use case where these existing options are not sufficient. In this case you can build a custom mechanism to send data to Flume. There are two ways of achieving this. The first option is to create a custom client that communicates with one of Flume's existing Sources like AvroSource or SyslogTcpSource. Here the client should convert its data into messages understood by these Flume Sources. The other option is to write a custom Flume Source that directly talks with your existing client application using some IPC or RPC protocol, and then converts the client data into Flume Events to be sent downstream. Note that all events stored within the Channel of a Flume agent must exist as Flume Events.
Client SDK
Though Flume contains a number of built-in mechanisms (i.e. Sources) to ingest data, often one wants the ability to communicate with Flume directly from a custom application. The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume's data flow over RPC.
RPC client interface
An implementation of Flume's RpcClient interface encapsulates the RPC mechanism supported by Flume. The user's application can simply call the Flume Client SDK's append(Event) or appendBatch(List<Event>) to send data and not worry about the underlying message exchange details. The user can provide the required Event arg by either directly implementing the Event interface, by using a convenience implementation such as the SimpleEvent class, or by using EventBuilder's overloaded withBody() static helper methods.
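As a sketch of the batch path, a caller that already holds a connected RpcClient (obtained as shown in the next section) might group payloads into one appendBatch call. The helper class and method names here are illustrative, not part of the SDK, and the code assumes flume-ng-sdk is on the classpath:

```java
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper (not part of the Flume SDK): sends a list of
// strings to a Flume agent in a single RPC round trip.
public class BatchSender {
  public static void sendBatch(RpcClient client, List<String> lines)
      throws EventDeliveryException {
    List<Event> batch = new ArrayList<>(lines.size());
    for (String line : lines) {
      // EventBuilder.withBody wraps the payload bytes in a Flume Event
      batch.add(EventBuilder.withBody(line, StandardCharsets.UTF_8));
    }
    // One appendBatch call instead of lines.size() append calls
    client.appendBatch(batch);
  }
}
```

appendBatch is generally preferable when events arrive faster than one RPC round trip can carry them; both append and appendBatch throw EventDeliveryException if delivery fails.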
RPC clients - Avro and Thrift
As of Flume 1.4.0, Avro is the default RPC protocol. The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client needs to create this object with the host and port of the target Flume agent, and can then use the RpcClient to send data into the agent. The following example shows how to use the Flume Client SDK API within a user's data-generating application:
  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.api.RpcClient;
  import org.apache.flume.api.RpcClientFactory;
  import org.apache.flume.event.EventBuilder;

  import java.nio.charset.Charset;

  public class MyApp {
    public static void main(String[] args) {
      MyRpcClientFacade client = new MyRpcClientFacade();
      // Initialize client with the remote Flume agent's host and port
      client.init("host.example.org", 41414);

      // Send 10 events to the remote Flume agent. That agent should be
      // configured to listen with an AvroSource.
      String sampleData = "Hello Flume!";
      for (int i = 0; i < 10; i++) {
        client.sendDataToFlume(sampleData);
      }

      client.cleanUp();
    }
  }

  class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
      // Setup the RPC connection
      this.hostname = hostname;
      this.port = port;
      this.client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
      // Create a Flume Event object that encapsulates the sample data
      Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

      // Send the event
      try {
        client.append(event);
      } catch (EventDeliveryException e) {
        // clean up and recreate the client
        client.close();
        client = null;
        client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
      }
    }

    public void cleanUp() {
      // Close the RPC connection
      client.close();
    }
  }
The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port. Below is an example Flume agent configuration that's waiting for a connection from MyApp:
  a1.channels = c1
  a1.sources = r1
  a1.sinks = k1

  a1.channels.c1.type = memory

  a1.sources.r1.channels = c1
  a1.sources.r1.type = avro
  # For using a thrift source set the following instead of the above line.
  # a1.sources.r1.type = thrift
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 41414

  a1.sinks.k1.channel = c1
  a1.sinks.k1.type = logger
For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with these properties:
  client.type = default (for avro) or thrift (for thrift)

  hosts = h1                           # default client accepts only 1 host
                                       # (additional hosts will be ignored)

  hosts.h1 = host1.example.org:41414   # host and port must both be specified
                                       # (neither has a default)

  batch-size = 100                     # Must be >=1 (default: 100)

  connect-timeout = 20000              # Must be >=1000 (default: 20000)

  request-timeout = 20000              # Must be >=1000 (default: 20000)
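These properties can also be assembled programmatically with java.util.Properties and handed to RpcClientFactory.getInstance(props). A minimal sketch (the host and port are placeholders; the factory call itself requires flume-ng-sdk on the classpath and a listening agent, so it is shown as a comment):

```java
import java.util.Properties;

// Builds a Properties object equivalent to the listing above.
public class DefaultClientProps {
  public static Properties build(String host, int port) {
    Properties props = new Properties();
    props.setProperty("client.type", "default");       // Avro; use "thrift" for Thrift
    props.setProperty("hosts", "h1");                  // default client uses a single host
    props.setProperty("hosts.h1", host + ":" + port);  // both host and port are required
    props.setProperty("batch-size", "100");            // events per appendBatch call
    props.setProperty("connect-timeout", "20000");     // milliseconds, must be >= 1000
    props.setProperty("request-timeout", "20000");     // milliseconds, must be >= 1000
    // With flume-ng-sdk on the classpath and an agent listening:
    // RpcClient client = RpcClientFactory.getInstance(props);
    return props;
  }
}
```

This is equivalent to calling RpcClientFactory.getDefaultInstance(host, port), but makes the batch size and timeouts explicit.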
Secure RPC client - Thrift
As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication. The client needs to use the getThriftInstance method of SecureRpcClientFactory to get hold of a SecureThriftRpcClient. SecureThriftRpcClient extends ThriftRpcClient, which implements the RpcClient interface. The Kerberos authentication module resides in the flume-ng-auth module, which must be on the classpath when using the SecureRpcClientFactory. Both the client principal and the client keytab should be passed in as parameters through the properties; they reflect the credentials of the client used to authenticate against the Kerberos KDC. In addition, the server principal of the destination Thrift source to which this client is connecting should also be provided. The following example shows how to use the SecureRpcClientFactory within a user's data-generating application:
  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.event.EventBuilder;
  import org.apache.flume.api.SecureRpcClientFactory;
  import org.apache.flume.api.RpcClientConfigurationConstants;
  import org.apache.flume.api.RpcClient;

  import java.nio.charset.Charset;
  import java.util.Properties;

  public class MyApp {
    public static void main(String[] args) {
      MySecureRpcClientFacade client = new MySecureRpcClientFacade();
      // Initialize client with the remote Flume agent's host and port
      Properties props = new Properties();
      props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
      props.setProperty("hosts", "h1");
      props.setProperty("hosts.h1", "client.example.org" + ":" + String.valueOf(41414));

      // Initialize client with the kerberos authentication related properties
      props.setProperty("kerberos", "true");
      props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
      props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
      props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
      client.init(props);

      // Send 10 events to the remote Flume agent. That agent should be
      // configured to listen with a ThriftSource in kerberos mode.
      String sampleData = "Hello Flume!";
      for (int i = 0; i < 10; i++) {
        client.sendDataToFlume(sampleData);
      }

      client.cleanUp();
    }
  }

  class MySecureRpcClientFacade {
    private RpcClient client;
    private Properties properties;

    public void init(Properties properties) {
      // Setup the RPC connection
      this.properties = properties;
      // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
      this.client = SecureRpcClientFactory.getThriftInstance(properties);
    }

    public void sendDataToFlume(String data) {
      // Create a Flume Event object that encapsulates the sample data
      Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

      // Send the event
      try {
        client.append(event);
      } catch (EventDeliveryException e) {
        // clean up and recreate the client
        client.close();
        client = null;
        client = SecureRpcClientFactory.getThriftInstance(properties);
      }
    }

    public void cleanUp() {
      // Close the RPC connection
      client.close();
    }
  }
The remote ThriftSource should be started in Kerberos mode. Below is an example Flume agent configuration that's waiting for a connection from MyApp:
  a1.channels = c1
  a1.sources = r1
  a1.sinks = k1

  a1.channels.c1.type = memory

  a1.sources.r1.channels = c1
  a1.sources.r1.type = thrift
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 41414
  a1.sources.r1.kerberos = true
  a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
  a1.sources.r1.agent-keytab = /tmp/flume.keytab

  a1.sinks.k1.channel = c1
  a1.sinks.k1.type = logger
Failover Client
This class wraps the default Avro RPC client to provide failover handling capability to clients. This takes a whitespace-separated list of <host>:<port> representing the Flume agents that make up a failover group. The failover RPC client currently does not support Thrift. If there's a communication error with the currently selected host (i.e. agent), then the failover client automatically fails over to the next host in the list. For example:
  // Setup properties for the failover
  Properties props = new Properties();
  props.put("client.type", "default_failover");

  // List of hosts (space-separated list of user-chosen host aliases)
  props.put("hosts", "h1 h2 h3");

  // host/port pair for each host alias
  String host1 = "host1.example.org:41414";
  String host2 = "host2.example.org:41414";
  String host3 = "host3.example.org:41414";
  props.put("hosts.h1", host1);
  props.put("hosts.h2", host2);
  props.put("hosts.h3", host3);

  // create the client with failover properties
  RpcClient client = RpcClientFactory.getInstance(props);
For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with these properties:
  client.type = default_failover

  hosts = h1 h2 h3                     # at least one is required, but 2 or
                                       # more makes better sense

  hosts.h1 = host1.example.org:41414
  hosts.h2 = host2.example.org:41414
  hosts.h3 = host3.example.org:41414

  max-attempts = 3                     # Must be >=0 (default: number of hosts
                                       # specified, 3 in this case). A '0'
                                       # value doesn't make much sense because
                                       # it will just cause an append call to
                                       # immediately fail. A '1' value means
                                       # that the failover client will try only
                                       # once to send the Event, and if it
                                       # fails then there will be no failover
                                       # to a second client, so this value
                                       # causes the failover client to
                                       # degenerate into just a default client.
                                       # It makes sense to set this value to at
                                       # least the number of hosts that you
                                       # specified.

  batch-size = 100                     # Must be >=1 (default: 100)

  connect-timeout = 20000              # Must be >=1000 (default: 20000)

  request-timeout = 20000              # Must be >=1000 (default: 20000)
LoadBalancing RPC client
The Flume Client SDK also supports an RpcClient which load-balances among multiple hosts. This type of client takes a whitespace-separated list of <host>:<port> representing the Flume agents that make up a load-balancing group. This client can be configured with a load balancing strategy that either randomly selects one of the configured hosts, or selects a host in a round-robin fashion. You can also specify your own custom class that implements the LoadBalancingRpcClient$HostSelector interface so that a custom selection order is used. In that case, the FQCN of the custom class needs to be specified as the value of the host-selector property. The load-balancing RPC client currently does not support Thrift.
If backoff is enabled then the client will temporarily blacklist hosts that fail, causing them to be excluded from being selected as a failover host until a given timeout. When the timeout elapses, if the host is still unresponsive then this is considered a sequential failure, and the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive hosts.
The maximum backoff time can be configured by setting maxBackoff (in milliseconds). The maxBackoff default is 30 seconds (specified in the OrderSelector class that's the superclass of both load balancing strategies). The backoff timeout will increase exponentially with each sequential failure up to the maximum possible backoff timeout. The maximum possible backoff is limited to 65536 seconds (about 18.2 hours). For example:
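The growth pattern can be illustrated in isolation. This sketch is not Flume's actual OrderSelector code, and the 2-second starting interval is an assumption for the example; it only shows the doubling-with-cap behaviour the paragraph describes:

```java
// Illustrative exponential-backoff calculation; not Flume's actual
// OrderSelector implementation (whose base interval may differ).
public class BackoffSketch {
  static final long MAX_POSSIBLE_MS = 65536L * 1000L; // hard ceiling from the docs

  // Timeout after `failures` consecutive failures, doubling from `baseMs`
  // and never exceeding the effective cap.
  public static long backoffMs(long baseMs, int failures, long maxBackoffMs) {
    // maxBackoff = 0 effectively means the 30-second default
    long cap = maxBackoffMs > 0 ? Math.min(maxBackoffMs, MAX_POSSIBLE_MS)
                                : 30_000L;
    long timeout = baseMs;
    for (int i = 0; i < failures && timeout < cap; i++) {
      timeout *= 2; // one doubling per sequential failure
    }
    return Math.min(timeout, cap);
  }
}
```

With a 2-second base, three sequential failures would give 16 seconds; a few more failures and the timeout pins at the configured maxBackoff (or 30 seconds if maxBackoff is left at 0).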
  // Setup properties for the load balancing
  Properties props = new Properties();
  props.put("client.type", "default_loadbalance");

  // List of hosts (space-separated list of user-chosen host aliases)
  props.put("hosts", "h1 h2 h3");

  // host/port pair for each host alias
  String host1 = "host1.example.org:41414";
  String host2 = "host2.example.org:41414";
  String host3 = "host3.example.org:41414";
  props.put("hosts.h1", host1);
  props.put("hosts.h2", host2);
  props.put("hosts.h3", host3);

  props.put("host-selector", "random");         // For random host selection
  // props.put("host-selector", "round_robin"); // For round-robin host selection
  props.put("backoff", "true");                 // Disabled by default.
  props.put("maxBackoff", "10000");             // Defaults to 0, which effectively
                                                // becomes 30000 ms

  // Create the client with load balancing properties
  RpcClient client = RpcClientFactory.getInstance(props);
For more flexibility, the load-balancing Flume client implementation (LoadBalancingRpcClient) can be configured with these properties:
  client.type = default_loadbalance

  hosts = h1 h2 h3                     # At least 2 hosts are required

  hosts.h1 = host1.example.org:41414
  hosts.h2 = host2.example.org:41414
  hosts.h3 = host3.example.org:41414

  backoff = false                      # Specifies whether the client should
                                       # back off from (i.e. temporarily
                                       # blacklist) a failed host
                                       # (default: false).

  maxBackoff = 0                       # Max timeout in millis that a host will
                                       # remain inactive due to a previous
                                       # failure with that host (default: 0,
                                       # which effectively becomes 30000)

  host-selector = round_robin          # The host selection strategy used
                                       # when load-balancing among hosts
                                       # (default: round_robin).
                                       # Other values include "random"
                                       # or the FQCN of a custom class
                                       # that implements
                                       # LoadBalancingRpcClient$HostSelector

  batch-size = 100                     # Must be >=1 (default: 100)

  connect-timeout = 20000              # Must be >=1000 (default: 20000)

  request-timeout = 20000              # Must be >=1000 (default: 20000)