Kubernetes Setup

This page describes how to deploy a Flink job and session cluster on Kubernetes.

Info: This page describes deploying a standalone Flink session cluster on top of Kubernetes. For information on native Kubernetes deployments read here.

Setup Kubernetes

Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using minikube.

Note: If using minikube, please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise Flink components are not able to reference themselves through a Kubernetes service.

Deploy Flink session cluster on Kubernetes

A Flink session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

A basic Flink session cluster deployment in Kubernetes has three components:

  • a Deployment/Job which runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Using the resource definitions for a session cluster found in the appendix, launch the cluster with the kubectl command:

  1. kubectl create -f flink-configuration-configmap.yaml
  2. kubectl create -f jobmanager-service.yaml
  3. kubectl create -f jobmanager-deployment.yaml
  4. kubectl create -f taskmanager-deployment.yaml
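Once the resources are created, you can verify that the cluster has come up. The following is a sketch, assuming the label names and Deployment names from the resource definitions in the appendix:

```shell
# List the Flink pods created by the two Deployments.
kubectl get pods -l app=flink

# Wait until the JobManager Deployment has rolled out completely.
kubectl rollout status deployment/flink-jobmanager

# Tail the JobManager log to confirm the cluster started cleanly.
kubectl logs -l component=jobmanager --tail=20
```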

Note that you can customize the flink-conf.yaml options within flink-configuration-configmap.yaml.
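For example, to give each TaskManager more task slots, you could edit the flink-conf.yaml section of flink-configuration-configmap.yaml before creating it. The value 4 below is only an illustration; the definition in the appendix uses 1:

```yaml
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 4   # customized: the appendix default is 1
```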

You can then access the Flink UI in several ways:

  • kubectl port-forward:
    • Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your JobManager’s web UI port to local 8081.
    • Navigate to http://localhost:8081 in your browser.
    • Moreover, you can use the following command to submit jobs to the cluster:
      ./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
  • Create a NodePort service on the rest service of the JobManager:
    • Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on the JobManager. An example of jobmanager-rest-service.yaml can be found in the appendix.
    • Run kubectl get svc flink-jobmanager-rest to get the node port of this service and navigate to http://<public-node-ip>:<node-port> in your browser.
    • Similarly to the port-forward solution, you can also use the following command to submit jobs to the cluster:
      ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar
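If you only need the allocated node port rather than the full kubectl get svc output, a jsonpath query works. This is a sketch; the service and port names match the jobmanager-rest-service.yaml definition in the appendix:

```shell
# Print only the node port assigned to the REST service.
kubectl get svc flink-jobmanager-rest \
  -o jsonpath='{.spec.ports[?(@.name=="rest")].nodePort}'
```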

In order to terminate the Flink session cluster, use kubectl:

  1. kubectl delete -f jobmanager-deployment.yaml
  2. kubectl delete -f taskmanager-deployment.yaml
  3. kubectl delete -f jobmanager-service.yaml
  4. kubectl delete -f flink-configuration-configmap.yaml

Deploy Flink job cluster on Kubernetes

A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.

Creating the job-specific image

The Flink job cluster image needs to contain the user-code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. Please follow these instructions to build the Docker image.

In order to deploy a job cluster on Kubernetes, please follow these instructions.
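As a rough sketch of what such a job-specific image can look like, one can extend the official image and add the job jar. The jar name and the usrlib target path below are assumptions for illustration; the linked build instructions are authoritative:

```dockerfile
FROM flink:latest
# Add the user-code jar so the job cluster can start the job
# without a separate submission step (paths are illustrative).
ADD target/my-flink-job.jar /opt/flink/usrlib/my-flink-job.jar
```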

Advanced Cluster Deployment

An early version of a Flink Helm chart is available on GitHub.

Appendix

Session cluster resource definitions

The Deployment definitions use the pre-built image flink:latest, which can be found on Docker Hub. The image is built from this GitHub repository.

flink-configuration-configmap.yaml

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
```

jobmanager-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
```
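The container command above starts the JobManager in the background, then loops until a log file matching *jobmanager*.log appears and tails it; this keeps the container in the foreground and makes the log visible via kubectl logs. The discovery step can be sketched in isolation, simplified here to a single pass over a temporary directory with a made-up file name:

```shell
# Simulate the log directory the container command polls
# (the file name is hypothetical).
logdir=$(mktemp -d)
touch "$logdir/flink--standalonesession-0-jobmanager.log"

# Same discovery idiom as the container command:
# take the first matching file, if any.
logfile=$(find "$logdir" -name '*jobmanager*.log' -print -quit)
if [[ -f "$logfile" ]]; then
  # The real command then runs: tail -f -n +1 "$logfile"
  echo "found: $logfile"
fi
```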

taskmanager-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
```

jobmanager-service.yaml

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```

jobmanager-rest-service.yaml. Optional service that exposes the JobManager’s REST port as a public Kubernetes node port.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
```