自定义故障丰富器(Failure Enricher)

Flink 提供了一个可插拔的接口,供用户注册他们自定义的逻辑,并使用额外的元数据标签(字符串类型的 key-value 对)来丰富故障(Failure)信息。 这使得用户可以实现自定义的故障丰富器插件,对作业故障进行分类、对外暴露自定义指标,或者调用外部通知系统。

每当 JobManager 在运行时收到异常时,都会触发 FailureEnrichers。 每个 FailureEnricher 可以异步返回故障对应的标签(labels),这些标签可以通过 JobManager 的 REST API 来查询(例如:“type:System” 的标签意味着该故障被分类为系统错误)。

实现一个自定义的故障丰富器插件

要实现自定义的 FailureEnricher 插件,需要按照下面的步骤:

  • 通过实现 FailureEnricher 接口,添加自定义的 FailureEnricher

  • 通过实现 FailureEnricherFactory 接口,添加自定义的 FailureEnricherFactory

  • 添加一个 Service 入口,创建一个文件 META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory,文件中包含自定义 FailureEnricherFactory 的类名(有关详细信息,请参阅 Java Service Loader 文档)。

然后,创建一个包含 FailureEnricherFailureEnricherFactoryMETA-INF/services/ 和所有外部依赖项的 jar 文件。 在 Flink Deploy 目录中的 plugins/ 目录中创建一个任意名称的目录,例如 “failure-enrichment”,并将 jar 文件放入此目录中。 使用方式与其它 Plugin 是类似的,有关的详细信息,请参考 Flink Plugins

请注意,每个 FailureEnricher 都应该定义一组与对应 values 关联的 output keys 。这组 keys 必须是唯一的,否则将忽略具有重复 key 的所有丰富器(enrichers)。

FailureEnricherFactory 示例:

  1. package org.apache.flink.test.plugin.jar.failure;
  2. public class TestFailureEnricherFactory implements FailureEnricherFactory {
  3. @Override
  4. public FailureEnricher createFailureEnricher(Configuration conf) {
  5. return new CustomEnricher();
  6. }
  7. }

FailureEnricher 示例:

  1. package org.apache.flink.test.plugin.jar.failure;
  2. public class CustomEnricher implements FailureEnricher {
  3. private final Set<String> outputKeys;
  4. public CustomEnricher() {
  5. this.outputKeys = Collections.singleton("labelKey");
  6. }
  7. @Override
  8. public Set<String> getOutputKeys() {
  9. return outputKeys;
  10. }
  11. @Override
  12. public CompletableFuture<Map<String, String>> processFailure(
  13. Throwable cause, Context context) {
  14. return CompletableFuture.completedFuture(Collections.singletonMap("labelKey", "labelValue"));
  15. }
  16. }

配置

JobManager 在启动时加载 FailureEnricher 插件。 为了确保您的 FailureEnrichers 已加载,所有类名都应该定义在 jobmanager.failure-enrichers 配置 中。 如果此配置为空,则不会启动任何 Enricher。 示例:

  1. jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher

验证

要验证 FailureEnricher 是否已加载,您可以检查 JobManager 日志中是否有下面的内容:

  1. Found failure enricher org.apache.flink.test.plugin.jar.failure.CustomEnricher at jar:file:/path/to/flink/plugins/failure-enrichment/flink-test-plugin.jar!/org/apache/flink/test/plugin/jar/failure/CustomEnricher.class

此外,还可以查询 JobManager 的 REST API 来查找 failureLabels 字段:

  1. "failureLabels": {
  2. "labelKey": "labelValue"
  3. }