Custom failure enrichers

Flink provides a pluggable interface for users to register their custom logic and enrich failures with extra metadata labels (string key-value pairs). This enables users to implement their own failure enrichment plugins to categorize job failures, expose custom metrics, or make calls to external notification systems.

FailureEnrichers are triggered every time an exception is reported at runtime by the JobManager. Every FailureEnricher may asynchronously return labels associated with the failure that are then exposed via the JobManager’s REST API (e.g., a 'type:System' label implying the failure is categorized as a system error).

Implement a plugin for your custom enricher

To implement a custom FailureEnricher plugin, you need to:

  • Add your own FailureEnricher by implementing the FailureEnricher interface.

  • Add your own FailureEnricherFactory by implementing the FailureEnricherFactory interface.

  • Add a service entry. Create a file META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory which contains the class name of your failure enricher factory class (see Java Service Loader docs for more details).

Then, create a jar which includes your FailureEnricher, FailureEnricherFactory, META-INF/services/ and all external dependencies. Make a directory in plugins/ of your Flink distribution with an arbitrary name, e.g. “failure-enrichment”, and put the jar into this directory. See Flink Plugin for more details.
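As an illustration of the steps above, the resulting layout could look roughly as follows. This is a sketch under assumptions: the jar name flink-test-plugin.jar and the class names are borrowed from the examples and the log line elsewhere on this page, and the jar's external dependencies are omitted.

```text
plugins/
  failure-enrichment/
    flink-test-plugin.jar
      META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory
      org/apache/flink/test/plugin/jar/failure/CustomEnricher.class
      org/apache/flink/test/plugin/jar/failure/TestFailureEnricherFactory.class
```

In this layout, the service entry file would contain a single line naming the factory class:

```text
org.apache.flink.test.plugin.jar.failure.TestFailureEnricherFactory
```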

Note that every FailureEnricher must define the set of output keys that it may associate with values. These keys have to be unique across all enrichers; otherwise, all enrichers with overlapping keys will be ignored.
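To make the uniqueness rule concrete, the following standalone sketch shows one way to check a set of enrichers for overlapping keys before deploying them. It is not Flink's loading code: the class OutputKeyCheck, the method enrichersWithUniqueKeys, and the enricher names are all hypothetical, and the input is assumed to be a plain map from enricher name to the keys it declares.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Standalone illustration of the unique-output-keys rule; not Flink's loading code. */
final class OutputKeyCheck {

    /**
     * Returns the names of enrichers whose output keys are not claimed by any other enricher.
     *
     * @param keysByEnricher hypothetical input: enricher name mapped to its declared output keys
     */
    static Set<String> enrichersWithUniqueKeys(Map<String, Set<String>> keysByEnricher) {
        // Invert the mapping: for every key, collect the enrichers that declare it.
        Map<String, List<String>> ownersByKey = new HashMap<>();
        keysByEnricher.forEach((enricher, keys) ->
                keys.forEach(key ->
                        ownersByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(enricher)));

        // Start with every enricher and drop all that share at least one key.
        Set<String> valid = new LinkedHashSet<>(keysByEnricher.keySet());
        ownersByKey.values().stream()
                .filter(owners -> owners.size() > 1)
                .forEach(valid::removeAll);
        return valid;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> declared = new LinkedHashMap<>();
        declared.put("TypeEnricher", Set.of("type"));
        declared.put("TeamEnricher", Set.of("team", "type")); // clashes on "type"
        declared.put("RegionEnricher", Set.of("region"));
        System.out.println(enrichersWithUniqueKeys(declared)); // prints [RegionEnricher]
    }
}
```

Both enrichers that share the "type" key are dropped, not just the second one, which mirrors the "all enrichers with overlapping keys will be ignored" wording above.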

FailureEnricherFactory example:

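    package org.apache.flink.test.plugin.jar.failure;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.failure.FailureEnricher;
    import org.apache.flink.core.failure.FailureEnricherFactory;

    /** Factory named in the service entry; creates the enricher instance. */
    public class TestFailureEnricherFactory implements FailureEnricherFactory {
        @Override
        public FailureEnricher createFailureEnricher(Configuration conf) {
            return new CustomEnricher();
        }
    }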

FailureEnricher example:

    package org.apache.flink.test.plugin.jar.failure;

    import org.apache.flink.core.failure.FailureEnricher;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    public class CustomEnricher implements FailureEnricher {
        // Keys this enricher may emit; they must not overlap with other enrichers' keys.
        private final Set<String> outputKeys;

        public CustomEnricher() {
            this.outputKeys = Collections.singleton("labelKey");
        }

        @Override
        public Set<String> getOutputKeys() {
            return outputKeys;
        }

        @Override
        public CompletableFuture<Map<String, String>> processFailure(
                Throwable cause, Context context) {
            // Every failure receives the same static label.
            return CompletableFuture.completedFuture(
                    Collections.singletonMap("labelKey", "labelValue"));
        }
    }
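The example above always returns the same label. As a sketch of what categorizing logic might look like, the standalone class below derives a "type" label from the exception itself. It deliberately avoids Flink dependencies so it can be run on its own; in a real plugin its two methods would roughly correspond to the bodies of getOutputKeys() and processFailure(...). The class name FailureTypeClassifier, the depth limit, and the rule "a JVM Error means System, anything else means User" are assumptions made for illustration, not Flink behavior.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Standalone sketch of label-producing logic; hypothetical, not part of Flink. */
final class FailureTypeClassifier {

    // Single output key; the name "type" follows the 'type:System' example label.
    private static final String KEY = "type";

    // Assumed bound on how far the cause chain is followed.
    private static final int MAX_DEPTH = 16;

    static Set<String> getOutputKeys() {
        return Collections.singleton(KEY);
    }

    /** Hypothetical rule: JVM-level errors are "System", everything else is "User". */
    static CompletableFuture<Map<String, String>> classify(Throwable cause) {
        // Walk the cause chain so that a wrapped Error is still detected.
        Throwable current = cause;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (current instanceof Error) {
                return CompletableFuture.completedFuture(
                        Collections.singletonMap(KEY, "System"));
            }
            current = current.getCause();
        }
        return CompletableFuture.completedFuture(Collections.singletonMap(KEY, "User"));
    }
}
```

For instance, classify(new RuntimeException(new OutOfMemoryError())) completes with the label type=System, while classify(new IllegalStateException("bad input")) completes with type=User. The returned labels only ever use the key declared in getOutputKeys().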

Configuration

The JobManager loads FailureEnricher plugins at startup. To make sure your FailureEnrichers are loaded, list all of their class names in the jobmanager.failure-enrichers configuration option. If this option is empty, NO enrichers will be started. Example:

    jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.CustomEnricher

Validation

To validate that your FailureEnricher is loaded, you can check the JobManager logs for the following line:

    Found failure enricher org.apache.flink.test.plugin.jar.failure.CustomEnricher at jar:file:/path/to/flink/plugins/failure-enrichment/flink-test-plugin.jar!/org/apache/flink/test/plugin/jar/failure/CustomEnricher.class

Moreover, you can query the JobManager’s REST API looking for the failureLabels field:

    "failureLabels": {
        "labelKey": "labelValue"
    }