自定义开发组件
客户端
客户端操作事件对象的源头,并且把他们发送给flume节点。client最典型的就是处理应用进程产生的数据。目前flume支持Avro,log4j,syslog,http等数据源类型。另外,也有一种execsource类型可以消费本地进程输出的信息。
退出选项不够充分这种情况是很重要的,在这种情况下,如何创建一种自定义的机制发送数据。有两种方式:
- 第一种就是创建自定义的客户端与flume的source进行交互,比如AvroSource和SyslogTcpSource。这样client需要把数据转换成flume理解的格式。
- 第二种,是自定义中flume source基于IPC或者RPC协议与本地进程进行沟通,然后把数据传给flume节点。
注意,所有的数据都是存储在flume节点的channel中。
client SDK
尽管flume包含了一系列的数据产生机制,但是很多场景下还是需要与自定义的应用进行交互。Flume SDK可以帮助开发者使用RPC协议与应用进行连接。
RPC 客户端接口
实现RpcClient接口可以把Flume封装起来。用户仅需要调用Flume的API方法,append以及appendBatch即可发送数据,不需要关心详细的消息信息。用户可以使用提供的Event对象,或者使用EventBuilder方法重写withBody()方法。
RPC 客户端 - Avro 以及 Thrift
Avro是默认的RPC协议,NettyAvroRpcClient以及ThriftRpcClient都实现了RpcClient接口。客户端需要创建目标Agent的主机名以及端口号,并且使用RpcClient发送数据。下面就是一个数据产生的应用:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
public class MyApp {
public static void main(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// 初始化主机名以及端口号
client.init("host.example.org", 41414);
// 发送给远程节点10条数据
String sampleData = "Hello Flume!";
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// 初始化RPC客户端
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// 使用下面的方法创建thrift的客户端
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// 创建事件对象
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
try {// 发送事件
client.append(event);
} catch (EventDeliveryException e) {
// 清除信息,重建Client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
}
}
public void cleanUp() {
// 关闭RPC连接
client.close();
}
}
远程的Flume节点需要有一个AvroSource的source,来监听某个端口。下面就是配置的例子:
a1.channels = c1
a1.sources = r1
a1.sinks = k1
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
# For using a thrift source set the following instead of the above line.
# a1.source.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
如果想要更灵活,可以使用默认的flume客户端实现方式(NettyAvroRpcClient以及ThriftRpcClient)可以参考下面的配置:
client.type = default (for avro) or thrift (for thrift)
hosts = h1 # default client accepts only 1 host
# (additional hosts will be ignored)
hosts.h1 = host1.example.org:41414 # host and port must both be specified
# (neither has a default)
batch-size = 100 # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)