Transaction interface
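The Transaction interface is the basis of reliability in Flume. All major components (Sources, Sinks and Channels) must use a Transaction.
A Transaction is implemented inside a Channel implementation. Every Source and Sink connected to a Channel must obtain a Transaction object from it. Sources do not handle it directly: they use a ChannelProcessor to manage their Transactions, while Sinks manage them explicitly through their configured Channel. Staging an Event (putting it into a Channel) and consuming an Event (taking it out of a Channel) both happen inside an active Transaction. For example: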
Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do
  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();
  // Log exception, handle individual exceptions as needed
  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}
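Only the put side is shown here. Once begin() returns, the Transaction is open and the Event is put into the Channel. If the put succeeds, the Transaction is committed; if anything goes wrong, it is rolled back. In both cases the Transaction is closed afterwards.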
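To make the commit and rollback behaviour concrete, here is a minimal single-threaded sketch written with nothing but the JDK. ToyChannel and ToyTransaction are hypothetical names invented for this illustration. They are not Flume classes, and a real Channel is considerably more involved (thread-bound transactions, capacity limits, durability). The sketch only assumes what the text above describes: staged puts become visible on commit, and a rollback undoes both puts and takes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of a transactional channel (NOT a Flume class).
class ToyChannel {
    private final Deque<String> committed = new ArrayDeque<>();
    private ToyTransaction active; // this model allows one open transaction at a time

    class ToyTransaction {
        private final List<String> puts = new ArrayList<>();
        private final List<String> takes = new ArrayList<>();

        void begin() { active = this; }

        void commit() {
            committed.addAll(puts); // staged puts become visible
            puts.clear();
            takes.clear();          // taken events are gone for good
        }

        void rollback() {
            puts.clear();           // staged puts are discarded
            // Taken events return to the head of the queue in their original order.
            for (int i = takes.size() - 1; i >= 0; i--) {
                committed.addFirst(takes.get(i));
            }
            takes.clear();
        }

        void close() { active = null; }
    }

    ToyTransaction getTransaction() { return new ToyTransaction(); }

    void put(String event) {
        requireActive();
        active.puts.add(event);
    }

    String take() {
        requireActive();
        String event = committed.pollFirst(); // null when the channel is empty
        if (event != null) {
            active.takes.add(event);
        }
        return event;
    }

    int size() { return committed.size(); }

    private void requireActive() {
        if (active == null) {
            throw new IllegalStateException("no open transaction");
        }
    }
}

class ToyChannelDemo {
    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        ToyChannel.ToyTransaction txn = ch.getTransaction();
        txn.begin();
        try {
            ch.put("Hello Flume!");
            txn.commit();
        } catch (RuntimeException e) {
            txn.rollback();
        } finally {
            txn.close();
        }
        System.out.println(ch.size()); // prints 1
    }
}
```

The begin / try / commit / catch / rollback / finally / close shape in the demo is the same one used in the Flume snippet above; only the channel behind it is a stand-in.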
Sink
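A Sink takes Events out of a Channel and either forwards them to the next Flume agent in the flow or stores them in an external repository. A Sink is associated with exactly one Channel, as configured in the Flume properties file. Every configured Sink has its own SinkRunner instance. When the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink (using SinkRunner.PollingRunner as the thread's Runnable), and this thread manages the Sink's lifecycle.
A Sink must implement the start() and stop() methods, which are part of the LifecycleAware interface. Sink.start() initializes the Sink and brings it to a state where it can process Events. Sink.process() does the core work: it takes an Event from the Channel and forwards it. Sink.stop() performs any necessary cleanup (for example, releasing resources). A Sink can also implement the Configurable interface to process its own configuration settings. For example: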
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do
      Event event = ch.take();
      // Send the Event to the external repository.
      // storeSomeData(event);
      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}
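The READY and BACKOFF values returned by process() only matter because something calls process() in a loop. The following JDK-only sketch shows one way such a polling loop could use them: sleep a little longer after each consecutive BACKOFF, up to a cap, and reset as soon as READY comes back. ToyStatus, ToyPollable and ToyPollingRunner are hypothetical names made up for this illustration, and the sleep formula is an assumption. This is not the source of SinkRunner.PollingRunner, whose exact policy may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the status enum and a pollable component (NOT Flume types).
enum ToyStatus { READY, BACKOFF }

interface ToyPollable {
    ToyStatus process() throws Exception;
}

class ToyPollingRunner {
    private final ToyPollable target;
    private final long backoffStepMillis;
    private final long maxBackoffMillis;

    ToyPollingRunner(ToyPollable target, long backoffStepMillis, long maxBackoffMillis) {
        this.target = target;
        this.backoffStepMillis = backoffStepMillis;
        this.maxBackoffMillis = maxBackoffMillis;
    }

    // Calls process() up to maxCalls times and returns the sleep applied after each BACKOFF.
    List<Long> run(int maxCalls) {
        List<Long> sleeps = new ArrayList<>();
        int consecutiveBackoffs = 0;
        for (int i = 0; i < maxCalls; i++) {
            ToyStatus status;
            try {
                status = target.process();
            } catch (Exception e) {
                status = ToyStatus.BACKOFF; // assumption: treat a failure like an idle channel
            }
            if (status == ToyStatus.READY) {
                consecutiveBackoffs = 0;    // work was done, poll again immediately
                continue;
            }
            consecutiveBackoffs++;
            long sleep = Math.min(consecutiveBackoffs * backoffStepMillis, maxBackoffMillis);
            sleeps.add(sleep);
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop polling when interrupted
                break;
            }
        }
        return sleeps;
    }
}

class ToyPollingRunnerDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Pretend there is something to deliver only on the third call.
        ToyPollable sink = () -> (++calls[0] == 3) ? ToyStatus.READY : ToyStatus.BACKOFF;
        List<Long> sleeps = new ToyPollingRunner(sink, 1, 3).run(7);
        System.out.println(sleeps); // prints [1, 2, 1, 2, 3, 3]
    }
}
```

Returning BACKOFF from process() when the channel is empty or delivery fails is therefore what keeps an idle sink from spinning in a tight loop.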
Source
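The purpose of a Source is to receive data from an external client and store it in its configured Channels. A Source obtains an instance of its own ChannelProcessor to process an Event. The ChannelProcessor in turn uses a ChannelSelector to determine which Channels are associated with the Source. A Transaction on each of those Channels guarantees that Events are placed into the Channel reliably.
Similar to the SinkRunner.PollingRunner Runnable, there is a PollingRunner Runnable that runs on a thread created when the Flume framework calls PollableSourceRunner.start().
Note that there are actually two types of Source. The PollableSource is the one just described: its process() method is called repeatedly by the runner. The other is the EventDrivenSource, which has its own callback mechanism for capturing new data and storing it in the Channel. Below is an example of a custom PollableSource: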
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
    // Process the myProp value (e.g. validation, convert to another type, ...)
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop() {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
    // No explicit Transaction here: a Source has no getChannel(); the
    // ChannelProcessor opens and commits a Transaction on each Channel itself.
    try {
      // Receive new data (getSomeData() is a placeholder for your own logic)
      Event e = getSomeData();
      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);
      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed
      status = Status.BACKOFF;
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}
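For contrast with the polled style above, here is a rough JDK-only sketch of the event-driven style: there is no process() method and no runner loop, and the source simply forwards data whenever its own callback fires. ToyEventDrivenSource and its onData() callback are hypothetical names invented for this illustration, and the Consumer merely stands in for the ChannelProcessor. A real EventDrivenSource would typically register such a callback with a server or client library in start().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy model of a callback-driven source (NOT a Flume class).
class ToyEventDrivenSource {
    private final Consumer<String> channelProcessor; // stands in for the ChannelProcessor
    private volatile boolean started = false;

    ToyEventDrivenSource(Consumer<String> channelProcessor) {
        this.channelProcessor = channelProcessor;
    }

    void start() { started = true; }  // a real source would open its listener here

    void stop() { started = false; }  // and close it here

    // Callback invoked by the external client whenever new data arrives.
    // Returns true if the data was forwarded, false if the source is stopped.
    boolean onData(String body) {
        if (!started) {
            return false;
        }
        channelProcessor.accept(body);
        return true;
    }
}

class ToyEventDrivenSourceDemo {
    public static void main(String[] args) {
        List<String> channel = new ArrayList<>();
        ToyEventDrivenSource source = new ToyEventDrivenSource(channel::add);
        source.start();
        source.onData("first");
        source.onData("second");
        source.stop();
        source.onData("ignored");    // dropped: the source is no longer started
        System.out.println(channel); // prints [first, second]
    }
}
```

The design difference is who owns the timing: a pollable source is asked for data by the framework's thread, while an event-driven source is told about data by whatever mechanism it set up itself.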