Traces

Flink exposes a tracing system that allows gathering and exposing traces to external systems.

Reporting traces

You can access the tracing system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object via which you can report a new single span trace.

Reporting single Span

A Span represents something that happened in Flink at certain point of time, that will be reported to a TraceReporter. To report a Span you can use the MetricGroup#addSpan(SpanBuilder) method.

Currently we don’t support traces with multiple spans. Each Span is self-contained and represents things like a checkpoint or recovery.

Java

  1. public class MyClass {
  2. void doSomething() {
  3. // (...)
  4. metricGroup.addSpan(
  5. Span.builder(MyClass.class, "SomeAction")
  6. .setStartTsMillis(startTs) // Optional
  7. .setEndTsMillis(endTs) // Optional
  8. .setAttribute("foo", "bar");
  9. }
  10. }

Python

  1. Currently reporting Spans from Python is not supported.

Reporter

For information on how to set up Flink’s trace reporters please take a look at the trace reporters documentation.

System traces

Flink reports traces listed below.

The tables below generally feature 5 columns:

  • The “Scope” column describes what is that trace reported scope.

  • The “Name” column describes the name of the reported trace.

  • The “Attributes” column lists the names of all attributes that are reported with the given trace.

  • The “Description” column provides information as to what a given attribute is reporting.

Checkpointing and initialization

Flink reports a single span trace for the whole checkpoint and job initialization events once that event reaches a terminal state: COMPLETED or FAILED.

ScopeNameAttributesDescription
org.apache.flink.
runtime.checkpoint.
CheckpointStatsTracker
CheckpointstartTsTimestamp when the checkpoint has started.
endTsTimestamp when the checkpoint has finished.
checkpointIdId of the checkpoint.
checkpointedSizeSize in bytes of checkpointed state during this checkpoint. Might be smaller than fullSize if incremental checkpoints are used.
fullSizeFull size in bytes of the referenced state by this checkpoint. Might be larger than checkpointSize if incremental checkpoints are used.
checkpointStatusWhat was the state of this checkpoint: FAILED or COMPLETED.
JobInitializationstartTsTimestamp when the job initialization has started.
endTsTimestamp when the job initialization has finished.
checkpointId (optional)Id of the checkpoint that the job recovered from (if any).
fullSizeFull size in bytes of the referenced state by the checkpoint that was used during recovery (if any).
(Max/Sum)MailboxStartDurationMsThe aggregated (max and sum) across all subtasks duration between subtask being created until all classes and objects of that subtask are initialize.
(Max/Sum)ReadOutputDataDurationMsThe aggregated (max and sum) across all subtasks duration of reading unaligned checkpoint’s output buffers.
(Max/Sum)InitializeStateDurationMsThe aggregated (max and sum) across all subtasks duration to initialize a state backend (including state files download time)
(Max/Sum)GateRestoreDurationMsThe aggregated (max and sum) across all subtasks duration of reading unaligned checkpoint’s input buffers.
(Max/Sum)DownloadStateDurationMs

(optional - currently only supported by RocksDB Incremental)
The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS.
(Max/Sum)RestoredStateSizeBytes.[location]The aggregated (max and sum) across all subtasks size of restored state by location. Possible locations are defined in Enum StateObjectSizeStatsCollector as LOCAL_MEMORY, LOCAL_DISK, REMOTE, UNKNOWN.