Configuration

Specifying Operator Configuration

The operator allows users to specify default configuration that will be shared by the Flink operator itself and the Flink deployments.

These configuration files are mounted externally via ConfigMaps. The Configuration files with default values are shipped in the Helm chart. It is recommended to review and adjust them if needed in the values.yaml file before deploying the Operator in production environments.

To append to the default configuration, define the flink-conf.yaml key in the defaultConfiguration section of the Helm values.yaml file:

  1. defaultConfiguration:
  2. create: true
  3. # Set append to false to replace configuration files
  4. append: true
  5. flink-conf.yaml: |+
  6. # Flink Config Overrides
  7. kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
  8. kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
  9. kubernetes.operator.reconcile.interval: 15 s
  10. kubernetes.operator.observer.progress-check.interval: 5 s

To learn more about metrics and logging configuration please refer to the dedicated docs page.

The operator also supports default configuration overrides for selected Flink versions and namespaces. This can be important if some behaviour changed across Flink versions or we want to treat certain namespaces differently (such as reconcile it more or less frequently etc).

  1. # Flink Version specific defaults
  2. kubernetes.operator.default-configuration.flink-version.v1_17.k1: v1
  3. kubernetes.operator.default-configuration.flink-version.v1_17.k2: v2
  4. kubernetes.operator.default-configuration.flink-version.v1_17.k3: v3
  5. # Namespace specific defaults
  6. kubernetes.operator.default-configuration.namespace.ns1.k1: v1
  7. kubernetes.operator.default-configuration.namespace.ns1.k2: v2
  8. kubernetes.operator.default-configuration.namespace.ns2.k1: v1

Flink version specific defaults will have a higher precedence so namespace defaults would be overridden by the same key.

Dynamic Operator Configuration

The Kubernetes operator supports dynamic config changes through the operator ConfigMaps. Dynamic operator configuration is enabled by default, and can be disabled by setting kubernetes.operator.dynamic.config.enabled to false. Time interval for checking dynamic config changes is specified by kubernetes.operator.dynamic.config.check.interval of which default value is 5 minutes.

Verify whether dynamic operator configuration updates is enabled via the deploy/flink-kubernetes-operator log has:

  1. 2022-05-28 13:08:29,222 o.a.f.k.o.c.FlinkConfigManager [INFO ] Enabled dynamic config updates, checking config changes every PT5M

To change config values dynamically the ConfigMap can be directly edited via kubectl patch or kubectl edit command. For example to change the reschedule interval you can override kubernetes.operator.reconcile.interval.

Verify whether the config value of kubernetes.operator.reconcile.interval is updated to 30 seconds via the deploy/flink-kubernetes-operator log has:

  1. 2022-05-28 13:08:30,115 o.a.f.k.o.c.FlinkConfigManager [INFO ] Updating default configuration to {kubernetes.operator.reconcile.interval=PT30S}

Leader Election and High Availability

The operator supports high availability through leader election and standby operator instances. To enable leader election you need to add the following two mandatory operator configuration parameters.

  1. kubernetes.operator.leader-election.enabled: true
  2. kubernetes.operator.leader-election.lease-name: flink-operator-lease

Lease name must be unique in the current lease namespace. For other more advanced config parameters please refer to the configuration reference.

Once you enabled leader election you can increase the replicas for the operator Deployment using the Helm chart to enable high availability.

If replicas value is greater than 1, you can define topologySpreadConstraints via operatorPod.topologySpreadConstraints.

Environment variables

The operator exposes several environment variables which can be used for custom plugins.

NameDescriptionFieldRef
HOST_IPThe host which the pod is deployed onstatus.hostIP
POD_IPPod IPstatus.podIP
POD_NAMEPod Namemetadata.name

Operator Configuration Reference

System Configuration

General operator system configuration. Cannot be overridden on a per-resource basis.

KeyDefaultTypeDescription
kubernetes.operator.dynamic.namespaces.enabled
falseBooleanEnables dynamic change of watched/monitored namespaces.
kubernetes.operator.exception.field.max.length
2048IntegerMaximum length of each exception field including stack trace to be included in CR status error field.
kubernetes.operator.exception.stacktrace.enabled
falseBooleanEnable exception stacktrace to be included in CR status error field.
kubernetes.operator.exception.stacktrace.max.length
2048IntegerMaximum length of stacktrace to be included in CR status error field.
kubernetes.operator.exception.throwable.list.max.count
2IntegerMaximum number of throwable to be included in CR status error field.
kubernetes.operator.flink.client.cancel.timeout
1 minDurationThe timeout for the reconciler to wait for flink to cancel job.
kubernetes.operator.flink.client.timeout
10 sDurationThe timeout for the observer to wait the flink rest client to return.
kubernetes.operator.leader-election.enabled
falseBooleanEnable leader election for the operator to allow running standby instances.
kubernetes.operator.leader-election.lease-duration
15 sDurationLeader election lease duration.
kubernetes.operator.leader-election.lease-name
(none)StringLeader election lease name, must be unique for leases in the same namespace.
kubernetes.operator.leader-election.renew-deadline
10 sDurationLeader election renew deadline.
kubernetes.operator.leader-election.retry-period
2 sDurationLeader election retry period.
kubernetes.operator.reconcile.interval
1 minDurationThe interval for the controller to reschedule the reconcile process.
kubernetes.operator.reconcile.parallelism
200IntegerThe maximum number of threads running the reconciliation loop. Use -1 for infinite.
kubernetes.operator.resource.cleanup.timeout
1 minDurationThe timeout for the resource clean up to wait for flink to shutdown cluster.
kubernetes.operator.retry.initial.interval
5 sDurationInitial interval of automatic reconcile retries on recoverable errors.
kubernetes.operator.retry.interval.multiplier
2.0DoubleInterval multiplier of automatic reconcile retries on recoverable errors.
kubernetes.operator.retry.max.attempts
10IntegerMax attempts of automatic reconcile retries on recoverable errors.
kubernetes.operator.user.artifacts.base.dir
“/opt/flink/artifacts”StringThe base dir to put the session job artifacts.
kubernetes.operator.watched.namespaces
“JOSDK_ALL_NAMESPACES”StringComma separated list of namespaces the operator monitors for custom resources.

Resource/User Configuration

These options can be configured on both an operator and a per-resource level. When set under spec.flinkConfiguration for the Flink resources it will override the default value provided in the operator default configuration (flink-conf.yaml).

KeyDefaultTypeDescription
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled
falseBooleanWhether to enable checkpoint progress health check for clusters.
kubernetes.operator.cluster.health-check.checkpoint-progress.window
5 minDurationIf no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than checkpointing interval.
kubernetes.operator.cluster.health-check.enabled
falseBooleanWhether to enable health check for clusters.
kubernetes.operator.cluster.health-check.restarts.threshold
64IntegerThe threshold which is checked against job restart count within a configured window. If the restart count is reaching the threshold then full cluster restart is initiated.
kubernetes.operator.cluster.health-check.restarts.window
2 minDurationThe duration of the time window where job restart count measured.
kubernetes.operator.deployment.readiness.timeout
5 minDurationThe timeout for deployments to become ready/stable before being rolled back if rollback is enabled.
kubernetes.operator.deployment.rollback.enabled
falseBooleanWhether to enable rolling back failed deployment upgrades.
kubernetes.operator.exception.label.mapper
MapKey-Value pair where key is the REGEX to filter through the exception messages and value is the string to be included in CR status error label field if the REGEX matches. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.
kubernetes.operator.jm-deployment-recovery.enabled
trueBooleanWhether to enable recovery of missing/deleted jobmanager deployments.
kubernetes.operator.jm-deployment.shutdown-ttl
86400000 msDurationTime after which jobmanager pods of terminal application deployments are shut down.
kubernetes.operator.jm-deployment.startup.probe.enabled
trueBooleanEnable job manager startup probe to allow detecting when the jobmanager could not submit the job.
kubernetes.operator.job.restart.failed
falseBooleanWhether to restart failed jobs.
kubernetes.operator.job.savepoint-on-deletion
falseBooleanIndicate whether a savepoint must be taken when deleting a FlinkDeployment or FlinkSessionJob.
kubernetes.operator.job.upgrade.ignore-pending-savepoint
falseBooleanWhether to ignore pending savepoint during job upgrade.
kubernetes.operator.job.upgrade.inplace-scaling.enabled
trueBooleanWhether to enable inplace scaling for Flink 1.18+ using the resource requirements API. On failure or earlier Flink versions it falls back to regular full redeployment.
kubernetes.operator.job.upgrade.last-state-fallback.enabled
trueBooleanEnables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.
kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age
(none)DurationMax allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered.
kubernetes.operator.periodic.savepoint.interval
0 msDurationInterval at which periodic savepoints will be triggered. The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop.
kubernetes.operator.pod-template.merge-arrays-by-name
falseBooleanConfigure the array merge behaviour during pod merging. Arrays can be either merged by position or name matching.
kubernetes.operator.savepoint.cleanup.enabled
trueBooleanWhether to enable clean up of savepoint history.
kubernetes.operator.savepoint.format.type
CANONICAL

Enum

Type of the binary format in which a savepoint should be taken.

Possible values:
  • “CANONICAL”: A canonical, common for all state backends format. It lets you switch state backends.
  • “NATIVE”: A format specific for the chosen state backend, in its native binary format. Might be faster to take and restore from than the canonical one.
kubernetes.operator.savepoint.history.max.age
86400000 msDurationMaximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.
kubernetes.operator.savepoint.history.max.count
10IntegerMaximum number of savepoint history entries to retain.
kubernetes.operator.savepoint.trigger.grace-period
1 minDurationThe interval before a savepoint trigger attempt is marked as unsuccessful.
kubernetes.operator.user.artifacts.http.header
(none)MapCustom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.

Autoscaler Configuration

Like other resource options these can be configured on both an operator and a per-resource level. When set under spec.flinkConfiguration for the Flink resources it will override the default value provided in the operator default configuration (flink-conf.yaml).

KeyDefaultTypeDescription
kubernetes.operator.job.autoscaler.backlog-processing.lag-threshold
5 minDurationLag threshold which will prevent unnecessary scalings while removing the pending messages responsible for the lag.
kubernetes.operator.job.autoscaler.catch-up.duration
5 minDurationThe target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.
kubernetes.operator.job.autoscaler.enabled
falseBooleanEnable job autoscaler module.
kubernetes.operator.job.autoscaler.history.max.age
86400000 msDurationMaximum age for past scaling decisions to retain.
kubernetes.operator.job.autoscaler.history.max.count
3IntegerMaximum number of past scaling decisions to retain per vertex.
kubernetes.operator.job.autoscaler.metrics.busy-time.aggregator
MAX

Enum

Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even.

Possible values:
  • “AVG”
  • “MAX”
  • “MIN”
kubernetes.operator.job.autoscaler.metrics.window
10 minDurationScaling metrics aggregation window size.
kubernetes.operator.job.autoscaler.restart.time
3 minDurationExpected restart time to be used until the operator can determine it reliably from history.
kubernetes.operator.job.autoscaler.scale-down.max-factor
0.6DoubleMax scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.
kubernetes.operator.job.autoscaler.scale-up.grace-period
1 hDurationDuration in which no scale down of a vertex is allowed after it has been scaled up.
kubernetes.operator.job.autoscaler.scale-up.max-factor
100000.0DoubleMax scale up factor. 2.0 means job can only be scaled up with 200% of the current parallelism.
kubernetes.operator.job.autoscaler.scaling.effectiveness.detection.enabled
falseBooleanWhether to enable detection of ineffective scaling operations and allowing the autoscaler to block further scale ups.
kubernetes.operator.job.autoscaler.scaling.effectiveness.threshold
0.1DoubleProcessing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.
kubernetes.operator.job.autoscaler.scaling.enabled
trueBooleanEnable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.
kubernetes.operator.job.autoscaler.stabilization.interval
5 minDurationStabilization period in which no new scaling will be executed
kubernetes.operator.job.autoscaler.target.utilization
0.7DoubleTarget vertex utilization
kubernetes.operator.job.autoscaler.target.utilization.boundary
0.4DoubleTarget vertex utilization boundary. Scaling won’t be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]
kubernetes.operator.job.autoscaler.vertex.exclude.ids
List<String>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.
kubernetes.operator.job.autoscaler.vertex.max-parallelism
200IntegerThe maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator.
kubernetes.operator.job.autoscaler.vertex.min-parallelism
1IntegerThe minimum parallelism the autoscaler can use.

