Traces
Flink exposes a tracing system that allows gathering and exposing traces to external systems.
Reporting traces
You can access the tracing system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object via which you can report a new single-span trace.
Reporting single Span
A Span represents something that happened in Flink at a certain point in time and that will be reported to a TraceReporter. To report a Span, use the MetricGroup#addSpan(SpanBuilder) method.
Currently, traces with multiple spans are not supported. Each Span is self-contained and represents an event such as a checkpoint or a recovery.
Java
public class MyClass {
    void doSomething() {
        // (...)
        // metricGroup is the MetricGroup returned by getRuntimeContext().getMetricGroup()
        metricGroup.addSpan(
                Span.builder(MyClass.class, "SomeAction")
                        .setStartTsMillis(startTs) // Optional
                        .setEndTsMillis(endTs) // Optional
                        .setAttribute("foo", "bar"));
    }
}
Python
Currently reporting Spans from Python is not supported.
Reporter
For information on how to set up Flink’s trace reporters please take a look at the trace reporters documentation.
System traces
Flink reports the traces listed below.
The tables below generally feature 4 columns:
The “Scope” column describes the scope under which the trace is reported.
The “Name” column describes the name of the reported trace.
The “Attributes” column lists the names of all attributes that are reported with the given trace.
The “Description” column explains what each attribute reports.
Checkpointing and initialization
Flink reports a single-span trace covering the whole checkpoint or job initialization event once that event reaches a terminal state: COMPLETED or FAILED.
Scope | Name | Attributes | Description |
---|---|---|---|
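org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | startTs | Timestamp when the checkpoint has started. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | endTs | Timestamp when the checkpoint has finished. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | checkpointId | Id of the checkpoint. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | checkpointedSize | Size in bytes of the state checkpointed during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | fullSize | Full size in bytes of the state referenced by this checkpoint. Might be larger than checkpointedSize if incremental checkpoints are used. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | Checkpoint | checkpointStatus | The final state of this checkpoint: FAILED or COMPLETED. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | startTs | Timestamp when the job initialization has started. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | endTs | Timestamp when the job initialization has finished. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | checkpointId (optional) | Id of the checkpoint that the job recovered from (if any). |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | fullSize | Full size in bytes of the state referenced by the checkpoint that was used during recovery (if any). |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | (Max/Sum)MailboxStartDurationMs | The duration, aggregated (max and sum) across all subtasks, between a subtask being created and all classes and objects of that subtask being initialized. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | (Max/Sum)ReadOutputDataDurationMs | The duration, aggregated (max and sum) across all subtasks, of reading the unaligned checkpoint’s output buffers. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | (Max/Sum)InitializeStateDurationMs | The duration, aggregated (max and sum) across all subtasks, of initializing a state backend (including state files download time). |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | (Max/Sum)GateRestoreDurationMs | The duration, aggregated (max and sum) across all subtasks, of reading the unaligned checkpoint’s input buffers. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | (Max/Sum)DownloadStateDurationMs (optional - currently only supported by RocksDB Incremental) | The duration, aggregated (max and sum) across all subtasks, of downloading state files from the DFS. |
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker | JobInitialization | (Max/Sum)RestoredStateSizeBytes.[location] | The size of restored state, aggregated (max and sum) across all subtasks, by location. Possible locations are defined in Enum StateObjectSizeStatsCollector as LOCAL_MEMORY, LOCAL_DISK, REMOTE, UNKNOWN. |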
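The (Max/Sum) attributes in the table above collapse one measurement per subtask into two job-level values. The following is a minimal sketch of that kind of aggregation in plain Java, intended only to illustrate how a Max and a Sum attribute relate to the per-subtask values. It is not Flink's implementation: the class name SubtaskAggregationSketch, the aggregate method, and the sample durations are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: not part of Flink. */
public class SubtaskAggregationSketch {

    /**
     * Collapses one measurement per subtask into two attributes named
     * "Max" + metricName and "Sum" + metricName.
     * An empty input yields 0 for both values (an assumption of this sketch).
     */
    public static Map<String, Long> aggregate(String metricName, List<Long> perSubtaskValues) {
        long max = 0L;
        long sum = 0L;
        for (long value : perSubtaskValues) {
            max = Math.max(max, value);
            sum += value;
        }
        Map<String, Long> attributes = new LinkedHashMap<>();
        attributes.put("Max" + metricName, max);
        attributes.put("Sum" + metricName, sum);
        return attributes;
    }

    public static void main(String[] args) {
        // Hypothetical state initialization durations (ms) of three subtasks.
        List<Long> durationsMs = List.of(120L, 450L, 90L);
        // Prints {MaxInitializeStateDurationMs=450, SumInitializeStateDurationMs=660}
        System.out.println(aggregate("InitializeStateDurationMs", durationsMs));
    }
}
```

Read together, the two values hint at how the work was distributed: a Max that is close to the Sum suggests that a single subtask accounted for most of the time, while a Max that is small relative to the Sum suggests the time was spread across many subtasks.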