Kubernetes HA Services

Flink’s Kubernetes HA services rely on Kubernetes itself to provide high availability for the JobManager.

Kubernetes HA services can only be used when deploying to Kubernetes. Consequently, they can be configured when using standalone Flink on Kubernetes or the native Kubernetes integration.

Prerequisites

To use Flink’s Kubernetes HA services, the following prerequisites must be met:

  • Kubernetes >= 1.9.
  • A service account with permissions to create, edit, and delete ConfigMaps.

Configuration

To start an HA cluster, set the following configuration keys:

  • high-availability (required): The high-availability option has to be set to the fully qualified class name of KubernetesHaServicesFactory.

      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

  • high-availability.storageDir (required): JobManager metadata is persisted in the file system directory given by high-availability.storageDir, and only a pointer to this state is stored in Kubernetes. The storageDir holds all metadata needed to recover from a JobManager failure.

      high-availability.storageDir: s3:///flink/recovery

  • kubernetes.cluster-id (required): To identify the Flink cluster, you have to specify a kubernetes.cluster-id.

      kubernetes.cluster-id: cluster1337

Example configuration

Configure high availability mode in conf/flink-conf.yaml:

    kubernetes.cluster-id: <cluster-id>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery
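
As a quick sanity check before starting the cluster, the three required keys can be verified with a small shell function. This is only a sketch: check_ha_conf is a hypothetical helper, not part of Flink, and it only checks that each key is present with a non-empty value, not that the value itself is valid.

```shell
# Hypothetical helper (not part of Flink): report which required HA keys
# are missing from a Flink configuration file.
# Usage: check_ha_conf conf/flink-conf.yaml
check_ha_conf() {
  conf_file="$1"
  missing=0
  for key in high-availability high-availability.storageDir kubernetes.cluster-id; do
    # Escape dots so they match literally in the regular expression.
    key_re=$(printf '%s' "$key" | sed 's/\./\\./g')
    # Require "key:" at the start of a line, followed by a non-empty,
    # non-comment value.
    if ! grep -Eq "^${key_re}:[[:space:]]*[^[:space:]#]" "$conf_file"; then
      echo "missing required key: ${key}"
      missing=1
    fi
  done
  # Exit status 0 if all keys are present, 1 otherwise.
  return "$missing"
}
```

Calling check_ha_conf conf/flink-conf.yaml prints one line per missing key and returns a non-zero status if any key is absent, so it can be used as a guard in a deployment script.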

High availability data clean up

To keep HA data while restarting the Flink cluster, delete only the deployment (via kubectl delete deployment <cluster-id>). All resources owned by the Flink cluster will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). HA-related ConfigMaps are retained because they do not set an owner reference and are therefore not removed together with the deployment. When the cluster is restarted, all previously running jobs are recovered and restarted from the latest successful checkpoint.
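
Because the HA ConfigMaps survive deletion of the deployment, they have to be removed explicitly once the HA data is no longer wanted. The following is a minimal sketch of how that might look. It assumes the HA ConfigMaps carry the labels app=<cluster-id> and configmap-type=high-availability; verify this on your cluster with kubectl get configmaps --show-labels before deleting anything. The function names and the KUBECTL override are hypothetical conveniences, not part of Flink or kubectl.

```shell
# Assumption: HA ConfigMaps are labelled app=<cluster-id> and
# configmap-type=high-availability. Confirm with:
#   kubectl get configmaps --show-labels
ha_selector() {
  printf 'app=%s,configmap-type=high-availability' "$1"
}

# List the HA ConfigMaps that were retained for the given cluster id.
# Set KUBECTL=echo to preview the command instead of running it.
list_ha_configmaps() {
  "${KUBECTL:-kubectl}" get configmaps --selector="$(ha_selector "$1")"
}

# Delete the retained HA ConfigMaps for the given cluster id.
# After this, previously running jobs can no longer be recovered.
delete_ha_configmaps() {
  "${KUBECTL:-kubectl}" delete configmaps --selector="$(ha_selector "$1")"
}
```

A cautious workflow would be to run list_ha_configmaps <cluster-id> first, check that only the expected ConfigMaps are listed, and then run delete_ha_configmaps <cluster-id>.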