Metrics
PyFlink exposes a metric system that allows gathering and exposing metrics to external systems.
Registering metrics
You can access the metric system from a user-defined function by calling function_context.get_metric_group() in the open method. The get_metric_group() method returns a MetricGroup object on which you can create and register new metrics.
Metric types
PyFlink supports Counters, Gauges, Distributions and Meters.
Counter
A Counter is used to count something. The current value can be incremented or decremented using inc()/inc(n: int) or dec()/dec(n: int). You can create and register a Counter by calling counter(name: str) on a MetricGroup.
from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        # Register the counter once, when the function is opened.
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, i):
        # Increase the counter by the input value.
        self.counter.inc(i)
        return i
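The counter logic can be exercised without a Flink cluster by calling open() and eval() directly with a stand-in for function_context. In the minimal sketch below, FakeCounter, FakeMetricGroup and FakeFunctionContext are hypothetical test doubles (not PyFlink classes), and MyUDF drops the ScalarFunction base class so the snippet runs without PyFlink installed.

```python
class FakeCounter:
    """Hypothetical stand-in for a PyFlink Counter: tracks a running count."""

    def __init__(self):
        self.count = 0

    def inc(self, n: int = 1) -> None:
        self.count += n

    def dec(self, n: int = 1) -> None:
        self.count -= n


class FakeMetricGroup:
    """Hypothetical stand-in for MetricGroup; only counter(name) is modeled."""

    def __init__(self):
        self.counters = {}

    def counter(self, name: str) -> FakeCounter:
        # In this fake, registering the same name twice returns the existing counter.
        return self.counters.setdefault(name, FakeCounter())


class FakeFunctionContext:
    """Hypothetical stand-in for the function_context passed to open()."""

    def __init__(self):
        self.metric_group = FakeMetricGroup()

    def get_metric_group(self) -> FakeMetricGroup:
        return self.metric_group


class MyUDF:  # ScalarFunction base omitted so the sketch runs without PyFlink
    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, i):
        self.counter.inc(i)
        return i


ctx = FakeFunctionContext()
udf = MyUDF()
udf.open(ctx)
results = [udf.eval(i) for i in (1, 2, 3)]
print(ctx.get_metric_group().counters["my_counter"].count)  # 6
```

Because the fake only models counter(name), a UDF that also registers gauges, distributions or meters would need matching fake methods.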
Gauge
A Gauge provides a value on demand. You can register a gauge by calling gauge(name: str, obj: Callable[[], int]) on a MetricGroup. The Callable object is used to report the values. Gauge metrics are restricted to integer-only values.
from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.length = 0

    def open(self, function_context):
        # The lambda is called whenever the gauge value is reported,
        # so it returns the latest value of self.length.
        function_context.get_metric_group().gauge("my_gauge", lambda: self.length)

    def eval(self, i):
        self.length = i
        return i - 1
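Because a gauge stores a callable rather than a value, each report sees the current state of the function. The toy sketch below makes that visible; GaugeRegistry and its report() method are hypothetical stand-ins for PyFlink's metric group and reporting machinery, and open() receives the registry directly instead of a function_context.

```python
from typing import Callable, Dict


class GaugeRegistry:
    """Toy registry: stores callables and evaluates them only when a report is taken."""

    def __init__(self):
        self._gauges: Dict[str, Callable[[], int]] = {}

    def gauge(self, name: str, obj: Callable[[], int]) -> None:
        self._gauges[name] = obj

    def report(self) -> Dict[str, int]:
        # Each callable is invoked at report time, so it sees the latest state.
        return {name: fn() for name, fn in self._gauges.items()}


class MyUDF:
    def __init__(self):
        self.length = 0

    def open(self, registry: GaugeRegistry):
        # The lambda reads self.length when called, not when registered.
        registry.gauge("my_gauge", lambda: self.length)

    def eval(self, i):
        self.length = i
        return i - 1


registry = GaugeRegistry()
udf = MyUDF()
udf.open(registry)
first = registry.report()   # {'my_gauge': 0}
udf.eval(42)
second = registry.report()  # {'my_gauge': 42}
```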
Distribution
A Distribution is a metric that reports information (sum, count, min, max and mean) about the distribution of reported values. The value can be updated using update(n: int). You can register a distribution by calling distribution(name: str) on a MetricGroup. Distribution metrics are restricted to integer-only distributions.
from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.distribution = None

    def open(self, function_context):
        self.distribution = function_context.get_metric_group().distribution("my_distribution")

    def eval(self, i):
        # Record the input value in the distribution.
        self.distribution.update(i)
        return i - 1
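To make the reported statistics concrete, here is a toy model of what a distribution tracks as update(n) is called. ToyDistribution is hypothetical and is not PyFlink's implementation; its total, minimum and maximum fields correspond to the reported sum, min and max.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyDistribution:
    """Toy model of the statistics a Distribution reports."""
    count: int = 0
    total: int = 0                  # reported as "sum"
    minimum: Optional[int] = None   # reported as "min"
    maximum: Optional[int] = None   # reported as "max"

    def update(self, n: int) -> None:
        self.count += 1
        self.total += n
        self.minimum = n if self.minimum is None else min(self.minimum, n)
        self.maximum = n if self.maximum is None else max(self.maximum, n)

    @property
    def mean(self) -> float:
        # Average of all recorded values; 0.0 before the first update.
        return self.total / self.count if self.count else 0.0


d = ToyDistribution()
for value in (4, 8, 15, 16, 23, 42):
    d.update(value)
print(d.count, d.total, d.minimum, d.maximum, d.mean)  # 6 108 4 42 18.0
```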
Meter
A Meter measures an average throughput. An occurrence of an event can be registered with the mark_event() method. The occurrence of multiple events at the same time can be registered with the mark_event(n: int) method. You can register a meter by calling meter(name: str, time_span_in_seconds: int = 60) on a MetricGroup; time_span_in_seconds defaults to 60.
from pyflink.table.udf import ScalarFunction

class MyUDF(ScalarFunction):

    def __init__(self):
        self.meter = None

    def open(self, function_context):
        super().open(function_context)
        # an average rate of events per second over 120s, default is 60s.
        self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)

    def eval(self, i):
        self.meter.mark_event(i)
        return i - 1
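One way to picture "average rate over time_span_in_seconds" is a sliding window: sum the events marked during the last time_span_in_seconds and divide by the span. The sketch below assumes that simplified model; ToyMeter, its rate() method and the explicit now timestamps are hypothetical, and PyFlink's actual meter may compute and refresh its rate differently.

```python
import time
from collections import deque
from typing import Deque, Optional, Tuple


class ToyMeter:
    """Toy sliding-window meter: events per second over the last time span.

    Timestamps can be passed explicitly so the behavior is deterministic.
    """

    def __init__(self, time_span_in_seconds: int = 60):
        self.time_span = time_span_in_seconds
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, count)

    def mark_event(self, n: int = 1, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self._events.append((now, n))

    def rate(self, now: Optional[float] = None) -> float:
        now = time.monotonic() if now is None else now
        # Drop events that fall outside the window (now - time_span, now].
        while self._events and self._events[0][0] <= now - self.time_span:
            self._events.popleft()
        return sum(n for _, n in self._events) / self.time_span


meter = ToyMeter(time_span_in_seconds=120)
meter.mark_event(60, now=0.0)
meter.mark_event(180, now=30.0)
first_rate = meter.rate(now=60.0)    # (60 + 180) / 120 = 2.0
second_rate = meter.rate(now=130.0)  # the t=0 batch aged out: 180 / 120 = 1.5
```

A longer time span smooths out bursts but reacts more slowly to changes in throughput, which is the trade-off the time_span_in_seconds parameter controls.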
Scope
Refer to the Java metrics documentation for more details on scope definitions.
User Scope
You can define a user scope by calling MetricGroup.add_group(key: str, value: str = None). If value is not None, the call creates a new key-value MetricGroup pair: the key group is added to this group's sub-groups, and the value group is added to the key group's sub-groups. In this case, the value group is returned and a user variable is defined.
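# inside open(self, function_context):

# Register my_counter in a sub-group named "my_metrics".
(function_context
    .get_metric_group()
    .add_group("my_metrics")
    .counter("my_counter"))

# Key-value form: my_counter is registered in the returned value group,
# which also defines a user variable.
(function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter"))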
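The nesting rules of add_group() can be modeled in a few lines. In this toy sketch, ToyMetricGroup, the placeholder system scope "system_scope", the "." delimiter and the "<key>" spelling of the user variable are assumptions for illustration, not PyFlink's implementation; the real identifier depends on the configured scope format.

```python
from typing import Dict, List, Optional


class ToyMetricGroup:
    """Toy model of how add_group() nests groups and defines user variables."""

    def __init__(self, components: List[str], variables: Dict[str, str]):
        self.components = components
        self.variables = variables

    def add_group(self, key: str, value: Optional[str] = None) -> "ToyMetricGroup":
        if value is None:
            # Plain sub-group: one more scope component, no new variable.
            return ToyMetricGroup(self.components + [key], dict(self.variables))
        # Key-value pair: the key group contains the value group, and the
        # value group is returned together with a user variable for the key.
        variables = dict(self.variables)
        variables[f"<{key}>"] = value
        return ToyMetricGroup(self.components + [key, value], variables)

    def metric_identifier(self, metric_name: str, delimiter: str = ".") -> str:
        return delimiter.join(self.components + [metric_name])


root = ToyMetricGroup(["system_scope"], {})  # placeholder for the system scope
plain = root.add_group("my_metrics")
pair = root.add_group("my_metrics_key", "my_metrics_value")
print(plain.metric_identifier("my_counter"))
# system_scope.my_metrics.my_counter
print(pair.metric_identifier("my_counter"))
# system_scope.my_metrics_key.my_metrics_value.my_counter
print(pair.variables)
# {'<my_metrics_key>': 'my_metrics_value'}
```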
System Scope
Refer to the Java metrics documentation for more details on the system scope.
List of all Variables
Refer to the Java metrics documentation for the list of all variables.
User Variables
You can define a user variable by calling MetricGroup.add_group(key: str, value: str = None) and specifying the value parameter.
Important: User variables cannot be used in scope formats.
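# inside open(self, function_context):
# the returned value group defines a user variable for "my_metrics_key"
(function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter"))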
Common part between PyFlink and Flink
Refer to the Java metrics documentation for more details on the following sections: