抢占调度

Pod priority

Pod 有了 priority(优先级) 后才有优先级调度、抢占调度的说法,高优先级的 pod 可以在调度队列中排到前面,优先选择 node;另外当高优先级的 pod 找不到合适的 node 时,就会看 node 上低优先级的 pod 驱逐之后是否能够 run 起来,如果可以,那么 node 上的一个或多个低优先级的 pod 会被驱逐,然后高优先级的 pod 得以成功运行1个 node 上。

今天我们分析 pod 抢占相关的代码。开始之前我们看一下和 priority 相关的2个示例配置文件:

PriorityClass 例子

  1. apiVersion: scheduling.k8s.io/v1
  2. kind: PriorityClass
  3. metadata:
  4. name: high-priority
  5. value: 1000000
  6. globalDefault: false
  7. description: "This priority class should be used for XYZ service pods only."

使用上述 PriorityClass

  1. apiVersion: v1
  2. kind: Pod
  3. metadata:
  4. name: nginx
  5. labels:
  6. env: test
  7. spec:
  8. containers:
  9. - name: nginx
  10. image: nginx
  11. imagePullPolicy: IfNotPresent
  12. priorityClassName: high-priority

这两个文件的内容这里不解释,Pod priority 相关知识点不熟悉的小伙伴请先查阅官方文档,我们下面看调度器中和 preempt 相关的代码逻辑。

preempt 入口

pkg/scheduler/scheduler.go:513 scheduleOne()方法中我们上一次关注的是suggestedHost, err := sched.schedule(pod)这行代码,也就是关注通常情况下调度器如何给一个 pod 匹配一个最合适的 node. 今天我们来看如果这一行代码返回的 err != nil 情况下,如何开始 preempt 过程。

!FILENAME pkg/scheduler/scheduler.go:529

  1. suggestedHost, err := sched.schedule(pod)
  2. if err != nil {
  3. if fitError, ok := err.(*core.FitError); ok {
  4. preemptionStartTime := time.Now()
  5. sched.preempt(pod, fitError)
  6. metrics.PreemptionAttempts.Inc()
  7. } else {
  8. klog.Errorf("error selecting node for pod: %v", err)
  9. metrics.PodScheduleErrors.Inc()
  10. }
  11. return
  12. }

schedule()函数没有返回 host,也就是没有找到合适的 node 的时候,就会出发 preempt 过程。这时候代码逻辑进入sched.preempt(pod, fitError)这一行。我们先看一下这个函数的整体逻辑,然后深入其中涉及的子过程:

!FILENAME pkg/scheduler/scheduler.go:311

  1. func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
  2. // 特性没有开启就返回 ""
  3. if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
  4. return "", nil
  5. }
  6. // 更新 pod 信息;入参和返回值都是 *v1.Pod 类型
  7. preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
  8. // preempt 过程,下文分析
  9. node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
  10. var nodeName = ""
  11. if node != nil {
  12. nodeName = node.Name
  13. // 更新队列中“任命pod”队列
  14. sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
  15. // 设置pod的Status.NominatedNodeName
  16. err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
  17. if err != nil {
  18. // 如果出错就从 queue 中移除
  19. sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
  20. return "", err
  21. }
  22. for _, victim := range victims {
  23. // 将要驱逐的 pod 驱逐
  24. if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
  25. return "", err
  26. }
  27. sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
  28. }
  29. }
  30. // Clearing nominated pods should happen outside of "if node != nil".
  31. // 这个清理过程在上面的if外部,我们回头从 Preempt() 的实现去理解
  32. for _, p := range nominatedPodsToClear {
  33. rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
  34. if rErr != nil {
  35. klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
  36. // We do not return as this error is not critical.
  37. }
  38. }
  39. return nodeName, err
  40. }

preempt 实现

上面 preempt() 函数中涉及到了一些值得深入看看的对象,下面我们逐个看一下这些对象的实现。

SchedulingQueue

SchedulingQueue 表示的是一个存储待调度 pod 的队列

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:60

  1. type SchedulingQueue interface {
  2. Add(pod *v1.Pod) error
  3. AddIfNotPresent(pod *v1.Pod) error
  4. AddUnschedulableIfNotPresent(pod *v1.Pod) error
  5. Pop() (*v1.Pod, error)
  6. Update(oldPod, newPod *v1.Pod) error
  7. Delete(pod *v1.Pod) error
  8. MoveAllToActiveQueue()
  9. AssignedPodAdded(pod *v1.Pod)
  10. AssignedPodUpdated(pod *v1.Pod)
  11. NominatedPodsForNode(nodeName string) []*v1.Pod
  12. WaitingPods() []*v1.Pod
  13. Close()
  14. UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
  15. DeleteNominatedPodIfExists(pod *v1.Pod)
  16. NumUnschedulablePods() int
  17. }

在 Scheduler 中 SchedulingQueue 接口对应两种实现:

  • FIFO 先进先出队列
  • PriorityQueue 优先级队列

FIFO

FIFO 结构是对 cache.FIFO 的简单包装,然后实现了 SchedulingQueue 接口。

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:97

  1. type FIFO struct {
  2. *cache.FIFO
  3. }

cache.FIFO定义在vendor/k8s.io/client-go/tools/cache/fifo.go:93,这个先进先出队列的细节先不讨论。

PriorityQueue

PriorityQueue 同样实现了 SchedulingQueue 接口,PriorityQueue 的顶是最高优先级的 pending pod. 这里的PriorityQueue 有2个子 queue,activeQ 放的是等待调度的 pod,unschedulableQ 放的是已经尝试过调度,然后失败了,被标记为 unschedulable 的 pod.

我们看一下 PriorityQueue 结构的定义:

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:201

  1. type PriorityQueue struct {
  2. stop <-chan struct{}
  3. clock util.Clock
  4. lock sync.RWMutex
  5. cond sync.Cond
  6. // heap 头节点存的是最高优先级的 pod
  7. activeQ *Heap
  8. // unschedulableQ holds pods that have been tried and determined unschedulable.
  9. unschedulableQ *UnschedulablePodsMap
  10. // 存储已经被指定好要跑在某个 node 的 pod
  11. nominatedPods *nominatedPodMap
  12. // 只要将 pod 从 unschedulableQ 移动到 activeQ,就设置为true;从 activeQ 中 pop 出来 pod的时候设置为 false. 这个字段表明一个 pod 在被调度的过程中是否接收到了队列 move 操作,如果发生了 move 操作,那么这个 pod 就算被认定为 unschedulable,也被放回到 activeQ.
  13. receivedMoveRequest bool
  14. closed bool
  15. }

PriorityQueue 的方法比较好理解,我们看几个吧:

1、func (p *PriorityQueue) Add(pod *v1.Pod) error //在 active queue 中添加1个pod

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:276

  1. func (p *PriorityQueue) Add(pod *v1.Pod) error {
  2. p.lock.Lock()
  3. defer p.lock.Unlock()
  4. // 直接在 activeQ 中添加 pod
  5. err := p.activeQ.Add(pod)
  6. if err != nil {
  7. klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
  8. } else {
  9. // 如果在 unschedulableQ 中找到这个 pod,抛错误日志后移除队列中该 pod
  10. if p.unschedulableQ.get(pod) != nil {
  11. klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
  12. p.unschedulableQ.delete(pod)
  13. }
  14. // 队列的 nominatedPods 属性中标记该 pod 不指定到任何 node
  15. p.nominatedPods.add(pod, "")
  16. p.cond.Broadcast()
  17. }
  18. return err
  19. }

2、func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error//如果2个队列中都不存在该 pod,那么就添加到 active queue 中

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:295

  1. func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
  2. p.lock.Lock()
  3. defer p.lock.Unlock()
  4. //如果队列 unschedulableQ 中有 pod,啥也不做
  5. if p.unschedulableQ.get(pod) != nil {
  6. return nil
  7. }
  8. //如果队列 activeQ 中有 pod,啥也不做
  9. if _, exists, _ := p.activeQ.Get(pod); exists {
  10. return nil
  11. }
  12. // 添加 pod 到 activeQ
  13. err := p.activeQ.Add(pod)
  14. if err != nil {
  15. klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
  16. } else {
  17. p.nominatedPods.add(pod, "")
  18. p.cond.Broadcast()
  19. }
  20. return err
  21. }

3、func (p *PriorityQueue) flushUnschedulableQLeftover()//刷新 unschedulableQ 中的 pod,如果一个 pod 的呆的时间超过了 durationStayUnschedulableQ,就移动到 activeQ 中

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:346

  1. func (p *PriorityQueue) flushUnschedulableQLeftover() {
  2. p.lock.Lock()
  3. defer p.lock.Unlock()
  4. var podsToMove []*v1.Pod
  5. currentTime := p.clock.Now()
  6. // 遍历 unschedulableQ 中的 pod
  7. for _, pod := range p.unschedulableQ.pods {
  8. lastScheduleTime := podTimestamp(pod)
  9. // 这里的默认值是 60s,所以超过 60s 的 pod 将得到进入 activeQ 的机会
  10. if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
  11. podsToMove = append(podsToMove, pod)
  12. }
  13. }
  14. if len(podsToMove) > 0 {
  15. // 全部移到 activeQ 中,又有机会被调度了
  16. p.movePodsToActiveQueue(podsToMove)
  17. }
  18. }

4、func (p *PriorityQueue) Pop() (*v1.Pod, error)//从 activeQ 中 pop 一个 pod

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:367

  1. func (p *PriorityQueue) Pop() (*v1.Pod, error) {
  2. p.lock.Lock()
  3. defer p.lock.Unlock()
  4. for len(p.activeQ.data.queue) == 0 {
  5. // 当队列为空的时候会阻塞
  6. if p.closed {
  7. return nil, fmt.Errorf(queueClosed)
  8. }
  9. p.cond.Wait()
  10. }
  11. obj, err := p.activeQ.Pop()
  12. if err != nil {
  13. return nil, err
  14. }
  15. pod := obj.(*v1.Pod)
  16. // 标记 receivedMoveRequest 为 false,表示新的一次调度开始了
  17. p.receivedMoveRequest = false
  18. return pod, err
  19. }

再看个别 PriorityQueue.nominatedPods 属性相关操作的方法,也就是 preempt() 函数中多次调用到的方法:

5、`func (p PriorityQueue) UpdateNominatedPodForNode(pod v1.Pod, nodeName string)`//pod 抢占的时候,确定一个 node 可以用于跑这个 pod 时,通过调用这个方法将 pod nominated 到 指定的 node 上。

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:567

  1. func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
  2. p.lock.Lock()
  3. //逻辑在这里面
  4. p.nominatedPods.add(pod, nodeName)
  5. p.lock.Unlock()
  6. }

先看 nominatedPods 属性的类型,这个类型用于存储 pods 被 nominate 到 nodes 的信息:

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:822

  1. type nominatedPodMap struct {
  2. // key 是 node name,value 是 nominated 到这个 node 上的 pods
  3. nominatedPods map[string][]*v1.Pod
  4. // 和上面结构相反,key 是 pod 信息,值是 node 信息
  5. nominatedPodToNode map[ktypes.UID]string
  6. }

在看一下add()方法的实现:

!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:832

  1. func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
  2. // 不管有没有,先删一下,防止重了
  3. npm.delete(p)
  4. nnn := nodeName
  5. // 如果传入的 nodeName 是 “”
  6. if len(nnn) == 0 {
  7. // 查询 pod 的 pod.Status.NominatedNodeName
  8. nnn = NominatedNodeName(p)
  9. // 如果 pod.Status.NominatedNodeName 也是 “”,return
  10. if len(nnn) == 0 {
  11. return
  12. }
  13. }
  14. // 逻辑到这里说明要么 nodeName 不为空字符串,要么 nodeName 为空字符串但是 pod 的 pod.Status.NominatedNodeName 不为空字符串,这时候开始下面的赋值
  15. npm.nominatedPodToNode[p.UID] = nnn
  16. for _, np := range npm.nominatedPods[nnn] {
  17. if np.UID == p.UID {
  18. klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
  19. return
  20. }
  21. }
  22. npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
  23. }

PodPreemptor

PodPreemptor 用来驱逐 pods 和更新 pod annotations.

!FILENAME pkg/scheduler/factory/factory.go:145

  1. type PodPreemptor interface {
  2. GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
  3. DeletePod(pod *v1.Pod) error
  4. SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
  5. RemoveNominatedNodeName(pod *v1.Pod) error
  6. }

这个 interface 对应的实现类型是:

!FILENAME pkg/scheduler/factory/factory.go:1620

  1. type podPreemptor struct {
  2. Client clientset.Interface
  3. }

这个类型绑定了4个方法:

!FILENAME pkg/scheduler/factory/factory.go:1624

  1. // 新获取一次 pod 的信息
  2. func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
  3. return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
  4. }
  5. // 删除一个 pod
  6. func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
  7. return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
  8. }
  9. // 设置pod.Status.NominatedNodeName 为指定的 node name
  10. func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
  11. podCopy := pod.DeepCopy()
  12. podCopy.Status.NominatedNodeName = nominatedNodeName
  13. _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
  14. return err
  15. }
  16. // 清空 pod.Status.NominatedNodeName
  17. func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
  18. if len(pod.Status.NominatedNodeName) == 0 {
  19. return nil
  20. }
  21. return p.SetNominatedNodeName(pod, "")
  22. }

xx.Algorithm.Preempt

接口定义

我们回到挺久之前讲常规调度过程的时候提过的一个接口:

!FILENAME pkg/scheduler/algorithm/scheduler_interface.go:78

  1. type ScheduleAlgorithm interface {
  2. Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
  3. // Preempt 在 pod 调度发生失败的时候尝试抢占低优先级的 pod.
  4. // 返回发生 preemption 的 node, 被 preempt的 pods 列表,
  5. // nominated node name 需要被移除的 pods 列表,一个 error 信息.
  6. Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
  7. Predicates() map[string]FitPredicate
  8. Prioritizers() []PriorityConfig
  9. }

这个接口上次我们讲到的时候关注了Schedule()Predicates()Prioritizers(),这次来看Preempt()是怎么实现的。

整体流程

Preempt()同样由genericScheduler类型(pkg/scheduler/core/generic_scheduler.go:98)实现,方法前的一大串英文注释先来理解一下:

  • Preempt 寻找一个在发生抢占之后能够成功调度“pod”的node.
  • Preempt 选择一个 node 然后抢占上面的 pods 资源,返回:
    • 这个 node 信息
    • 被抢占的 pods 信息
    • nominated node name 需要被清理的 node 列表
    • 可能有的 error
  • Preempt 过程不涉及快照更新(快照的逻辑以后再讲)
  • 避免出现这种情况:preempt 发现一个不需要驱逐任何 pods 就能够跑“pod”的 node.
  • 当有很多 pending pods 在调度队列中的时候,a nominated pod 会排到队列中相同优先级的 pod 后面.
  • The nominated pod 会阻止其他 pods 使用“指定”的资源,哪怕花费了很多时间来等待其他 pending 的 pod.

我们先过整体流程,然后逐个分析子流程调用:

!FILENAME pkg/scheduler/core/generic_scheduler.go:251

  1. func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
  2. // 省略几行
  3. // 判断执行驱逐操作是否合适
  4. if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
  5. klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
  6. return nil, nil, nil, nil
  7. }
  8. // 所有的 nodes
  9. allNodes, err := nodeLister.List()
  10. if err != nil {
  11. return nil, nil, nil, err
  12. }
  13. if len(allNodes) == 0 {
  14. return nil, nil, nil, ErrNoNodesAvailable
  15. }
  16. // 计算潜在的执行驱逐后能够用于跑 pod 的 nodes
  17. potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
  18. if len(potentialNodes) == 0 {
  19. klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
  20. // In this case, we should clean-up any existing nominated node name of the pod.
  21. return nil, nil, []*v1.Pod{pod}, nil
  22. }
  23. // 列出 pdb 对象
  24. pdbs, err := g.pdbLister.List(labels.Everything())
  25. if err != nil {
  26. return nil, nil, nil, err
  27. }
  28. // 计算所有 node 需要驱逐的 pods 有哪些等,后面细讲
  29. nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
  30. g.predicateMetaProducer, g.schedulingQueue, pdbs)
  31. if err != nil {
  32. return nil, nil, nil, err
  33. }
  34. // 拓展调度的逻辑
  35. nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
  36. if err != nil {
  37. return nil, nil, nil, err
  38. }
  39. // 选择1个 node 用于 schedule
  40. candidateNode := pickOneNodeForPreemption(nodeToVictims)
  41. if candidateNode == nil {
  42. return nil, nil, nil, err
  43. }
  44. // 低优先级的被 nominate 到这个 node 的 pod 很可能已经不再 fit 这个 node 了,所以
  45. // 需要移除这些 pod 的 nomination,更新这些 pod,挪动到 activeQ 中,让调度器
  46. // 得以寻找另外一个 node 给这些 pod
  47. nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
  48. if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
  49. return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
  50. }
  51. return nil, nil, nil, fmt.Errorf(
  52. "preemption failed: the target node %s has been deleted from scheduler cache",
  53. candidateNode.Name)
  54. }

