Monitoring
Monitoring in Flume is still a work in progress, and changes can happen often. Several Flume components report metrics to the JMX platform MBean server. These metrics can be queried using JConsole.
Available Component Metrics
The following tables show which metrics are available for each component. Each component maintains only a subset of the metrics, indicated by an ‘x’; the metrics it does not maintain report the default value, 0. These tables tell you where you can expect meaningful data. The metric names should be descriptive enough; for more information, you have to dig into the source code of the components.
Sources 1
Metric | Avro | Exec | HTTP | JMS | Kafka | MultiportSyslogTCP | Scribe |
AppendAcceptedCount | x | ||||||
AppendBatchAcceptedCount | x | x | x | ||||
AppendBatchReceivedCount | x | x | x | ||||
AppendReceivedCount | x | ||||||
ChannelWriteFail | x | x | x | x | x | x | |
EventAcceptedCount | x | x | x | x | x | x | x |
EventReadFail | x | x | x | x | x | ||
EventReceivedCount | x | x | x | x | x | x | x |
GenericProcessingFail | x | x | |||||
KafkaCommitTimer | x | ||||||
KafkaEmptyCount | x | ||||||
KafkaEventGetTimer | x | ||||||
OpenConnectionCount | x |
Sources 2
Metric | SequenceGenerator | SpoolDirectory | SyslogTcp | SyslogUDP | Taildir | Thrift |
AppendAcceptedCount | x | |||||
AppendBatchAcceptedCount | x | x | x | x | ||
AppendBatchReceivedCount | x | x | x | |||
AppendReceivedCount | x | |||||
ChannelWriteFail | x | x | x | x | x | x |
EventAcceptedCount | x | x | x | x | x | x |
EventReadFail | x | x | x | x | ||
EventReceivedCount | x | x | x | x | x | |
GenericProcessingFail | x | x | ||||
KafkaCommitTimer | ||||||
KafkaEmptyCount | ||||||
KafkaEventGetTimer | ||||||
OpenConnectionCount |
Sinks 1
Metric | Avro/Thrift | AsyncHBase | ElasticSearch | HBase | HBase2 |
BatchCompleteCount | x | x | x | x | x |
BatchEmptyCount | x | x | x | x | x |
BatchUnderflowCount | x | x | x | x | x |
ChannelReadFail | x | x | |||
ConnectionClosedCount | x | x | x | x | x |
ConnectionCreatedCount | x | x | x | x | x |
ConnectionFailedCount | x | x | x | x | x |
EventDrainAttemptCount | x | x | x | x | x |
EventDrainSuccessCount | x | x | x | x | x |
EventWriteFail | x | x | |||
KafkaEventSendTimer | |||||
RollbackCount |
Sinks 2
Metric | HDFSEvent | Hive | Http | Kafka | Morphline | RollingFile |
BatchCompleteCount | x | x | x | |||
BatchEmptyCount | x | x | x | x | ||
BatchUnderflowCount | x | x | x | x | ||
ChannelReadFail | x | x | x | x | x | x |
ConnectionClosedCount | x | x | x | |||
ConnectionCreatedCount | x | x | x | |||
ConnectionFailedCount | x | x | x | |||
EventDrainAttemptCount | x | x | x | x | x | |
EventDrainSuccessCount | x | x | x | x | x | x |
EventWriteFail | x | x | x | x | x | x |
KafkaEventSendTimer | x | |||||
RollbackCount | x |
Channels
Metric | File | Kafka | Memory | PseudoTxnMemory | SpillableMemory |
ChannelCapacity | x | x | x | ||
ChannelSize | x | x | x | x | |
CheckpointBackupWriteErrorCount | x | ||||
CheckpointWriteErrorCount | x | ||||
EventPutAttemptCount | x | x | x | x | x |
EventPutErrorCount | x | ||||
EventPutSuccessCount | x | x | x | x | x |
EventTakeAttemptCount | x | x | x | x | x |
EventTakeErrorCount | x | ||||
EventTakeSuccessCount | x | x | x | x | x |
KafkaCommitTimer | x | ||||
KafkaEventGetTimer | x | ||||
KafkaEventSendTimer | x | ||||
Open | x | ||||
RollbackCounter | x | ||||
Unhealthy | x |
JMX Reporting
JMX reporting can be enabled by specifying JMX parameters in the JAVA_OPTS environment variable in flume-env.sh, for example:
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
NOTE: The sample above disables security. To enable security, refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html
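With remote JMX enabled as above, the same MBeans that JConsole displays can also be read from a small Java client. The following is a minimal sketch, not part of Flume: the class name RemoteFlumeJmx is hypothetical, the port 5445 comes from the sample above, and the org.apache.flume.* domain pattern is an assumption about how Flume names its MBeans that should be checked against a running agent (for example in JConsole).

```java
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper, not a Flume class.
final class RemoteFlumeJmx {

    // Builds the standard RMI service URL used by the JDK's built-in JMX agent.
    static JMXServiceURL serviceUrl(String host, int port) {
        try {
            return new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad host or port", e);
        }
    }

    // Connects without credentials (matching authenticate=false above) and
    // lists the MBeans whose domain starts with org.apache.flume.
    // ASSUMPTION: Flume registers its metrics under that domain prefix.
    static Set<ObjectName> listFlumeMBeans(String host, int port) throws IOException {
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            return connection.queryNames(new ObjectName("org.apache.flume.*:*"), null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException("Invalid ObjectName pattern", e);
        }
    }
}
```

Calling RemoteFlumeJmx.listFlumeMBeans("localhost", 5445) against an agent started with the options above would return the registered names, whose attributes can then be read with MBeanServerConnection.getAttribute.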
Ganglia Reporting
Flume can also report these metrics to Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a Flume agent must be started with this support. The agent has to be started by passing in the following parameters as system properties prefixed by flume.monitoring., which can be specified in flume-env.sh:
Property Name | Default | Description |
---|---|---|
type | – | The component type name; has to be ganglia |
hosts | – | Comma-separated list of hostname:port of Ganglia servers |
pollFrequency | 60 | Time, in seconds, between consecutive reports to the Ganglia server |
isGanglia3 | false | Set to true if the Ganglia server version is 3. By default, Flume sends metrics in Ganglia 3.1 format |
We can start Flume with Ganglia support as follows:
- $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
JSON Reporting
Flume can also report metrics in JSON format. To enable reporting in JSON format, Flume hosts a web server on a configurable port. Flume reports metrics in the following JSON format:
- {
- "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
- "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
- }
Here is an example:
- {
- "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
- "Type":"CHANNEL",
- "StopTime":"0",
- "EventPutAttemptCount":"468086",
- "ChannelSize":"233428",
- "StartTime":"1344882233070",
- "EventTakeSuccessCount":"458200",
- "ChannelCapacity":"600000",
- "EventTakeAttemptCount":"458288"},
- "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
- "Type":"CHANNEL",
- "StopTime":"0",
- "EventPutAttemptCount":"22948908",
- "ChannelSize":"5",
- "StartTime":"1344882209413",
- "EventTakeSuccessCount":"22948900",
- "ChannelCapacity":"100",
- "EventTakeAttemptCount":"22948908"}
- }
Property Name | Default | Description |
---|---|---|
type | – | The component type name; has to be http |
port | 41414 | The port to start the server on. |
We can start Flume with JSON Reporting support as follows:
- $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
Metrics will then be available at the http://<hostname>:<port>/metrics webpage. Custom components can report metrics as mentioned in the Ganglia section above.
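As a rough illustration of consuming this endpoint, the following Java sketch fetches the /metrics page and extracts the two-level structure shown in the example above. It is not part of Flume: the class name FlumeJsonMetrics and its methods are hypothetical, and the regex-based parsing assumes that every metric value is a quoted string without embedded quotes or braces, as in the sample output. A real client would more likely use a JSON library. The fetch method uses InputStream.readAllBytes, which requires Java 9 or later.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client, not a Flume class.
final class FlumeJsonMetrics {
    // Matches "TYPE.name" : { ... } where the inner object has no nested braces.
    private static final Pattern COMPONENT =
        Pattern.compile("\"([^\"]+)\"\\s*:\\s*\\{([^{}]*)\\}");
    // Matches "metric" : "value" pairs inside one component object.
    private static final Pattern METRIC =
        Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    // Downloads the raw JSON text from http://<host>:<port>/metrics.
    static String fetch(String host, int port) throws IOException {
        URI uri = URI.create("http://" + host + ":" + port + "/metrics");
        try (InputStream in = uri.toURL().openStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    // Turns the JSON text into component name -> (metric name -> value).
    static Map<String, Map<String, String>> parse(String json) {
        Map<String, Map<String, String>> components = new LinkedHashMap<>();
        Matcher component = COMPONENT.matcher(json);
        while (component.find()) {
            Map<String, String> metrics = new LinkedHashMap<>();
            Matcher metric = METRIC.matcher(component.group(2));
            while (metric.find()) {
                metrics.put(metric.group(1), metric.group(2));
            }
            components.put(component.group(1), metrics);
        }
        return components;
    }

    // ChannelSize divided by ChannelCapacity; NaN when either metric is
    // missing or the capacity is not positive (not every channel maintains it).
    static double fillRatio(Map<String, String> channelMetrics) {
        String size = channelMetrics.get("ChannelSize");
        String capacity = channelMetrics.get("ChannelCapacity");
        if (size == null || capacity == null) {
            return Double.NaN;
        }
        double cap = Double.parseDouble(capacity);
        return cap > 0 ? Double.parseDouble(size) / cap : Double.NaN;
    }
}
```

For an agent started with the command above, FlumeJsonMetrics.parse(FlumeJsonMetrics.fetch("localhost", 34545)) would return one map entry per component, such as CHANNEL.memChannel.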
Custom Reporting
It is possible to report metrics to other systems by writing servers that do the reporting. Any reporting class has to implement the interface org.apache.flume.instrumentation.MonitorService. Such a class can be used the same way the GangliaServer is used for reporting. It can poll the platform MBean server to read the metrics MBeans. For example, an HTTP monitoring service called HTTPReporting can be used as follows:
- $ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property Name | Default | Description |
---|---|---|
type | – | The component type name; has to be the fully qualified class name (FQCN) of the reporting class |
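The polling approach described in this section can be sketched in plain Java as follows. This is an illustration under stated assumptions rather than a working Flume plug-in: PollingReporterSketch is a hypothetical class that does not implement MonitorService, so that it compiles without Flume on the classpath. A real reporter would implement org.apache.flume.instrumentation.MonitorService, assumed here to provide start(), stop(), and configure(Context), and would presumably receive settings such as the flume.monitoring.node property from the command above through configure. The ObjectName pattern passed to the constructor (for example org.apache.flume.*:*) is likewise an assumption about Flume's MBean naming.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.management.Attribute;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical stand-in for a MonitorService implementation.
final class PollingReporterSketch {
    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    private final String pattern;       // e.g. "org.apache.flume.*:*" (assumed naming)
    private final long periodSeconds;
    private final Consumer<Map<String, Map<String, Object>>> sink;
    private ScheduledExecutorService scheduler;

    PollingReporterSketch(String pattern, long periodSeconds,
                          Consumer<Map<String, Map<String, Object>>> sink) {
        this.pattern = pattern;
        this.periodSeconds = periodSeconds;
        this.sink = sink;
    }

    // Reads every readable attribute of every MBean matching the pattern.
    Map<String, Map<String, Object>> snapshot() {
        Map<String, Map<String, Object>> result = new TreeMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                MBeanAttributeInfo[] infos = server.getMBeanInfo(name).getAttributes();
                String[] attributeNames = new String[infos.length];
                for (int i = 0; i < infos.length; i++) {
                    attributeNames[i] = infos[i].getName();
                }
                Map<String, Object> values = new TreeMap<>();
                // getAttributes omits attributes that cannot be read.
                for (Attribute attribute : server.getAttributes(name, attributeNames).asList()) {
                    values.put(attribute.getName(), attribute.getValue());
                }
                result.put(name.getCanonicalName(), values);
            }
        } catch (JMException e) {
            throw new IllegalStateException("Could not poll MBeans", e);
        }
        return result;
    }

    // Starts periodic polling; each snapshot is handed to the sink callback.
    synchronized void start() {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                sink.accept(snapshot());
            } catch (RuntimeException e) {
                // Keep the schedule alive if a single poll fails.
                System.err.println("Metrics poll failed: " + e);
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }

    // Stops polling and releases the scheduler thread.
    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }
}
```

The sink callback is where a real reporter would forward the snapshot to the external system, for example by sending it over HTTP.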
Reporting metrics from custom components
A custom Flume component should report its metrics through a class that inherits from the org.apache.flume.instrumentation.MonitoredCounterGroup class. The class should then provide getter methods for each of the metrics it exposes, as in the code below. MonitoredCounterGroup expects a list of attributes whose metrics are exposed by this class. As of now, this class only supports exposing metrics as long values.
- public class SinkCounter extends MonitoredCounterGroup implements
- SinkCounterMBean {
- private static final String COUNTER_CONNECTION_CREATED =
- "sink.connection.creation.count";
- private static final String COUNTER_CONNECTION_CLOSED =
- "sink.connection.closed.count";
- private static final String COUNTER_CONNECTION_FAILED =
- "sink.connection.failed.count";
- private static final String COUNTER_BATCH_EMPTY =
- "sink.batch.empty";
- private static final String COUNTER_BATCH_UNDERFLOW =
- "sink.batch.underflow";
- private static final String COUNTER_BATCH_COMPLETE =
- "sink.batch.complete";
- private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
- "sink.event.drain.attempt";
- private static final String COUNTER_EVENT_DRAIN_SUCCESS =
- "sink.event.drain.sucess";
- private static final String[] ATTRIBUTES = {
- COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
- COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
- COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
- COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
- };
- public SinkCounter(String name) {
- super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
- }
- @Override
- public long getConnectionCreatedCount() {
- return get(COUNTER_CONNECTION_CREATED);
- }
- public long incrementConnectionCreatedCount() {
- return increment(COUNTER_CONNECTION_CREATED);
- }
- }
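To make the role of the getter methods more concrete, the sketch below shows how a getter on an MBean interface becomes a readable JMX attribute. It deliberately does not use Flume classes, so that it runs on a plain JDK: DemoSinkCounter and DemoSinkCounterMBean are hypothetical, simplified stand-ins for a MonitoredCounterGroup subclass and its MBean interface, the com.example.sketch.sink domain is made up, and the registration that MonitoredCounterGroup would normally handle is done by hand here.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical illustration, not Flume code.
final class CounterGroupSketch {

    // Management interface: each getter becomes one read-only JMX attribute.
    public interface DemoSinkCounterMBean {
        long getConnectionCreatedCount();
    }

    // Simplified stand-in for a class extending MonitoredCounterGroup.
    public static final class DemoSinkCounter implements DemoSinkCounterMBean {
        private static final String COUNTER_CONNECTION_CREATED =
            "sink.connection.creation.count";
        // Counters are stored as long values keyed by attribute name.
        private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

        public DemoSinkCounter() {
            counters.put(COUNTER_CONNECTION_CREATED, new AtomicLong());
        }

        @Override
        public long getConnectionCreatedCount() {
            return counters.get(COUNTER_CONNECTION_CREATED).get();
        }

        public long incrementConnectionCreatedCount() {
            return counters.get(COUNTER_CONNECTION_CREATED).incrementAndGet();
        }
    }

    // Registers a counter, increments it twice, and reads the value back
    // through JMX under the attribute name derived from the getter.
    static long demo(String sinkName) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        DemoSinkCounter counter = new DemoSinkCounter();
        try {
            ObjectName name = new ObjectName("com.example.sketch.sink:type=" + sinkName);
            server.registerMBean(counter, name);
            try {
                counter.incrementConnectionCreatedCount();
                counter.incrementConnectionCreatedCount();
                return (Long) server.getAttribute(name, "ConnectionCreatedCount");
            } finally {
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX operation failed", e);
        }
    }
}
```

The attribute name ConnectionCreatedCount is the getter name without its get prefix, which matches the form of the metric names listed in the tables earlier in this section.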