使用索引作业完成静态工作分配下的并行处理

特性状态: Kubernetes v1.24 [stable]

在此示例中,你将运行一个使用多个并行工作进程的 Kubernetes Job。 每个 worker 都是在自己的 Pod 中运行的不同容器。 Pod 具有控制平面自动设置的索引编号(index number), 这些编号使得每个 Pod 能识别出要处理整个任务的哪个部分。

Pod 索引在注解 batch.kubernetes.io/job-completion-index 中呈现,具体表示为一个十进制值字符串。 为了让容器化的任务进程获得此索引,你可以使用 downward API 机制发布注解的值。为方便起见, 控制平面自动设置 Downward API 以在 JOB_COMPLETION_INDEX 环境变量中公开索引。

以下是此示例中步骤的概述:

  1. 定义使用带索引完成信息的 Job 清单。 Downward API 使你可以将 Pod 索引注解作为环境变量或文件传递给容器。
  2. 根据该清单启动一个带索引(Indexed)的 Job

准备开始

你应该已经熟悉 Job 的基本的、非并行的用法。

你必须拥有一个 Kubernetes 的集群,同时你必须配置 kubectl 命令行工具与你的集群通信。 建议在至少有两个不作为控制平面主机的节点的集群上运行本教程。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面的 Kubernetes 练习环境之一:

你的 Kubernetes 服务器版本必须不低于版本 v1.21. 要获知版本信息,请输入 kubectl version.

选择一种方法

要从工作程序访问工作项,你有几个选项:

  1. 读取 JOB_COMPLETION_INDEX 环境变量。Job 控制器自动将此变量链接到包含完成索引的注解。
  2. 读取包含完整索引的文件。
  3. 假设你无法修改程序,你可以使用脚本包装它, 该脚本使用上述任意方法读取索引并将其转换为程序可以用作输入的内容。

对于此示例,假设你选择了选项 3 并且想要运行 rev 实用程序。 这个程序接受一个文件作为参数并按逆序打印其内容。

  1. rev data.txt

你将使用 busybox 容器镜像中的 rev 工具。

由于这只是一个例子,每个 Pod 只做一小部分工作(反转一个短字符串)。 例如,在实际工作负载中,你可能会创建一个表示基于场景数据制作 60 秒视频任务的 Job 。 此视频渲染 Job 中的每个工作项都将渲染该视频剪辑的特定帧。 索引完成意味着 Job 中的每个 Pod 都知道通过从剪辑开始计算帧数,来确定渲染和发布哪一帧。

定义索引作业

这是一个使用 Indexed 完成模式的示例 Job 清单:

application/job/indexed-job.yaml使用索引作业完成静态工作分配下的并行处理 - 图1

  1. apiVersion: batch/v1
  2. kind: Job
  3. metadata:
  4. name: 'indexed-job'
  5. spec:
  6. completions: 5
  7. parallelism: 3
  8. completionMode: Indexed
  9. template:
  10. spec:
  11. restartPolicy: Never
  12. initContainers:
  13. - name: 'input'
  14. image: 'docker.io/library/bash'
  15. command:
  16. - "bash"
  17. - "-c"
  18. - |
  19. items=(foo bar baz qux xyz)
  20. echo ${items[$JOB_COMPLETION_INDEX]} > /input/data.txt
  21. volumeMounts:
  22. - mountPath: /input
  23. name: input
  24. containers:
  25. - name: 'worker'
  26. image: 'docker.io/library/busybox'
  27. command:
  28. - "rev"
  29. - "/input/data.txt"
  30. volumeMounts:
  31. - mountPath: /input
  32. name: input
  33. volumes:
  34. - name: input
  35. emptyDir: {}

在上面的示例中,你使用 Job 控制器为所有容器设置的内置 JOB_COMPLETION_INDEX 环境变量。 Init 容器 将索引映射到一个静态值,并将其写入一个文件,该文件通过 emptyDir 卷 与运行 worker 的容器共享。或者,你可以 通过 Downward API 定义自己的环境变量 将索引发布到容器。你还可以选择从 包含 ConfigMap 的环境变量或文件 加载值列表。

或者也可以直接 使用 Downward API 将注解值作为卷文件传递, 如下例所示:

application/job/indexed-job-vol.yaml使用索引作业完成静态工作分配下的并行处理 - 图2

  1. apiVersion: batch/v1
  2. kind: Job
  3. metadata:
  4. name: 'indexed-job'
  5. spec:
  6. completions: 5
  7. parallelism: 3
  8. completionMode: Indexed
  9. template:
  10. spec:
  11. restartPolicy: Never
  12. containers:
  13. - name: 'worker'
  14. image: 'docker.io/library/busybox'
  15. command:
  16. - "rev"
  17. - "/input/data.txt"
  18. volumeMounts:
  19. - mountPath: /input
  20. name: input
  21. volumes:
  22. - name: input
  23. downwardAPI:
  24. items:
  25. - path: "data.txt"
  26. fieldRef:
  27. fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']

执行 Job {running-the-job}

现在执行 Job:

  1. # 使用第一种方法(依赖于 $JOB_COMPLETION_INDEX)
  2. kubectl apply -f https://kubernetes.io/examples/application/job/indexed-job.yaml

当你创建此 Job 时,控制平面会创建一系列 Pod,你指定的每个索引都会运行一个 Pod。 .spec.parallelism 的值决定了一次可以运行多少个 Pod, 而 .spec.completions 决定了 Job 总共创建了多少个 Pod。

因为 .spec.parallelism 小于 .spec.completions, 所以控制平面在启动更多 Pod 之前,将等待第一批的某些 Pod 完成。

你可以等待 Job 成功,等待时间可以设置超时限制:

  1. # 状况名称的检查不区分大小写
  2. kubectl wait --for=condition=complete --timeout=300s job/indexed-job

现在,描述 Job 并检查它是否成功。

  1. kubectl describe jobs/indexed-job

输出类似于:

  1. Name: indexed-job
  2. Namespace: default
  3. Selector: controller-uid=bf865e04-0b67-483b-9a90-74cfc4c3e756
  4. Labels: controller-uid=bf865e04-0b67-483b-9a90-74cfc4c3e756
  5. job-name=indexed-job
  6. Annotations: <none>
  7. Parallelism: 3
  8. Completions: 5
  9. Start Time: Thu, 11 Mar 2021 15:47:34 +0000
  10. Pods Statuses: 2 Running / 3 Succeeded / 0 Failed
  11. Completed Indexes: 0-2
  12. Pod Template:
  13. Labels: controller-uid=bf865e04-0b67-483b-9a90-74cfc4c3e756
  14. job-name=indexed-job
  15. Init Containers:
  16. input:
  17. Image: docker.io/library/bash
  18. Port: <none>
  19. Host Port: <none>
  20. Command:
  21. bash
  22. -c
  23. items=(foo bar baz qux xyz)
  24. echo ${items[$JOB_COMPLETION_INDEX]} > /input/data.txt
  25. Environment: <none>
  26. Mounts:
  27. /input from input (rw)
  28. Containers:
  29. worker:
  30. Image: docker.io/library/busybox
  31. Port: <none>
  32. Host Port: <none>
  33. Command:
  34. rev
  35. /input/data.txt
  36. Environment: <none>
  37. Mounts:
  38. /input from input (rw)
  39. Volumes:
  40. input:
  41. Type: EmptyDir (a temporary directory that shares a pod's lifetime)
  42. Medium:
  43. SizeLimit: <unset>
  44. Events:
  45. Type Reason Age From Message
  46. ---- ------ ---- ---- -------
  47. Normal SuccessfulCreate 4s job-controller Created pod: indexed-job-njkjj
  48. Normal SuccessfulCreate 4s job-controller Created pod: indexed-job-9kd4h
  49. Normal SuccessfulCreate 4s job-controller Created pod: indexed-job-qjwsz
  50. Normal SuccessfulCreate 1s job-controller Created pod: indexed-job-fdhq5
  51. Normal SuccessfulCreate 1s job-controller Created pod: indexed-job-ncslj

在此示例中,你使用每个索引的自定义值运行 Job。 你可以检查其中一个 Pod 的输出:

  1. kubectl logs indexed-job-fdhq5 # 更改它以匹配来自该 Job 的 Pod 的名称

输出类似于:

  1. xuq