指标
PyFlink 支持指标系统,该指标系统允许收集指标并将其暴露给外部系统。
注册指标
您可以通过在Python 用户自定义函数的 open
方法中调用 function_context.get_metric_group()
来访问指标系统。 get_metric_group()
方法返回一个 MetricGroup
对象,您可以在该对象上创建和注册新指标。
指标类型
PyFlink 支持计数器 Counters
,量表 Gauges
,分布 Distribution
和仪表 Meters
。
计数器 Counter
Counter
用于计算某个东西的出现次数。可以通过 inc()/inc(n: int)
或 dec()/dec(n: int)
增加或减少当前值。 您可以通过在 MetricGroup
上调用 counter(name: str)
来创建和注册 Counter
。
Python
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.counter = None
def open(self, function_context):
self.counter = function_context.get_metric_group().counter("my_counter")
def eval(self, i):
self.counter.inc(i)
return i
量表
Gauge
可按需返回数值。您可以通过在 MetricGroup
上调用 gauge(name: str, obj: Callable[[], int])
来注册一个量表。Callable 对象将用于汇报数值。量表指标(Gauge metrics)只能用于汇报整数值。
Python
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.length = 0
def open(self, function_context):
function_context.get_metric_group().gauge("my_gauge", lambda : self.length)
def eval(self, i):
self.length = i
return i - 1
分布(Distribution)
Distribution
用于报告关于所报告值分布的信息(总和,计数,最小,最大和平均值)的指标。可以通过 update(n: int)
来更新当前值。您可以通过在 MetricGroup
上调用 distribution(name: str)
来注册该指标。分布指标(Distribution metrics)只能用于汇报整数指标。
Python
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.distribution = None
def open(self, function_context):
self.distribution = function_context.get_metric_group().distribution("my_distribution")
def eval(self, i):
self.distribution.update(i)
return i - 1
仪表
仪表用于汇报平均吞吐量。可以使用 mark_event()
函数来注册事件的发生,使用 mark_event(n: int)
函数来注册同时发生的多个事件。 您可以通过在 MetricGroup
上调用 meter(self, name: str, time_span_in_seconds: int = 60)
来注册仪表。time_span_in_seconds的默认值为60。
Python
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.meter = None
def open(self, function_context):
super().open(function_context)
# 120秒内统计的平均每秒事件数,默认是60秒
self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)
def eval(self, i):
self.meter.mark_event(i)
return i - 1
范围(Scope)
您可以参考 Java 指标文档以获取有关范围定义的更多详细信息。
用户范围(User Scope)
您可以通过调用 MetricGroup.add_group(key: str, value: str = None)
来定义用户范围。如果 value
不为 None
,则创建一个新的键值 MetricGroup
对。 其中,键组被添加到该组的子组中,而值组又被添加到键组的子组中。在这种情况下,值组将作为结果返回,与此同时,创建一个用户变量。
Python
function_context
.get_metric_group()
.add_group("my_metrics")
.counter("my_counter")
function_context
.get_metric_group()
.add_group("my_metrics_key", "my_metrics_value")
.counter("my_counter")
系统范围(System Scope)
您可以参考 Java 指标文档以获取有关系统范围的更多详细信息。
所有变量列表
您可以参考 Java 指标文档以获取有关“所有变量列表”的更多详细信息。
用户变量(User Variables)
您可以通过调用 MetricGroup.addGroup(key: str, value: str = None)
并指定 value 参数来定义用户变量。
**重要提示:**用户变量不能在以 scope format
中使用。
Python
function_context
.get_metric_group()
.add_group("my_metrics_key", "my_metrics_value")
.counter("my_counter")
PyFlink 和 Flink 的共通部分
您可以参考 Java 的指标文档,以获取关于以下部分的更多详细信息:
- Reporter 。
- 系统指标 。
- 延迟跟踪 。
- REST API 集成 。
- 仪表板集成 。