Source

The purpose of a Source is to receive data from an external client and storeit into the configured Channels. A Source can get an instance of its ownChannelProcessor to process an Event, commited within a Channellocal transaction, in serial. In the case of an exception, requiredChannels will propagate the exception, all Channels will rollback theirtransaction, but events processed previously on other Channels will remaincommitted.

Similar to the SinkRunner.PollingRunner Runnable, there’sa PollingRunner Runnable that executes on a thread created when theFlume framework calls PollableSourceRunner.start(). Each configuredPollableSource is associated with its own thread that runs aPollingRunner. This thread manages the PollableSource’s lifecycle,such as starting and stopping. A PollableSource implementation mustimplement the start() and stop() methods that are declared in theLifecycleAware interface. The runner of a PollableSource invokes thatSource‘s process() method. The process() method should check fornew data and store it into the Channel as Flume Events.

Note that there are actually two types of Sources. The PollableSourcewas already mentioned. The other is the EventDrivenSource. TheEventDrivenSource, unlike the PollableSource, must have its own callbackmechanism that captures the new data and stores it into the Channel. TheEventDrivenSources are not each driven by their own thread like thePollableSources are. Below is an example of a custom PollableSource:

  1. public class MySource extends AbstractSource implements Configurable, PollableSource {
  2. private String myProp;
  3.  
  4. @Override
  5. public void configure(Context context) {
  6. String myProp = context.getString("myProp", "defaultValue");
  7.  
  8. // Process the myProp value (e.g. validation, convert to another type, ...)
  9.  
  10. // Store myProp for later retrieval by process() method
  11. this.myProp = myProp;
  12. }
  13.  
  14. @Override
  15. public void start() {
  16. // Initialize the connection to the external client
  17. }
  18.  
  19. @Override
  20. public void stop () {
  21. // Disconnect from external client and do any additional cleanup
  22. // (e.g. releasing resources or nulling-out field values) ..
  23. }
  24.  
  25. @Override
  26. public Status process() throws EventDeliveryException {
  27. Status status = null;
  28.  
  29. try {
  30. // This try clause includes whatever Channel/Event operations you want to do
  31.  
  32. // Receive new data
  33. Event e = getSomeData();
  34.  
  35. // Store the Event into this Source's associated Channel(s)
  36. getChannelProcessor().processEvent(e);
  37.  
  38. status = Status.READY;
  39. } catch (Throwable t) {
  40. // Log exception, handle individual exceptions as needed
  41.  
  42. status = Status.BACKOFF;
  43.  
  44. // re-throw all Errors
  45. if (t instanceof Error) {
  46. throw (Error)t;
  47. }
  48. } finally {
  49. txn.close();
  50. }
  51. return status;
  52. }
  53. }