Source
The purpose of a Source is to receive data from an external client and store it into the configured Channels. A Source can get an instance of its own ChannelProcessor to process an Event, which is committed to each Channel in serial, within a Channel-local transaction. In the case of an exception, required Channels will propagate the exception and all Channels will roll back their transactions, but events processed previously on other Channels will remain committed.
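The serial, per-Channel commit behavior described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: ToyChannel and ToyChannelProcessor are hypothetical stand-ins written for this example, not Flume classes, and a "transaction" is reduced to a pending list that is either committed or discarded.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a Channel; NOT the Flume Channel API.
class ToyChannel {
    final String name;
    final boolean failOnPut;                          // simulates a broken Channel
    final List<String> committed = new ArrayList<>();

    ToyChannel(String name, boolean failOnPut) {
        this.name = name;
        this.failOnPut = failOnPut;
    }

    // One put wrapped in its own Channel-local "transaction".
    void putInTransaction(String event) {
        List<String> pending = new ArrayList<>();     // begin
        try {
            if (failOnPut) {
                throw new IllegalStateException(name + " rejected the event");
            }
            pending.add(event);
            committed.addAll(pending);                // commit
        } catch (RuntimeException ex) {
            pending.clear();                          // rollback
            throw ex;                                 // propagate to the caller
        }
    }
}

// Toy stand-in for a ChannelProcessor that only knows required Channels.
class ToyChannelProcessor {
    private final List<ToyChannel> required;

    ToyChannelProcessor(List<ToyChannel> required) {
        this.required = required;
    }

    // Channels are visited one after another; the first failure stops the loop.
    void processEvent(String event) {
        for (ToyChannel channel : required) {
            channel.putInTransaction(event);
        }
    }
}
```

In this model, if the first Channel accepts the event and the second one fails, the caller sees the exception while the first Channel still holds the committed event.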
Similar to the SinkRunner.PollingRunner Runnable, there’s a PollingRunner Runnable that executes on a thread created when the Flume framework calls PollableSourceRunner.start(). Each configured PollableSource is associated with its own thread that runs a PollingRunner. This thread manages the PollableSource’s lifecycle, such as starting and stopping. A PollableSource implementation must implement the start() and stop() methods that are declared in the LifecycleAware interface. The runner of a PollableSource invokes that Source’s process() method. The process() method should check for new data and store it into the Channel as Flume Events.
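To make the runner's role concrete, here is a minimal sketch of a polling loop that repeatedly calls process() and sleeps when the source reports that it has nothing to do. ToyPollableSource and ToyPollingRunner are hypothetical names invented for this example, and the linear backoff with a cap is an assumption about a reasonable policy; this is not the code of Flume's PollableSourceRunner.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a pollable source; NOT the Flume interface.
interface ToyPollableSource {
    enum Status { READY, BACKOFF }

    Status process();
}

// Toy runner: one instance is meant to run on one dedicated thread.
class ToyPollingRunner implements Runnable {
    private final ToyPollableSource source;
    private final long backoffIncrementMillis;   // assumed tuning value
    private final long maxBackoffMillis;         // assumed tuning value
    private final AtomicBoolean shouldStop = new AtomicBoolean(false);
    private long consecutiveBackoffs = 0;

    ToyPollingRunner(ToyPollableSource source, long backoffIncrementMillis, long maxBackoffMillis) {
        this.source = source;
        this.backoffIncrementMillis = backoffIncrementMillis;
        this.maxBackoffMillis = maxBackoffMillis;
    }

    void stop() {
        shouldStop.set(true);
    }

    @Override
    public void run() {
        while (!shouldStop.get()) {
            ToyPollableSource.Status status;
            try {
                status = source.process();
            } catch (RuntimeException ex) {
                // A real runner would log the failure; here it is treated as a backoff.
                status = ToyPollableSource.Status.BACKOFF;
            }
            if (status == ToyPollableSource.Status.BACKOFF) {
                consecutiveBackoffs++;
                long sleep = Math.min(consecutiveBackoffs * backoffIncrementMillis, maxBackoffMillis);
                try {
                    Thread.sleep(sleep);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();   // preserve the interrupt and exit
                    return;
                }
            } else {
                consecutiveBackoffs = 0;                  // data was found, poll again at once
            }
        }
    }
}
```

A caller would typically hand the runner to a new Thread and later call stop() to end the loop; the sketch leaves out the start() and stop() lifecycle calls on the source itself.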
Note that there are actually two types of Sources. The PollableSource was already mentioned. The other is the EventDrivenSource. The EventDrivenSource, unlike the PollableSource, must have its own callback mechanism that captures the new data and stores it into the Channel. The EventDrivenSources are not each driven by their own thread like the PollableSources are. Below is an example of a custom PollableSource:
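    import java.nio.charset.StandardCharsets;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.source.AbstractSource;

    // Note: newer Flume releases declare additional backoff-related methods on
    // PollableSource (getBackOffSleepIncrement(), getMaxBackOffSleepInterval());
    // implement them if your version requires it.
    public class MySource extends AbstractSource implements Configurable, PollableSource {
      private String myProp;

      @Override
      public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");

        // Process the myProp value (e.g. validation, convert to another type, ...)

        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
      }

      @Override
      public void start() {
        // Initialize the connection to the external client
        super.start();
      }

      @Override
      public void stop() {
        // Disconnect from external client and do any additional cleanup
        // (e.g. releasing resources or nulling-out field values) ..
        super.stop();
      }

      @Override
      public Status process() throws EventDeliveryException {
        Status status = null;

        try {
          // This try clause includes whatever Channel/Event operations you want to do

          // Receive new data
          Event e = getSomeData();

          // Store the Event into this Source's associated Channel(s).
          // The ChannelProcessor handles the Channel transactions itself.
          getChannelProcessor().processEvent(e);

          status = Status.READY;
        } catch (Throwable t) {
          // Log exception, handle individual exceptions as needed

          status = Status.BACKOFF;

          // re-throw all Errors
          if (t instanceof Error) {
            throw (Error) t;
          }
        }
        return status;
      }

      // Placeholder: replace with code that reads from the external client
      private Event getSomeData() {
        return EventBuilder.withBody("example".getBytes(StandardCharsets.UTF_8));
      }
    }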
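For contrast with the PollableSource above, the callback style used by an EventDrivenSource can be sketched as a toy model. Everything here is an assumption made for illustration: ToyPushClient is a hypothetical external client that pushes data to a listener, and ToyEventDrivenSource uses a plain list in place of a Channel. Neither is a Flume class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical external client that pushes data to a registered callback.
class ToyPushClient {
    private volatile Consumer<String> listener;

    void setListener(Consumer<String> listener) {
        this.listener = listener;
    }

    // Called by the client whenever new data arrives.
    void emit(String data) {
        Consumer<String> current = listener;
        if (current != null) {
            current.accept(data);
        }
    }
}

// Toy event-driven source: no polling thread, only a callback.
class ToyEventDrivenSource {
    private final ToyPushClient client;
    final List<String> channel = new CopyOnWriteArrayList<>();   // stand-in for a Channel

    ToyEventDrivenSource(ToyPushClient client) {
        this.client = client;
    }

    // start() registers the callback that stores incoming data.
    void start() {
        client.setListener(channel::add);
    }

    // stop() unregisters it, so later data is ignored.
    void stop() {
        client.setListener(null);
    }
}
```

The point of the sketch is that nothing calls a process() method here: data reaches the stand-in Channel only when the client invokes the callback registered in start().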