System Metrics Configuration

Operator system metrics configuration. Cannot be overridden on a per-resource basis.

KeyDefaultTypeDescription
kubernetes.operator.josdk.metrics.enabled
trueBooleanEnable forwarding of Java Operator SDK metrics to the Flink metric registry.
kubernetes.operator.jvm.metrics.enabled
trueBooleanEnable Kubernetes Operator JVM metrics.
kubernetes.operator.kubernetes.client.metrics.enabled
trueBooleanEnable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.
kubernetes.operator.kubernetes.client.metrics.http.response.code.groups.enabled
falseBooleanEnable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx.
kubernetes.operator.metrics.histogram.sample.size
1000IntegerDefines the number of measured samples when calculating statistics.
kubernetes.operator.metrics.scope.k8soperator.resource
“<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>.<resourcetype>”StringDefines the scope format string that is applied to all metrics scoped to the kubernetes operator resource.
kubernetes.operator.metrics.scope.k8soperator.resourcens
“<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>.<resourcetype>”StringDefines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.
kubernetes.operator.metrics.scope.k8soperator.system
“<host>.k8soperator.<namespace>.<name>.system”StringDefines the scope format string that is applied to all metrics scoped to the kubernetes operator.
kubernetes.operator.resource.lifecycle.metrics.enabled
trueBooleanEnable resource lifecycle state metrics. This enables both state and transition counts/histograms.
kubernetes.operator.resource.lifecycle.namespace.histograms.enabled
trueBooleanIn addition to the system level histograms, enable per namespace tracking of state and transition times.
kubernetes.operator.resource.metrics.enabled
trueBooleanEnables metrics for FlinkDeployment and FlinkSessionJob custom resources.

Advanced System Configuration

Advanced operator system configuration. Cannot be overridden on a per-resource basis.

KeyDefaultTypeDescription
kubernetes.operator.config.cache.size
1000IntegerMax config cache size.
kubernetes.operator.config.cache.timeout
10 minDurationExpiration time for cached configs.
kubernetes.operator.dynamic.config.check.interval
5 minDurationTime interval for checking config changes.
kubernetes.operator.dynamic.config.enabled
trueBooleanWhether to enable on-the-fly config changes through the operator configmap.
kubernetes.operator.health.canary.resource.timeout
1 minDurationAllowed max time between spec update and reconciliation for canary resources.
kubernetes.operator.health.probe.enabled
trueBooleanEnables health probe for the kubernetes operator.
kubernetes.operator.health.probe.port
8085IntegerThe port the health probe will use to expose the status.
kubernetes.operator.label.selector
(none)StringLabel selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported.
kubernetes.operator.observer.progress-check.interval
10 sDurationThe interval for observing status for in-progress operations such as deployment and savepoints.
kubernetes.operator.observer.rest-ready.delay
10 sDurationFinal delay before deployment is marked ready after port becomes accessible.
kubernetes.operator.resource.deletion.propagation
Foreground

Enum

JM/TM Deployment deletion propagation.

Possible values:
  • “Orphan”
  • “Background”
  • “Foreground”
kubernetes.operator.savepoint.history.max.age.threshold
(none)DurationMaximum age threshold for savepoint history entries to retain.
kubernetes.operator.savepoint.history.max.count.threshold
(none)IntegerMaximum number threshold of savepoint history entries to retain.
kubernetes.operator.startup.stop-on-informer-error
trueBooleanWhether informer errors should stop operator startup. If false, the startup will ignore recoverable errors, caused for example by RBAC issues and will retry periodically.
kubernetes.operator.termination.timeout
10 sDurationOperator shutdown timeout before reconciliation threads are killed.