Client

The client operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from. Flume currently supports Avro, log4j, syslog, and HTTP POST (with a JSON body) as ways to transfer data from an external source. Additionally, there’s an ExecSource that can consume the output of a local process as input to Flume.

It’s quite possible to have a use case where these existing options are not sufficient. In this case you can build a custom mechanism to send data to Flume. There are two ways of achieving this. The first option is to create a custom client that communicates with one of Flume’s existing Sources like AvroSource or SyslogTcpSource. Here the client should convert its data into messages understood by these Flume Sources. The other option is to write a custom Flume Source that talks directly with your existing client application using some IPC or RPC protocol, and then converts the client data into Flume Events to be sent downstream. Note that all events stored within the Channel of a Flume agent must exist as Flume Events.

Client SDK

Though Flume contains a number of built-in mechanisms (i.e. Sources) to ingest data, often one wants the ability to communicate with Flume directly from a custom application. The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume’s data flow over RPC.

RPC client interface

An implementation of Flume’s RpcClient interface encapsulates the RPC mechanism supported by Flume. The user’s application can simply call the Flume Client SDK’s append(Event) or appendBatch(List<Event>) methods to send data and not worry about the underlying message exchange details. The user can provide the required Event argument by directly implementing the Event interface, by using a convenience implementation such as the SimpleEvent class, or by using EventBuilder’s overloaded withBody() static helper methods.
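As a brief sketch of those options, the snippet below constructs an Event with EventBuilder and with SimpleEvent, then hands them to an RpcClient via append() and appendBatch(). The client variable is assumed to be an already initialized RpcClient (see the sections that follow); the event bodies are illustrative only.

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.event.SimpleEvent;

    public class EventExamples {
      // 'client' is assumed to be an already initialized RpcClient
      public static void send(RpcClient client) throws EventDeliveryException {
        // Option 1: build the event with EventBuilder's static helper
        Event built = EventBuilder.withBody("built event", StandardCharsets.UTF_8);

        // Option 2: use the SimpleEvent convenience implementation directly
        SimpleEvent simple = new SimpleEvent();
        simple.setBody("simple event".getBytes(StandardCharsets.UTF_8));

        // Send one event at a time ...
        client.append(built);

        // ... or send several events in a single batch
        List<Event> batch = Collections.<Event>singletonList(simple);
        client.appendBatch(batch);
      }
    }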

RPC clients - Avro and Thrift

As of Flume 1.4.0, Avro is the default RPC protocol. The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client needs to create this object with the host and port of the target Flume agent, and can then use the RpcClient to send data into the agent. The following example shows how to use the Flume Client SDK API within a user’s data-generating application:

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;

    public class MyApp {
      public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("host.example.org", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
      }
    }

    class MyRpcClientFacade {
      private RpcClient client;
      private String hostname;
      private int port;

      public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
      }

      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // clean up and recreate the client
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
      }

      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    }

The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:

    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1

    a1.channels.c1.type = memory

    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    # For using a thrift source set the following instead of the above line.
    # a1.sources.r1.type = thrift
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414

    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger

For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with these properties:

    client.type = default (for avro) or thrift (for thrift)

    hosts = h1                            # default client accepts only 1 host
                                          # (additional hosts will be ignored)

    hosts.h1 = host1.example.org:41414    # host and port must both be specified
                                          # (neither has a default)

    batch-size = 100                      # Must be >=1 (default: 100)

    connect-timeout = 20000               # Must be >=1000 (default: 20000)

    request-timeout = 20000               # Must be >=1000 (default: 20000)
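These properties can also be supplied programmatically. Below is a minimal sketch (reusing the illustrative host and port from the listing above) that builds a default Avro client from a Properties object via RpcClientFactory.getInstance, rather than calling getDefaultInstance with a host and port:

    import java.util.Properties;

    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;

    public class DefaultClientFromProps {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("client.type", "default");   // or "thrift" for a ThriftRpcClient
        props.put("hosts", "h1");              // the default client uses only one host
        props.put("hosts.h1", "host1.example.org:41414");
        props.put("batch-size", "100");
        props.put("connect-timeout", "20000");
        props.put("request-timeout", "20000");

        // The factory picks the client implementation based on client.type
        RpcClient client = RpcClientFactory.getInstance(props);
        // ... use client.append(...) / client.appendBatch(...) here, then:
        client.close();
      }
    }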

Secure RPC client - Thrift

As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication. The client needs to use the getThriftInstance method of SecureRpcClientFactory to get hold of a SecureThriftRpcClient. SecureThriftRpcClient extends ThriftRpcClient, which implements the RpcClient interface. The Kerberos authentication module resides in the flume-ng-auth module, which is required on the classpath when using the SecureRpcClientFactory. Both the client principal and the client keytab should be passed in as parameters through the properties; they reflect the credentials of the client used to authenticate against the Kerberos KDC. In addition, the server principal of the destination Thrift source to which this client is connecting should also be provided. The following example shows how to use the SecureRpcClientFactory within a user’s data-generating application:

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.api.SecureRpcClientFactory;
    import org.apache.flume.api.RpcClientConfigurationConstants;
    import org.apache.flume.api.RpcClient;
    import java.nio.charset.Charset;
    import java.util.Properties;

    public class MyApp {
      public static void main(String[] args) {
        MySecureRpcClientFacade client = new MySecureRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        Properties props = new Properties();
        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
        props.setProperty("hosts", "h1");
        props.setProperty("hosts.h1", "client.example.org" + ":" + String.valueOf(41414));

        // Initialize client with the kerberos authentication related properties
        props.setProperty("kerberos", "true");
        props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
        props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
        props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
        client.init(props);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with a ThriftSource in kerberos mode.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
      }
    }

    class MySecureRpcClientFacade {
      private RpcClient client;
      private Properties properties;

      public void init(Properties properties) {
        // Setup the RPC connection
        this.properties = properties;
        // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
        this.client = SecureRpcClientFactory.getThriftInstance(properties);
      }

      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // clean up and recreate the client
          client.close();
          client = null;
          client = SecureRpcClientFactory.getThriftInstance(properties);
        }
      }

      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    }

The remote ThriftSource should be started in kerberos mode. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:

    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1

    a1.channels.c1.type = memory

    a1.sources.r1.channels = c1
    a1.sources.r1.type = thrift
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    a1.sources.r1.kerberos = true
    a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
    a1.sources.r1.agent-keytab = /tmp/flume.keytab

    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger

Failover Client

This class wraps the default Avro RPC client to provide failover handling capability to clients. It takes a whitespace-separated list of <host>:<port> entries representing the Flume agents that make up a failover group. The Failover RPC Client currently does not support thrift. If there’s a communication error with the currently selected host (i.e. agent), then the failover client automatically fails over to the next host in the list. For example:

    // Setup properties for the failover
    Properties props = new Properties();
    props.put("client.type", "default_failover");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h1 h2 h3");

    // host/port pair for each host alias
    String host1 = "host1.example.org:41414";
    String host2 = "host2.example.org:41414";
    String host3 = "host3.example.org:41414";
    props.put("hosts.h1", host1);
    props.put("hosts.h2", host2);
    props.put("hosts.h3", host3);

    // create the client with failover properties
    RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with these properties:

    client.type = default_failover

    hosts = h1 h2 h3            # at least one is required, but 2 or
                                # more makes better sense

    hosts.h1 = host1.example.org:41414

    hosts.h2 = host2.example.org:41414

    hosts.h3 = host3.example.org:41414

    max-attempts = 3            # Must be >=0 (default: number of hosts
                                # specified, 3 in this case). A '0'
                                # value doesn't make much sense because
                                # it will just cause an append call to
                                # immediately fail. A '1' value means
                                # that the failover client will try only
                                # once to send the Event, and if it
                                # fails then there will be no failover
                                # to a second client, so this value
                                # causes the failover client to
                                # degenerate into just a default client.
                                # It makes sense to set this value to at
                                # least the number of hosts that you
                                # specified.

    batch-size = 100            # Must be >=1 (default: 100)

    connect-timeout = 20000     # Must be >=1000 (default: 20000)

    request-timeout = 20000     # Must be >=1000 (default: 20000)

LoadBalancing RPC client

The Flume Client SDK also supports an RpcClient which load-balances among multiple hosts. This type of client takes a whitespace-separated list of <host>:<port> entries representing the Flume agents that make up a load-balancing group. This client can be configured with a load balancing strategy that either randomly selects one of the configured hosts, or selects a host in a round-robin fashion. You can also specify your own custom class that implements the LoadBalancingRpcClient$HostSelector interface so that a custom selection order is used. In that case, the FQCN of the custom class needs to be specified as the value of the host-selector property. The LoadBalancing RPC Client currently does not support thrift.
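As a sketch of the custom-selector option: the class below always tries the hosts in their configured order, so the first host is preferred and the rest act as fallbacks. The class and package names are purely illustrative, and the HostSelector method set shown is an assumption based on the Flume 1.x LoadBalancingRpcClient API; check the version of Flume on your classpath before relying on it.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.flume.api.HostInfo;
    import org.apache.flume.api.LoadBalancingRpcClient;

    // Illustrative only: iterates over the hosts in their configured order.
    public class PreferFirstHostSelector implements LoadBalancingRpcClient.HostSelector {

      private List<HostInfo> hosts = new ArrayList<HostInfo>();

      @Override
      public synchronized void setHosts(List<HostInfo> hosts) {
        this.hosts = new ArrayList<HostInfo>(hosts);
      }

      @Override
      public synchronized Iterator<HostInfo> createHostIterator() {
        // The returned iterator determines the order in which hosts are tried
        return new ArrayList<HostInfo>(hosts).iterator();
      }

      @Override
      public synchronized void informFailure(HostInfo failedHost) {
        // No backoff bookkeeping in this sketch; a real selector could track
        // failures here and skip the failed host for a while.
      }
    }

The selector’s FQCN would then be supplied as the host-selector value, e.g. props.put("host-selector", "com.example.PreferFirstHostSelector") (package name hypothetical).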

If backoff is enabled then the client will temporarily blacklist hosts that fail, causing them to be excluded from being selected as a failover host until a given timeout. When the timeout elapses, if the host is still unresponsive then this is considered a sequential failure, and the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive hosts.

The maximum backoff time can be configured by setting maxBackoff (in milliseconds). The maxBackoff default is 30 seconds (specified in the OrderSelector class that’s the superclass of both load balancing strategies). The backoff timeout will increase exponentially with each sequential failure up to the maximum possible backoff timeout. The maximum possible backoff is limited to 65536 seconds (about 18.2 hours). For example:

    // Setup properties for the load balancing
    Properties props = new Properties();
    props.put("client.type", "default_loadbalance");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h1 h2 h3");

    // host/port pair for each host alias
    String host1 = "host1.example.org:41414";
    String host2 = "host2.example.org:41414";
    String host3 = "host3.example.org:41414";
    props.put("hosts.h1", host1);
    props.put("hosts.h2", host2);
    props.put("hosts.h3", host3);

    props.put("host-selector", "random");          // For random host selection
    // props.put("host-selector", "round_robin");  // For round-robin host selection
    props.put("backoff", "true");                  // Disabled by default.

    props.put("maxBackoff", "10000");              // Defaults to 0, which effectively
                                                   // becomes 30000 ms

    // Create the client with load balancing properties
    RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the load-balancing Flume client implementation (LoadBalancingRpcClient) can be configured with these properties:

    client.type = default_loadbalance

    hosts = h1 h2 h3            # At least 2 hosts are required

    hosts.h1 = host1.example.org:41414

    hosts.h2 = host2.example.org:41414

    hosts.h3 = host3.example.org:41414

    backoff = false             # Specifies whether the client should
                                # back-off from (i.e. temporarily
                                # blacklist) a failed host
                                # (default: false).

    maxBackoff = 0              # Max timeout in millis that a failed
                                # host will remain inactive due to a
                                # previous failure with that host
                                # (default: 0, which effectively
                                # becomes 30000)

    host-selector = round_robin # The host selection strategy used
                                # when load-balancing among hosts
                                # (default: round_robin).
                                # Other values include "random"
                                # or the FQCN of a custom class
                                # that implements
                                # LoadBalancingRpcClient$HostSelector

    batch-size = 100            # Must be >=1 (default: 100)

    connect-timeout = 20000     # Must be >=1000 (default: 20000)

    request-timeout = 20000     # Must be >=1000 (default: 20000)