上面涉及到一些子过程调用,我们逐个来看~

  1. podEligibleToPreemptOthers() // 如何判断是否适合抢占?
  2. nodesWherePreemptionMightHelp() // 怎么寻找能够用于 preempt 的 nodes?
  3. selectNodesForPreemption() // 这个过程计算的是什么?
  4. pickOneNodeForPreemption() // 怎么从选择最合适被抢占的 node?

podEligibleToPreemptOthers

  • podEligibleToPreemptOthers 做的事情是判断一个 pod 是否应该去抢占其他 pods. 如果这个 pod 已经抢占过其他 pods,那些 pods 还在 graceful termination period 中,那就不应该再次发生抢占。
  • 如果一个 node 已经被这个 pod nominated,并且这个 node 上有处于 terminating 状态的 pods,那么就不考虑驱逐更多的 pods.

这个函数逻辑很简单,我们直接看源码:

!FILENAME pkg/scheduler/core/generic_scheduler.go:1110

  1. func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
  2. nomNodeName := pod.Status.NominatedNodeName
  3. // 如果 pod.Status.NominatedNodeName 不是空字符串
  4. if len(nomNodeName) > 0 {
  5. // 被 nominate 的 node
  6. if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
  7. for _, p := range nodeInfo.Pods() {
  8. // 有低优先级的 pod 处于删除中状态,就返回 false
  9. if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
  10. // There is a terminating pod on the nominated node.
  11. return false
  12. }
  13. }
  14. }
  15. }
  16. return true
  17. }

nodesWherePreemptionMightHelp

nodesWherePreemptionMightHelp 要做的事情是寻找 predicates 阶段失败但是通过抢占也许能够调度成功的 nodes.

这个函数也不怎么长,看下代码:

!FILENAME pkg/scheduler/core/generic_scheduler.go:1060

  1. func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
  2. // 潜力 node, 用于存储返回值的 slice
  3. potentialNodes := []*v1.Node{}
  4. for _, node := range nodes {
  5. // 这个为 true 表示一个 node 驱逐 pod 也不一定能适合当前 pod 运行
  6. unresolvableReasonExist := false
  7. // 一个 node 对应的所有失败的 predicates
  8. failedPredicates, _ := failedPredicatesMap[node.Name]
  9. // 遍历,看是不是再下面指定的这些原因中,如果在,就标记 unresolvableReasonExist = true
  10. for _, failedPredicate := range failedPredicates {
  11. switch failedPredicate {
  12. case
  13. predicates.ErrNodeSelectorNotMatch,
  14. predicates.ErrPodAffinityRulesNotMatch,
  15. predicates.ErrPodNotMatchHostName,
  16. predicates.ErrTaintsTolerationsNotMatch,
  17. predicates.ErrNodeLabelPresenceViolated,
  18. predicates.ErrNodeNotReady,
  19. predicates.ErrNodeNetworkUnavailable,
  20. predicates.ErrNodeUnderDiskPressure,
  21. predicates.ErrNodeUnderPIDPressure,
  22. predicates.ErrNodeUnderMemoryPressure,
  23. predicates.ErrNodeOutOfDisk,
  24. predicates.ErrNodeUnschedulable,
  25. predicates.ErrNodeUnknownCondition,
  26. predicates.ErrVolumeZoneConflict,
  27. predicates.ErrVolumeNodeConflict,
  28. predicates.ErrVolumeBindConflict:
  29. unresolvableReasonExist = true
  30. // 如果找到一个上述失败原因,说明这个 node 已经可以排除了,break 后继续下一个 node 的计算
  31. break
  32. }
  33. }
  34. // false 的时候,也就是这个 node 也许驱逐 pods 后有用,那就添加到 potentialNodes 中
  35. if !unresolvableReasonExist {
  36. klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
  37. potentialNodes = append(potentialNodes, node)
  38. }
  39. }
  40. return potentialNodes
  41. }

selectNodesForPreemption

这个函数会并发计算所有的 nodes 是否通过驱逐实现 pod 抢占。

看这个函数内容之前我们先看一下返回值的类型:

map[*v1.Node]*schedulerapi.Victims 的 key 很好理解,value 是啥呢:

  1. type Victims struct {
  2. Pods []*v1.Pod
  3. NumPDBViolations int
  4. }

这里的 Pods 是被选中准备要驱逐的;NumPDBViolations 表示的是要破坏多少个 PDB 限制。这里肯定也就是要尽量符合 PDB 要求,能不和 PDB 冲突就不冲突。

然后看一下这个函数的整体过程:

!FILENAME pkg/scheduler/core/generic_scheduler.go:895

  1. func selectNodesForPreemption(pod *v1.Pod,
  2. nodeNameToInfo map[string]*schedulercache.NodeInfo,
  3. potentialNodes []*v1.Node, // 上一个函数计算出来的 nodes
  4. predicates map[string]algorithm.FitPredicate,
  5. metadataProducer algorithm.PredicateMetadataProducer,
  6. queue internalqueue.SchedulingQueue, // 这里其实是前面讲的优先级队列 PriorityQueue
  7. pdbs []*policy.PodDisruptionBudget, // pdb 列表
  8. ) (map[*v1.Node]*schedulerapi.Victims, error) {
  9. nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
  10. var resultLock sync.Mutex
  11. // We can use the same metadata producer for all nodes.
  12. meta := metadataProducer(pod, nodeNameToInfo)
  13. // 这种形式的并发已经不陌生了,前面遇到过几次了
  14. checkNode := func(i int) {
  15. nodeName := potentialNodes[i].Name
  16. var metaCopy algorithm.PredicateMetadata
  17. if meta != nil {
  18. metaCopy = meta.ShallowCopy()
  19. }
  20. // 这里有一个子过程调用,下面单独介绍
  21. pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
  22. if fits {
  23. resultLock.Lock()
  24. victims := schedulerapi.Victims{
  25. Pods: pods,
  26. NumPDBViolations: numPDBViolations,
  27. }
  28. // 如果 fit,就添加到 nodeToVictims 中,也就是最后的返回值
  29. nodeToVictims[potentialNodes[i]] = &victims
  30. resultLock.Unlock()
  31. }
  32. }
  33. workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
  34. return nodeToVictims, nil
  35. }

上面这个函数的核心逻辑在 selectVictimsOnNode 中,这个函数尝试在给定的 node 中寻找最少数量的需要被驱逐的 pods,同时需要保证驱逐了这些 pods 之后,这个 noode 能够满足“pod”运行需求。

这些被驱逐的 pods 计算同时需要满足一个约束,就是能够删除低优先级的 pod 绝不先删高优先级的 pod.

这个算法首选计算当这个 node 上所有的低优先级 pods 被驱逐之后能否调度“pod”. 如果可以,那就按照优先级排序,根据 PDB 是否破坏分成两组,一组是影响 PDB 限制的,另外一组是不影响 PDB. 两组各自按照优先级排序。然后开始逐渐释放影响 PDB 的 group 中的 pod,然后逐渐释放不影响 PDB 的 group 中的 pod,在这个过程中要保持“pod”能够 fit 这个 node. 也就是说一旦放过某一个 pod 导致“pod”不 fit 这个 node 了,那就说明这个 pod 不能放过,也就是意味着已经找到了最少 pods 集。

看一下具体的实现吧:

FILENAME pkg/scheduler/core/generic_scheduler.go:983

  1. func selectVictimsOnNode(
  2. pod *v1.Pod,
  3. meta algorithm.PredicateMetadata,
  4. nodeInfo *schedulercache.NodeInfo,
  5. fitPredicates map[string]algorithm.FitPredicate,
  6. queue internalqueue.SchedulingQueue,
  7. pdbs []*policy.PodDisruptionBudget,
  8. ) ([]*v1.Pod, int, bool) {
  9. if nodeInfo == nil {
  10. return nil, 0, false
  11. }
  12. // 排个序
  13. potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
  14. nodeInfoCopy := nodeInfo.Clone()
  15. // 定义删除 pod 函数
  16. removePod := func(rp *v1.Pod) {
  17. nodeInfoCopy.RemovePod(rp)
  18. if meta != nil {
  19. meta.RemovePod(rp)
  20. }
  21. }
  22. // 定义添加 pod 函数
  23. addPod := func(ap *v1.Pod) {
  24. nodeInfoCopy.AddPod(ap)
  25. if meta != nil {
  26. meta.AddPod(ap, nodeInfoCopy)
  27. }
  28. }
  29. // 删除所有的低优先级 pod 看是不是能够满足调度需求了
  30. podPriority := util.GetPodPriority(pod)
  31. for _, p := range nodeInfoCopy.Pods() {
  32. if util.GetPodPriority(p) < podPriority {
  33. // 删除的意思其实就是添加元素到 potentialVictims.Items
  34. potentialVictims.Items = append(potentialVictims.Items, p)
  35. removePod(p)
  36. }
  37. }
  38. // 排个序
  39. potentialVictims.Sort()
  40. // 如果删除了所有的低优先级 pods 之后还不能跑这个新 pod,那么差不多就可以判断这个 node 不适合 preemption 了,还有一点点需要考虑的是这个“pod”的不 fit 的原因是由于 pod affinity 不满足了。
  41. // 后续可能会增加当前 pod 和低优先级 pod 之间的 优先级检查。
  42. // 这个函数调用其实就是之前讲到过的预选函数的调用逻辑,判断这个 pod 是否合适跑在这个 node 上。
  43. if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
  44. if err != nil {
  45. klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
  46. }
  47. return nil, 0, false
  48. }
  49. var victims []*v1.Pod
  50. numViolatingVictim := 0
  51. // 尝试尽量多地释放这些 pods,也就是说能少杀就少杀;这里先从 PDB violating victims 中释放,再从 PDB non-violating victims 中释放;两个组都是从高优先级的 pod 开始释放。
  52. violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
  53. // 释放 pods 的函数,来一个放一个
  54. reprievePod := func(p *v1.Pod) bool {
  55. addPod(p)
  56. fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
  57. if !fits {
  58. removePod(p)
  59. victims = append(victims, p)
  60. klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
  61. }
  62. return fits
  63. }
  64. // 释放 violatingVictims 中元素的同时会记录放了多少个
  65. for _, p := range violatingVictims {
  66. if !reprievePod(p) {
  67. numViolatingVictim++
  68. }
  69. }
  70. // 开始释放 non-violating victims.
  71. for _, p := range nonViolatingVictims {
  72. reprievePod(p)
  73. }
  74. return victims, numViolatingVictim, true
  75. }

pickOneNodeForPreemption

pickOneNodeForPreemption 要从给定的 nodes 中选择一个 node,这个函数假设给定的 map 中 value 部分是以 priority 降序排列的。这里选择 node 的标准是:

  1. 最少的 PDB violations
  2. 最少的高优先级 victim
  3. 优先级总数字最小
  4. victim 总数最小
  5. 直接返回第一个

!FILENAME pkg/scheduler/core/generic_scheduler.go:788

  1. func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
  2. if len(nodesToVictims) == 0 {
  3. return nil
  4. }
  5. // 初始化为最大值
  6. minNumPDBViolatingPods := math.MaxInt32
  7. var minNodes1 []*v1.Node
  8. lenNodes1 := 0
  9. // 这个循环要找到 PDBViolatingPods 最少的 node,如果有多个,就全部存在 minNodes1 中
  10. for node, victims := range nodesToVictims {
  11. if len(victims.Pods) == 0 {
  12. // 如果发现一个不需要驱逐 pod 的 node,马上返回
  13. return node
  14. }
  15. numPDBViolatingPods := victims.NumPDBViolations
  16. if numPDBViolatingPods < minNumPDBViolatingPods {
  17. minNumPDBViolatingPods = numPDBViolatingPods
  18. minNodes1 = nil
  19. lenNodes1 = 0
  20. }
  21. if numPDBViolatingPods == minNumPDBViolatingPods {
  22. minNodes1 = append(minNodes1, node)
  23. lenNodes1++
  24. }
  25. }
  26. // 如果只找到1个 PDB violations 最少的 node,那就直接返回这个 node 就 ok 了
  27. if lenNodes1 == 1 {
  28. return minNodes1[0]
  29. }
  30. // 还剩下多个 node,那就寻找 highest priority victim 最小的 node
  31. minHighestPriority := int32(math.MaxInt32)
  32. var minNodes2 = make([]*v1.Node, lenNodes1)
  33. lenNodes2 := 0
  34. // 这个循环要做的事情是看2个 node 上 victims 中最高优先级的 pod 哪个优先级更高
  35. for i := 0; i < lenNodes1; i++ {
  36. node := minNodes1[i]
  37. victims := nodesToVictims[node]
  38. // highestPodPriority is the highest priority among the victims on this node.
  39. highestPodPriority := util.GetPodPriority(victims.Pods[0])
  40. if highestPodPriority < minHighestPriority {
  41. minHighestPriority = highestPodPriority
  42. lenNodes2 = 0
  43. }
  44. if highestPodPriority == minHighestPriority {
  45. minNodes2[lenNodes2] = node
  46. lenNodes2++
  47. }
  48. }
  49. // 发现只有1个,那就直接返回
  50. if lenNodes2 == 1 {
  51. return minNodes2[0]
  52. }
  53. // 这时候还没有抉择出一个 node,那就开始计算优先级总和了,看哪个更低
  54. minSumPriorities := int64(math.MaxInt64)
  55. lenNodes1 = 0
  56. for i := 0; i < lenNodes2; i++ {
  57. var sumPriorities int64
  58. node := minNodes2[i]
  59. for _, pod := range nodesToVictims[node].Pods {
  60. // 这里的累加考虑到了先把优先级搞成正数。不然会出现1个 node 上有1优先级为 -3 的 pod,另外一个 node 上有2个优先级为 -3 的 pod,结果 -3>-6,有2个 pod 的 node 反而被认为总优先级更低!
  61. sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
  62. }
  63. if sumPriorities < minSumPriorities {
  64. minSumPriorities = sumPriorities
  65. lenNodes1 = 0
  66. }
  67. if sumPriorities == minSumPriorities {
  68. minNodes1[lenNodes1] = node
  69. lenNodes1++
  70. }
  71. }
  72. if lenNodes1 == 1 {
  73. return minNodes1[0]
  74. }
  75. // 还是没有分出胜负,于是开始用 pod 总数做比较
  76. minNumPods := math.MaxInt32
  77. lenNodes2 = 0
  78. for i := 0; i < lenNodes1; i++ {
  79. node := minNodes1[i]
  80. numPods := len(nodesToVictims[node].Pods)
  81. if numPods < minNumPods {
  82. minNumPods = numPods
  83. lenNodes2 = 0
  84. }
  85. if numPods == minNumPods {
  86. minNodes2[lenNodes2] = node
  87. lenNodes2++
  88. }
  89. }
  90. // 还是没有区分出来1个 node 的话,只能放弃区分了,直接返回第一个结果
  91. if lenNodes2 > 0 {
  92. return minNodes2[0]
  93. }
  94. klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
  95. return nil
  96. }

小结

咋个说呢,此处应该有总结的,抢占过程的逻辑比我想象中的复杂,设计很巧妙,行云流水,大快人心!preemption 可以简单说成再预选->再优选吧;还是不多说了,一天写这么多有点坐不住了,下回再继续聊调度器~