指标

PyFlink 支持指标系统,该指标系统允许收集指标并将其暴露给外部系统。

注册指标

您可以通过在Python 用户自定义函数open 方法中调用 function_context.get_metric_group() 来访问指标系统。 get_metric_group() 方法返回一个 MetricGroup 对象,您可以在该对象上创建和注册新指标。

指标类型

PyFlink 支持计数器 Counters,量表 Gauges ,分布 Distribution 和仪表 Meters

计数器 Counter

Counter 用于计算某个东西的出现次数。可以通过 inc()/inc(n: int)dec()/dec(n: int) 增加或减少当前值。 您可以通过在 MetricGroup 上调用 counter(name: str) 来创建和注册 Counter

Python

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.counter = None
  5. def open(self, function_context):
  6. self.counter = function_context.get_metric_group().counter("my_counter")
  7. def eval(self, i):
  8. self.counter.inc(i)
  9. return i

量表

Gauge 可按需返回数值。您可以通过在 MetricGroup 上调用 gauge(name: str, obj: Callable[[], int]) 来注册一个量表。Callable 对象将用于汇报数值。量表指标(Gauge metrics)只能用于汇报整数值。

Python

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.length = 0
  5. def open(self, function_context):
  6. function_context.get_metric_group().gauge("my_gauge", lambda : self.length)
  7. def eval(self, i):
  8. self.length = i
  9. return i - 1

分布(Distribution)

Distribution 用于报告关于所报告值分布的信息(总和,计数,最小,最大和平均值)的指标。可以通过 update(n: int) 来更新当前值。您可以通过在 MetricGroup 上调用 distribution(name: str) 来注册该指标。分布指标(Distribution metrics)只能用于汇报整数指标。

Python

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.distribution = None
  5. def open(self, function_context):
  6. self.distribution = function_context.get_metric_group().distribution("my_distribution")
  7. def eval(self, i):
  8. self.distribution.update(i)
  9. return i - 1

仪表

仪表用于汇报平均吞吐量。可以使用 mark_event() 函数来注册事件的发生,使用 mark_event(n: int) 函数来注册同时发生的多个事件。 您可以通过在 MetricGroup 上调用 meter(self, name: str, time_span_in_seconds: int = 60) 来注册仪表。time_span_in_seconds的默认值为60。

Python

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.meter = None
  5. def open(self, function_context):
  6. super().open(function_context)
  7. # 120秒内统计的平均每秒事件数,默认是60秒
  8. self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)
  9. def eval(self, i):
  10. self.meter.mark_event(i)
  11. return i - 1

范围(Scope)

您可以参考 Java 指标文档以获取有关范围定义的更多详细信息。

用户范围(User Scope)

您可以通过调用 MetricGroup.add_group(key: str, value: str = None) 来定义用户范围。如果 value 不为 None,则创建一个新的键值 MetricGroup对。 其中,键组被添加到该组的子组中,而值组又被添加到键组的子组中。在这种情况下,值组将作为结果返回,与此同时,创建一个用户变量。

Python

  1. function_context
  2. .get_metric_group()
  3. .add_group("my_metrics")
  4. .counter("my_counter")
  5. function_context
  6. .get_metric_group()
  7. .add_group("my_metrics_key", "my_metrics_value")
  8. .counter("my_counter")

系统范围(System Scope)

您可以参考 Java 指标文档以获取有关系统范围的更多详细信息。

所有变量列表

您可以参考 Java 指标文档以获取有关“所有变量列表”的更多详细信息。

用户变量(User Variables)

您可以通过调用 MetricGroup.addGroup(key: str, value: str = None) 并指定 value 参数来定义用户变量。

**重要提示:**用户变量不能在以 scope format 中使用。

Python

  1. function_context
  2. .get_metric_group()
  3. .add_group("my_metrics_key", "my_metrics_value")
  4. .counter("my_counter")

PyFlink 和 Flink 的共通部分

您可以参考 Java 的指标文档,以获取关于以下部分的更多详细信息: