Sink

The purpose of a Sink is to extract Events from the Channel and forward them to the next Flume Agent in the flow or store them in an external repository. A Sink is associated with exactly one Channel, as configured in the Flume properties file. There’s one SinkRunner instance associated with every configured Sink, and when the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink (using SinkRunner.PollingRunner as the thread’s Runnable). This thread manages the Sink’s lifecycle.

The Sink needs to implement the start() and stop() methods that are part of the LifecycleAware interface. The Sink.start() method should initialize the Sink and bring it to a state where it can forward Events to its next destination. The Sink.process() method should do the core processing of extracting an Event from the Channel and forwarding it. The Sink.stop() method should do the necessary cleanup (e.g. releasing resources). The Sink implementation also needs to implement the Configurable interface for processing its own configuration settings. For example:

    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;

    public class MySink extends AbstractSink implements Configurable {
      private String myProp;

      @Override
      public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");

        // Process the myProp value (e.g. validation)

        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
      }

      @Override
      public void start() {
        // Initialize the connection to the external repository (e.g. HDFS) that
        // this Sink will forward Events to ..

        // Let AbstractSink record the lifecycle state
        super.start();
      }

      @Override
      public void stop() {
        // Disconnect from the external repository and do any
        // additional cleanup (e.g. releasing resources or nulling-out
        // field values) ..

        // Let AbstractSink record the lifecycle state
        super.stop();
      }

      @Override
      public Status process() throws EventDeliveryException {
        Status status = null;

        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
          // This try clause includes whatever Channel operations you want to do

          Event event = ch.take();

          if (event != null) {
            // Send the Event to the external repository.
            // storeSomeData(event);
            status = Status.READY;
          } else {
            // The Channel was empty; signal the runner to back off before retrying
            status = Status.BACKOFF;
          }

          txn.commit();
        } catch (Throwable t) {
          txn.rollback();

          // Log exception, handle individual exceptions as needed

          status = Status.BACKOFF;

          // re-throw all Errors
          if (t instanceof Error) {
            throw (Error) t;
          }
        } finally {
          // Always close the transaction, whether it was committed or rolled back
          txn.close();
        }
        return status;
      }
    }
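
To make the role of the runner thread and of the Status return value more concrete, the following is a minimal, self-contained sketch of a polling loop that repeatedly calls process() and sleeps when the sink reports BACKOFF. It is not Flume's SinkRunner.PollingRunner: ToySink, QueueDrainingSink, PollingLoopSketch and runUntilIdle are hypothetical names invented for this illustration, an in-memory queue stands in for the Channel, and the loop stops after a fixed number of consecutive backoffs only so that the example terminates.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a Sink; only the process() contract is modeled. */
interface ToySink {
    enum Status { READY, BACKOFF }

    Status process();
}

/** Takes one item per call from an in-memory queue (a stand-in for a Channel). */
final class QueueDrainingSink implements ToySink {
    private final Queue<String> queue;
    int delivered = 0;

    QueueDrainingSink(Queue<String> queue) {
        this.queue = queue;
    }

    @Override
    public Status process() {
        String item = queue.poll(); // returns null when the queue is empty
        if (item == null) {
            return Status.BACKOFF;  // nothing to deliver right now
        }
        delivered++;                // "forward" the item
        return Status.READY;
    }
}

final class PollingLoopSketch {
    /**
     * Calls sink.process() until it has returned BACKOFF maxConsecutiveBackoffs
     * times in a row, sleeping a little longer after each consecutive backoff.
     * Returns the total number of process() calls made.
     */
    static int runUntilIdle(ToySink sink, int maxConsecutiveBackoffs, long backoffMillis) {
        int calls = 0;
        int consecutiveBackoffs = 0;
        while (consecutiveBackoffs < maxConsecutiveBackoffs) {
            calls++;
            if (sink.process() == ToySink.Status.BACKOFF) {
                consecutiveBackoffs++;
                try {
                    Thread.sleep(backoffMillis * consecutiveBackoffs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop
                    break;
                }
            } else {
                consecutiveBackoffs = 0; // progress was made, so reset the backoff
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        QueueDrainingSink sink = new QueueDrainingSink(queue);
        int calls = runUntilIdle(sink, 2, 1L);
        System.out.println("delivered=" + sink.delivered + " calls=" + calls);
        // prints "delivered=3 calls=5"
    }
}
```

With three queued items the sketch makes three READY calls followed by two BACKOFF calls. The point of returning BACKOFF rather than READY from an empty take is that the driving thread can then pause instead of spinning on an idle Channel.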