指标

PyFlink支持指标系统,该指标系统允许收集指标并将其暴露给外部系统。

注册指标

您可以通过在用户自定义函数open方法中调用function_context.get_metric_group()来访问指标系统。 get_metric_group()方法返回一个MetricGroup对象,您可以在该对象上创建和注册新指标。

指标类型

PyFlink支持计数器Counters,量表Gauges,分布Distribution和仪表Meters

计数器 Counter

Counter用于计算某个东西的出现次数。可以通过inc()/inc(n: int)dec()/dec(n: int)增加或减少当前值。 您可以通过在MetricGroup上调用counter(name: str)来创建和注册Counter

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.counter = None
  5. def open(self, function_context):
  6. self.counter = function_context.get_metric_group().counter("my_counter")
  7. def eval(self, i):
  8. self.counter.inc(i)
  9. return i

量表

Gauge可按需返回数值。您可以通过在MetricGroup上调用gauge(name: str, obj: Callable[[], int])来注册一个量表。Callable对象将用于汇报数值。量表指标(Gauge metrics)只能用于汇报整数值。

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.length = 0
  5. def open(self, function_context):
  6. function_context.get_metric_group().gauge("my_gauge", lambda : self.length)
  7. def eval(self, i):
  8. self.length = i
  9. return i - 1

分布(Distribution)

Distribution用于报告关于所报告值分布的信息(总和,计数,最小,最大和平均值)的指标。可以通过update(n: int)来更新当前值。您可以通过在MetricGroup上调用distribution(name: str)来注册该指标。分布指标(Distribution metrics)只能用于汇报整数指标。

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.distribution = None
  5. def open(self, function_context):
  6. self.distribution = function_context.get_metric_group().distribution("my_distribution")
  7. def eval(self, i):
  8. self.distribution.update(i)
  9. return i - 1

仪表

仪表用于汇报平均吞吐量。可以使用mark_event()函数来注册事件的发生,使用mark_event(n: int)函数来注册同时发生的多个事件。 您可以通过在MetricGroup上调用meter(self, name: str, time_span_in_seconds: int = 60)来注册仪表。time_span_in_seconds的默认值为60。

  1. from pyflink.table.udf import ScalarFunction
  2. class MyUDF(ScalarFunction):
  3. def __init__(self):
  4. self.meter = None
  5. def open(self, function_context):
  6. super().open(function_context)
  7. # 120秒内统计的平均每秒事件数,默认是60秒
  8. self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)
  9. def eval(self, i):
  10. self.meter.mark_event(i)
  11. return i - 1

范围(Scope)

您可以参考Java指标文档以获取有关范围定义的更多详细信息。

用户范围(User Scope)

您可以通过调用MetricGroup.add_group(key: str, value: str = None)来定义用户范围。如果value不为None,则创建一个新的键值MetricGroup对。 其中,键组被添加到该组的子组中,而值组又被添加到键组的子组中。在这种情况下,值组将作为结果返回,与此同时,创建一个用户变量。

  1. function_context
  2. .get_metric_group()
  3. .add_group("my_metrics")
  4. .counter("my_counter")
  5. function_context
  6. .get_metric_group()
  7. .add_group("my_metrics_key", "my_metrics_value")
  8. .counter("my_counter")

系统范围(System Scope)

您可以参考Java指标文档以获取有关系统范围的更多详细信息。

所有变量列表

您可以参考Java指标文档以获取有关“所有变量列表”的更多详细信息。

用户变量(User Variables)

您可以通过调用MetricGroup.addGroup(key: str, value: str = None)并指定value参数来定义用户变量。

重要提示:用户变量不能在以scope format中使用。

  1. function_context
  2. .get_metric_group()
  3. .add_group("my_metrics_key", "my_metrics_value")
  4. .counter("my_counter")

PyFlink和Flink的共通部分

您可以参考Java的指标文档,以获取关于以下部分的更多详细信息: