Flink on Volcano

Last updated on Jul 31, 2021

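Apache Flink is an open-source stream processing framework developed by the Apache Software Foundation. Its core is a distributed streaming dataflow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner, and its pipelined runtime can run both batch and stream processing programs. The runtime also natively supports the execution of iterative algorithms.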

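Prerequisites

A CCE cluster must already be created and deployed, with at least one available node. The nodes in the cluster must have an elastic public IP (EIP) bound, and the kubectl command-line tool must be installed.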

Deployment procedure [1]

1. Download

Flink requires a Java 8 or Java 11 environment. Use the following command to check the installed Java version.

    java -version
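The version check can also be scripted. The following is a minimal sketch, not part of the official Flink tooling: the helper names `java_major_version` and `is_supported_java` are hypothetical, and the parsing assumes the usual version strings such as `1.8.0_292` or `11.0.11`.

```shell
#!/usr/bin/env bash
# Extract the major Java version from a version string such as
# "1.8.0_292" (Java 8) or "11.0.11" (Java 11).
java_major_version() {
  local version="$1"
  case "$version" in
    1.*) version="${version#1.}" ;;   # legacy scheme: 1.8.0_292 -> 8.0_292
  esac
  echo "${version%%[._-]*}"           # keep only the leading number
}

# Succeed only for the Java versions this guide names (8 and 11).
is_supported_java() {
  case "$(java_major_version "$1")" in
    8|11) return 0 ;;
    *)    return 1 ;;
  esac
}

# Usage: inspect the local JVM, if one is installed.
if command -v java >/dev/null 2>&1; then
  # `java -version` prints to stderr; the version is the quoted string.
  installed="$(java -version 2>&1 | awk -F '"' '/version/ {print $2; exit}')"
  if is_supported_java "$installed"; then
    echo "Java $installed is supported"
  else
    echo "Java $installed is not supported, install Java 8 or 11" >&2
  fi
fi
```

Keeping the parsing in a separate function makes it possible to check the logic against known version strings without a JVM installed.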

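Download the binary release (the later steps use its bin/ and examples/ directories), extract it, and change into the directory.

    wget https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz
    tar -xzf flink-1.12.2-bin-scala_2.11.tgz
    cd flink-1.12.2

2. Start a Cluster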

Run the startup script to bring up the Flink cluster.

    ./bin/start-cluster.sh

3. Submit a job

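A job can then be submitted with the following commands. The second command prints the end of the TaskManager output, which contains the job's results.

    ./bin/flink run examples/streaming/WordCount.jar
    tail log/flink-*-taskexecutor-*.out

1. Deploying the components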

Deploying a Flink cluster requires creating two Deployments, one Service, and one ConfigMap. Volcano is used as the scheduling policy. The content of flink-configuration-configmap.yaml is as follows:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        parallelism.default: 2
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender
        # Uncomment this if you want to only change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO
        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name = org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO
        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size = 100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10
        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF
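To make the sizing implied by this configuration concrete, the sketch below works through the arithmetic. The numbers are copied from the manifests in this guide (the TaskManager replica count comes from taskmanager-session-deployment.yaml); the variable names are illustrative, and the job count assumes Flink's default slot sharing, where a job with parallelism p occupies p slots.

```shell
#!/usr/bin/env bash
# Values copied from the manifests in this guide.
task_managers=2          # replicas in taskmanager-session-deployment.yaml
slots_per_tm=2           # taskmanager.numberOfTaskSlots
default_parallelism=2    # parallelism.default
jobmanager_mem=1600      # jobmanager.memory.process.size (in "m" units)
taskmanager_mem=1728     # taskmanager.memory.process.size (in "m" units)

# Slots offered by the whole session cluster.
total_slots=$((task_managers * slots_per_tm))

# Assumption: default slot sharing, so a job at parallelism p needs p slots.
concurrent_jobs=$((total_slots / default_parallelism))

# Sum of the configured process sizes for one JobManager and all TaskManagers.
total_mem=$((jobmanager_mem + task_managers * taskmanager_mem))

echo "total task slots: $total_slots"
echo "jobs at default parallelism: $concurrent_jobs"
echo "configured process memory: ${total_mem}m"
```

With the values above this reports 4 slots, room for 2 jobs at the default parallelism, and 5056m of configured process memory, which gives a rough lower bound on the memory the cluster nodes need to offer.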

The Service exposes the JobManager's RPC, blob server, and REST/web UI ports inside the cluster. The content of jobmanager-service.yaml is as follows:

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink
        component: jobmanager

The content of jobmanager-session-deployment.yaml is as follows:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:1.11.0-scala_2.11
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user flink from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

The content of taskmanager-session-deployment.yaml is as follows:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: flink:1.11.0-scala_2.11
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999  # refers to user flink from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

Create the four YAML files above on a cluster node, then deploy them with the following commands.

    kubectl create -f flink-configuration-configmap.yaml
    kubectl create -f jobmanager-service.yaml
    kubectl create -f jobmanager-session-deployment.yaml
    kubectl create -f taskmanager-session-deployment.yaml
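The two Deployment manifests as listed do not name a scheduler, so their pods go to whichever scheduler the cluster treats as default. If Volcano is not that default, one possible way to route the pods to it is to set `schedulerName` in the pod template. The sketch below assumes the Volcano scheduler is registered under the name `volcano`; the helper `patch_scheduler` is hypothetical.

```shell
#!/usr/bin/env bash
# Merge patch that sets the scheduler in the Deployment's pod template.
# Assumption: the Volcano scheduler is registered under the name "volcano".
SCHEDULER_PATCH='{"spec":{"template":{"spec":{"schedulerName":"volcano"}}}}'

# Patch one Deployment. Changing the pod template triggers a rollout,
# so the existing pods are replaced by pods that use the new scheduler.
patch_scheduler() {
  # $1: name of the Deployment to patch
  kubectl patch deployment "$1" --type merge -p "$SCHEDULER_PATCH"
}

# Usage (run against a cluster where the Deployments exist):
#   patch_scheduler flink-jobmanager
#   patch_scheduler flink-taskmanager
```

The same effect can be had by adding `schedulerName: volcano` under `spec.template.spec` in both manifests before creating them, which avoids the extra rollout.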

After the resources are created, check them with the following commands:

    kubectl get cm | grep flink
    kubectl get svc | grep flink
    kubectl get pod -owide | grep flink
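Listing the resources shows that they exist, but not whether the pods are ready or which scheduler placed them. A minimal sketch of both checks follows; the function names and the 120-second timeout are illustrative choices, and the label selector relies on the `app: flink` label from the manifests.

```shell
#!/usr/bin/env bash
# Block until both Deployments have finished rolling out, or time out.
wait_for_flink() {
  kubectl rollout status deployment/flink-jobmanager --timeout=120s &&
  kubectl rollout status deployment/flink-taskmanager --timeout=120s
}

# Print each Flink pod with the scheduler named in its spec and its node.
show_schedulers() {
  kubectl get pods -l app=flink \
    -o custom-columns=NAME:.metadata.name,SCHEDULER:.spec.schedulerName,NODE:.spec.nodeName
}

# Usage:
#   wait_for_flink && show_schedulers
```

The SCHEDULER column shows the value of `spec.schedulerName` for each pod, which is a quick way to confirm whether the pods were handed to Volcano or to another scheduler.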
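
2. Publishing the service externally [3]

After the Flink workloads are created, the service needs to be published externally.

  • If you are testing on Huawei Cloud CCE, open the "Workloads - Stateless Workloads" page in CCE, select flink-jobmanager, and click "Access Mode".
  • Click "Add Service", choose node access, and enter 8081 as the container port.
  • Open Network Management in CCE, where the Service just added is listed, and open its externally published link.
  • On the Flink Dashboard page, click Submit New Job and submit the official WordCount job. The jar is located at flink-1.12.2/examples/streaming/WordCount.jar.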
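
On clusters without the CCE console, a comparable node-access Service can be sketched with a manifest and kubectl. This is an illustration rather than the procedure from the steps above: the Service name `flink-jobmanager-rest` and the node port `30081` are assumptions, and the selector reuses the labels from the JobManager Deployment.

```shell
#!/usr/bin/env bash
# Write a NodePort Service manifest that exposes the JobManager web UI.
# Assumptions: the name flink-jobmanager-rest and nodePort 30081 are
# illustrative; pick a free port in the cluster's NodePort range.
cat > jobmanager-rest-service.yaml <<'EOF'
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
EOF

# Usage (against the cluster):
#   kubectl create -f jobmanager-rest-service.yaml
# The Dashboard is then reachable at http://<node EIP>:30081, and a job
# can be submitted from the extracted flink-1.12.2 directory with:
#   ./bin/flink run -m <node EIP>:30081 examples/streaming/WordCount.jar
```

`<node EIP>` stands for the elastic public IP of any cluster node and has to be filled in by hand.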