Custom Operator Plugins

The operator provides a customizable platform for Flink resource management. Users can develop plugins to tailor the operator behaviour to their own needs.

Custom Flink Resource Validators

FlinkResourceValidator, the interface for validating FlinkDeployment and FlinkSessionJob resources, is a pluggable component loaded through the Flink Plugins mechanism. To customize validation, provide your own FlinkResourceValidator implementation and make sure the packaged JAR retains its service definition in META-INF/services. The following steps show how to develop and use a custom validator.

  1. Implement the FlinkResourceValidator interface:

    package org.apache.flink.kubernetes.operator.validation;

    // Depending on the operator version, FlinkDeployment and FlinkSessionJob may instead live in
    // org.apache.flink.kubernetes.operator.api (the package used by the mutator example).
    import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
    import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;

    import java.util.Optional;

    /** Custom validator implementation of {@link FlinkResourceValidator}. */
    public class CustomValidator implements FlinkResourceValidator {

        // Returns an error message if the deployment is invalid, or Optional.empty() if it is valid.
        @Override
        public Optional<String> validateDeployment(FlinkDeployment deployment) {
            if (deployment.getSpec().getFlinkVersion() == null) {
                return Optional.of("Flink Version must be defined.");
            }
            return Optional.empty();
        }

        // The session deployment is passed in case the check needs it; it is unused here.
        @Override
        public Optional<String> validateSessionJob(
                FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
            if (sessionJob.getSpec().getJob() == null) {
                return Optional.of("The job spec should not be empty");
            }
            return Optional.empty();
        }
    }
  2. Create the service definition file org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator in META-INF/services. For the custom FlinkResourceValidator implementation above, the file contains:

    org.apache.flink.kubernetes.operator.validation.CustomValidator
  3. Use Maven to package the project and build the custom validator JAR.

  4. Create a Dockerfile that builds a custom image from the official apache/flink-kubernetes-operator image and copies the generated JAR into the custom validator plugin directory. /opt/flink/plugins is the value of the FLINK_PLUGINS_DIR environment variable in the flink-kubernetes-operator Helm chart. The custom validator directory under /opt/flink/plugins is structured as follows:

    /opt/flink/plugins
    ├── custom-validator
    │   └── custom-validator.jar
    └── ...

    Given this directory layout, the Dockerfile can be defined as follows:

    FROM apache/flink-kubernetes-operator
    ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
    ENV CUSTOM_VALIDATOR_DIR=custom-validator
    RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR
    COPY custom-validator.jar $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR/
  5. Install the flink-kubernetes-operator Helm chart with the custom image and verify that the deploy/flink-kubernetes-operator log contains a line like:

    2022-05-04 14:01:46,551 o.a.f.k.o.u.FlinkUtils [INFO ] Discovered resource validator from plugin directory[/opt/flink/plugins]: org.apache.flink.kubernetes.operator.validation.CustomValidator.
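
To make the validation contract concrete, the following is a minimal, self-contained sketch rather than operator code: Optional.empty() signals a valid resource, and Optional.of(message) carries the first error found. DeploymentSpecStub, ValidationChain, and the parallelism check are hypothetical stand-ins introduced only for illustration; a real validator would implement FlinkResourceValidator against the operator's own FlinkDeployment and FlinkSessionJob classes.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical stand-in for a deployment spec; NOT the operator's real class. */
final class DeploymentSpecStub {
    final String flinkVersion; // null when the user omitted it
    final int parallelism;

    DeploymentSpecStub(String flinkVersion, int parallelism) {
        this.flinkVersion = flinkVersion;
        this.parallelism = parallelism;
    }
}

/** Runs checks in order and reports the first failure as an Optional error message. */
final class ValidationChain {
    private final List<Function<DeploymentSpecStub, Optional<String>>> checks;

    ValidationChain(List<Function<DeploymentSpecStub, Optional<String>>> checks) {
        this.checks = checks;
    }

    Optional<String> validate(DeploymentSpecStub spec) {
        for (Function<DeploymentSpecStub, Optional<String>> check : checks) {
            Optional<String> error = check.apply(spec);
            if (error.isPresent()) {
                return error; // the first failing check wins
            }
        }
        return Optional.empty(); // every check passed
    }

    /** Example chain: the first check mirrors the validator above, the second is an assumption. */
    static ValidationChain defaults() {
        return new ValidationChain(List.of(
                spec -> spec.flinkVersion == null
                        ? Optional.of("Flink Version must be defined.")
                        : Optional.empty(),
                spec -> spec.parallelism < 1
                        ? Optional.of("Parallelism must be at least 1.")
                        : Optional.empty()));
    }
}
```

Keeping each check as a small function makes the logic easy to unit test before it is wired into a FlinkResourceValidator implementation and packaged as a plugin.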

Custom Flink Resource Listeners

The Flink Kubernetes Operator allows users to listen to events and status updates triggered for the Flink resources it manages. This feature enables tighter integration with the user's own data platform.

By implementing the FlinkResourceListener interface, users can listen to both events and status updates per resource type (FlinkDeployment / FlinkSessionJob). The listener methods are called after the respective events have been triggered by the system. Through the context passed to each listener method, users can also access the related Flink resource and the KubernetesClient itself, for example to trigger further events on demand.

Similar to custom validator implementations, resource listeners are loaded via the Flink Plugins mechanism.

In order to enable your custom FlinkResourceListener you need to:

  1. Implement the interface
  2. Add your listener class to org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener in META-INF/services
  3. Package your JAR and add it to the plugins directory of your operator image (/opt/flink/plugins)
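
As a rough illustration of step 1, the sketch below shows the general shape of a listener that records one line per notification. It deliberately does not use the operator's API: ResourceContextStub, ResourceListenerStub, and AuditListener are hypothetical stand-ins, and the real FlinkResourceListener has separate callbacks per resource type whose exact signatures should be taken from the operator's source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the context handed to a listener; NOT the operator's real type. */
final class ResourceContextStub {
    final String resourceKind; // e.g. "FlinkDeployment" or "FlinkSessionJob"
    final String resourceName;
    final String message;

    ResourceContextStub(String resourceKind, String resourceName, String message) {
        this.resourceKind = resourceKind;
        this.resourceName = resourceName;
        this.message = message;
    }
}

/** Simplified stand-in listener with one event callback and one status callback. */
interface ResourceListenerStub {
    void onEvent(ResourceContextStub ctx);

    void onStatusUpdate(ResourceContextStub ctx);
}

/** Records one line per notification; a real listener might forward these to another system. */
final class AuditListener implements ResourceListenerStub {
    private final List<String> lines = new ArrayList<>();

    @Override
    public void onEvent(ResourceContextStub ctx) {
        lines.add(format("EVENT", ctx));
    }

    @Override
    public void onStatusUpdate(ResourceContextStub ctx) {
        lines.add(format("STATUS", ctx));
    }

    /** Read-only view of everything recorded so far. */
    List<String> lines() {
        return Collections.unmodifiableList(lines);
    }

    private static String format(String type, ResourceContextStub ctx) {
        return type + " " + ctx.resourceKind + "/" + ctx.resourceName + ": " + ctx.message;
    }
}
```

Separating the formatting from the callbacks keeps each callback a one-liner, which makes it straightforward to swap the in-memory list for a call to an external system.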

Additional Dependencies

In some cases, users may need to add required dependencies to the operator classpath.

When building the custom image, copy the additional dependencies to /opt/flink/operator-lib, which is available through the OPERATOR_LIB environment variable. That folder is added to the classpath upon initialization.

    FROM apache/flink-kubernetes-operator
    ARG ARTIFACT1=custom-artifact1.jar
    ARG ARTIFACT2=custom-artifact2.jar
    COPY target/$ARTIFACT1 $OPERATOR_LIB
    COPY target/$ARTIFACT2 $OPERATOR_LIB

Custom Flink Resource Mutators

FlinkResourceMutator, the interface for mutating FlinkDeployment and FlinkSessionJob resources, is a pluggable component loaded through the Flink Plugins mechanism. To customize mutation, provide your own FlinkResourceMutator implementation and make sure the packaged JAR retains its service definition in META-INF/services. The following steps show how to develop and use a custom mutator.

  1. Implement the FlinkResourceMutator interface:

    package org.apache.flink.mutator;

    import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
    import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
    import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;

    import java.util.Optional;

    /** Custom Flink Mutator. */
    public class CustomFlinkMutator implements FlinkResourceMutator {

        // Returns the deployment unchanged; apply any modifications before returning it.
        @Override
        public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
            return deployment;
        }

        // Returns the session job unchanged; the session deployment is unused here.
        @Override
        public FlinkSessionJob mutateSessionJob(
                FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
            return sessionJob;
        }
    }
  2. Create the service definition file org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator in META-INF/services. For the custom FlinkResourceMutator implementation above, the file contains:

    org.apache.flink.mutator.CustomFlinkMutator
  3. Use Maven to package the project and build the custom mutator JAR.

  4. Create a Dockerfile that builds a custom image from the official apache/flink-kubernetes-operator image and copies the generated JAR into the custom mutator plugin directory. /opt/flink/plugins is the value of the FLINK_PLUGINS_DIR environment variable in the flink-kubernetes-operator Helm chart. The custom mutator directory under /opt/flink/plugins is structured as follows:

    /opt/flink/plugins
    ├── custom-mutator
    │   └── custom-mutator.jar
    └── ...

    Given this directory layout, the Dockerfile can be defined as follows:

    FROM apache/flink-kubernetes-operator
    ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
    ENV CUSTOM_MUTATOR_DIR=custom-mutator
    RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_MUTATOR_DIR
    COPY custom-mutator.jar $FLINK_PLUGINS_DIR/$CUSTOM_MUTATOR_DIR/
  5. Install the flink-kubernetes-operator Helm chart with the custom image and verify that the deploy/flink-kubernetes-operator log contains a line like:

    2023-12-12 06:26:56,667 o.a.f.k.o.u.MutatorUtils [INFO ] Discovered mutator from plugin directory[/opt/flink/plugins]: org.apache.flink.mutator.CustomFlinkMutator.
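
The mutator in step 1 returns its input unchanged. As an illustration of what a non-trivial mutation could look like, here is a minimal sketch that fills in default labels without overwriting values the user already set. ResourceMetaStub and DefaultLabelMutator are hypothetical stand-ins, not operator classes; a real implementation would apply the same idea inside mutateDeployment or mutateSessionJob.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for resource metadata; NOT the operator's real class. */
final class ResourceMetaStub {
    final Map<String, String> labels = new LinkedHashMap<>();
}

/** Adds default labels while leaving any user-provided values untouched. */
final class DefaultLabelMutator {
    private final Map<String, String> defaults;

    DefaultLabelMutator(Map<String, String> defaults) {
        // Defensive copy so later changes to the caller's map do not affect the mutator.
        this.defaults = new LinkedHashMap<>(defaults);
    }

    /** Mutates the given metadata in place and returns the same instance. */
    ResourceMetaStub mutate(ResourceMetaStub meta) {
        // putIfAbsent only writes a default when the key has no value yet.
        defaults.forEach(meta.labels::putIfAbsent);
        return meta;
    }
}
```

Using putIfAbsent makes the mutation idempotent: applying it to an already mutated resource changes nothing, which is a useful property if the same resource passes through the mutator more than once.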