监控
Flume的监控系统的完善仍在进行中,变化可能会比较频繁,有几个Flume组件会向JMX平台MBean服务器报告运行指标。 可以使用Jconsole查询这些指标数据。
JMX Reporting
MX监控可以通过在flume-env.sh脚本中修改JAVA_OPTS环境变量中的JMX参数来开启,比如这样:
- export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
警告
注意:上面的JVM启动参数例子里面没有开启安全验证,如果要开启请参考:http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
Ganglia Reporting
Flume也可以向Ganglia 3或Ganglia 3.1报告运行指标数据。想要开启这个功能,必须在Agent启动时候指定。Flume Agent在启动时候必须制定下面这些参数并在参数前面加上前缀「flume.monitoring.」来配置,也可以在flume-env.sh中设定这些参数。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: ganglia |
hosts | – | hostname:port 格式的 Ganglia 服务列表,多个用逗号分隔 |
pollFrequency | 60 | 向Ganglia服务器报告数据的时间间隔(秒) |
isGanglia3 | false | 设置为true后Ganglia的版本兼容为Ganglia3,默认情况下Flume发送的数据是Ganglia3.1格式的 |
我们可以在启动时这样开启Ganglia支持:
- $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
提示
看上面这个启动脚本,其中 -Dflume.monitoring.type=ganglia 以及后面的参数都是按照上面描述的规则配置的,就是「固定的前缀+参数=参数值」的形式。
JSON Reporting
Flume也支持以JSON格式报告运行指标。为了对外提供这些报告数据,Flume会在某个端口(可自定义)上运行一个web服务来提供这些数据,以下面这种格式:
- {
- "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
- "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
- }
下面是一个具体的报告例子:
- {
- "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
- "Type":"CHANNEL",
- "StopTime":"0",
- "EventPutAttemptCount":"468086",
- "ChannelSize":"233428",
- "StartTime":"1344882233070",
- "EventTakeSuccessCount":"458200",
- "ChannelCapacity":"600000",
- "EventTakeAttemptCount":"458288"},
- "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
- "Type":"CHANNEL",
- "StopTime":"0",
- "EventPutAttemptCount":"22948908",
- "ChannelSize":"5",
- "StartTime":"1344882209413",
- "EventTakeSuccessCount":"22948900",
- "ChannelCapacity":"100",
- "EventTakeAttemptCount":"22948908"}
- }
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: http |
port | 41414 | 查看json报告的端口 |
启用JSON报告的启动脚本示例:
- $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
启动后可以通过这个地址 http://<hostname>:<port>/metrics 来查看报告,自定义组件可以报告上面Ganglia部分中提到的指标数据。
Custom Reporting
可以通过编写自己的执行报告服务向其他系统报告运行指标。 报告类必须实现org.apache.flume.instrumentation.MonitorService 接口。自定义的报告类与GangliaServer的报告用法相同。 他们可以轮询请求mbean服务器获取mbeans的运行指标。 例如,假设一个命名为为HTTPReporting的HTTP监视服务,启动脚本如下所示:
- $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
属性 | 默认值 | 解释 |
---|---|---|
type | – | 自定义报告组件的全限定类名 |
Reporting metrics from custom components
自定义Flume监控组件必须应继承自 org.apache.flume.instrumentation.MonitoredCounterGroup 类。 然后,该类应为其公开的每个度量指标提供getter方法。 请参阅下面的代码。 MonitoredCounterGroup 需要一个此类要提供的监控属性列表。 目前仅支持将监控指标值设置为long型。
- public class SinkCounter extends MonitoredCounterGroup implements
- SinkCounterMBean {
- private static final String COUNTER_CONNECTION_CREATED =
- "sink.connection.creation.count";
- private static final String COUNTER_CONNECTION_CLOSED =
- "sink.connection.closed.count";
- private static final String COUNTER_CONNECTION_FAILED =
- "sink.connection.failed.count";
- private static final String COUNTER_BATCH_EMPTY =
- "sink.batch.empty";
- private static final String COUNTER_BATCH_UNDERFLOW =
- "sink.batch.underflow";
- private static final String COUNTER_BATCH_COMPLETE =
- "sink.batch.complete";
- private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
- "sink.event.drain.attempt";
- private static final String COUNTER_EVENT_DRAIN_SUCCESS =
- "sink.event.drain.sucess";
- private static final String[] ATTRIBUTES = {
- COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
- COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
- COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
- COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
- };
- public SinkCounter(String name) {
- super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
- }
- @Override
- public long getConnectionCreatedCount() {
- return get(COUNTER_CONNECTION_CREATED);
- }
- public long incrementConnectionCreatedCount() {
- return increment(COUNTER_CONNECTION_CREATED);
- }
- }