在前面的文章中已经分析过 deployment、statefulset 两个重要对象了,本文会继续分析 kubernetes 中另一个重要的对象 daemonset,在 kubernetes 中 daemonset 类似于 linux 上的守护进程会运行在每一个 node 上,在实际场景中,一般会将日志采集或者网络插件采用 daemonset 的方式部署。

DaemonSet 的基本操作

创建

daemonset 在创建后会在每个 node 上都启动一个 pod。

  1. $ kubectl create -f nginx-ds.yaml

扩缩容

由于 daemonset 是在每个 node 上启动一个 pod,其不存在扩缩容操作,副本数量跟 node 数量保持一致。

更新

daemonset 有两种更新策略 OnDeleteRollingUpdate,默认为 RollingUpdate。滚动更新时,需要指定 .spec.updateStrategy.rollingUpdate.maxUnavailable(默认为1)和 .spec.minReadySeconds(默认为 0)。

  1. // 更新镜像
  2. $ kubectl set image ds/nginx-ds nginx-ds=nginx:1.16
  3. // 查看更新状态
  4. $ kubectl rollout status ds/nginx-ds

回滚

在 statefulset 源码分析一节已经提到过 controllerRevision 这个对象了,其主要用来保存历史版本信息,在更新以及回滚操作时使用,daemonset controller 也是使用 controllerrevision 保存历史版本信息,在回滚时会使用历史 controllerrevision 中的信息替换 daemonset 中 Spec.Template

  1. // 查看 ds 历史版本信息
  2. $ kubectl get controllerrevision
  3. NAME CONTROLLER REVISION AGE
  4. nginx-ds-5c4b75bdbb daemonset.apps/nginx-ds 2 122m
  5. nginx-ds-7cd7798dcd daemonset.apps/nginx-ds 1 133m
  6. // 回滚到版本 1
  7. $ kubectl rollout undo daemonset nginx-ds --to-revision=1
  8. // 查看回滚状态
  9. $ kubectl rollout status ds/nginx-ds

暂停

daemonset 目前不支持暂停操作。

删除

daemonset 也支持两种删除操作。

  1. // 非级联删除
  2. $ kubectl delete ds/nginx-ds --cascade=false
  3. // 级联删除
  4. $ kubectl delete ds/nginx-ds

DaemonSetController 源码分析

kubernetes 版本:v1.16

首先还是看 startDaemonSetController 方法,在此方法中会初始化 DaemonSetsController 对象并调用 Run方法启动 daemonset controller,从该方法中可以看出 daemonset controller 会监听 daemonsetscontrollerRevisionpodnode 四种对象资源的变动。其中 ConcurrentDaemonSetSyncs的默认值为 2。

k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go:36

  1. func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
  2. if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
  3. return nil, false, nil
  4. }
  5. dsc, err := daemon.NewDaemonSetsController(
  6. ctx.InformerFactory.Apps().V1().DaemonSets(),
  7. ctx.InformerFactory.Apps().V1().ControllerRevisions(),
  8. ctx.InformerFactory.Core().V1().Pods(),
  9. ctx.InformerFactory.Core().V1().Nodes(),
  10. ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
  11. flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
  12. )
  13. if err != nil {
  14. return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
  15. }
  16. go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
  17. return nil, true, nil
  18. }

Run 方法中会启动两个操作,一个就是 dsc.runWorker 执行的 sync 操作,另一个就是 dsc.failedPodsBackoff.GC 执行的 gc 操作,主要逻辑为:

  • 1、等待 informer 缓存同步完成;
  • 2、启动两个 goroutine 分别执行 dsc.runWorker
  • 3、启动一个 goroutine 每分钟执行一次 dsc.failedPodsBackoff.GC,从 startDaemonSetController 方法中可以看到 failedPodsBackoff 的 duration为1s,max duration为15m,failedPodsBackoff 的主要作用是当发现 daemon pod 状态为 failed 时,会定时重启该 pod;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:263

  1. func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
  2. defer utilruntime.HandleCrash()
  3. defer dsc.queue.ShutDown()
  4. defer klog.Infof("Shutting down daemon sets controller")
  5. if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
  6. return
  7. }
  8. for i := 0; i < workers; i++ {
  9. // sync 操作
  10. go wait.Until(dsc.runWorker, time.Second, stopCh)
  11. }
  12. // GC 操作
  13. go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)
  14. <-stopCh
  15. }

syncDaemonSet

daemonset 中 pod 的创建与删除是与 node 相关联的,所以每次执行 sync 操作时需要遍历所有的 node 进行判断。syncDaemonSet 的主要逻辑为:

  • 1、通过 key 获取 ns 和 name;
  • 2、从 dsLister 中获取 ds 对象;
  • 3、从 nodeLister 获取所有 node;
  • 4、获取 dsKey;
  • 5、判断 ds 是否处于删除状态;
  • 6、调用 constructHistory 获取 current 和 old controllerRevision
  • 7、调用 dsc.expectations.SatisfiedExpectations 判断是否满足 expectations 机制,expectations 机制的目的就是减少不必要的 sync 操作,关于 expectations 机制的详细说明可以参考笔者以前写的 “replicaset controller 源码分析”一文;
  • 8、调用 dsc.manage 执行实际的 sync 操作;
  • 9、判断是否为更新操作,并执行对应的更新操作逻辑;
  • 10、调用 dsc.cleanupHistory 根据 spec.revisionHistoryLimit字段清理过期的 controllerrevision
  • 11、调用 dsc.updateDaemonSetStatus 更新 ds 状态;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1212

  1. func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
  2. ......
  3. // 1、通过 key 获取 ns 和 name
  4. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  5. if err != nil {
  6. return err
  7. }
  8. // 2、从 dsLister 中获取 ds 对象
  9. ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
  10. if errors.IsNotFound(err) {
  11. dsc.expectations.DeleteExpectations(key)
  12. return nil
  13. }
  14. ......
  15. // 3、从 nodeLister 获取所有 node
  16. nodeList, err := dsc.nodeLister.List(labels.Everything())
  17. ......
  18. everything := metav1.LabelSelector{}
  19. if reflect.DeepEqual(ds.Spec.Selector, &everything) {
  20. dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required. ")
  21. return nil
  22. }
  23. // 4、获取 dsKey
  24. dsKey, err := controller.KeyFunc(ds)
  25. if err != nil {
  26. return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
  27. }
  28. // 5、判断 ds 是否处于删除状态
  29. if ds.DeletionTimestamp != nil {
  30. return nil
  31. }
  32. // 6、获取 current 和 old controllerRevision
  33. cur, old, err := dsc.constructHistory(ds)
  34. if err != nil {
  35. return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
  36. }
  37. hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
  38. // 7、判断是否满足 expectations 机制
  39. if !dsc.expectations.SatisfiedExpectations(dsKey) {
  40. return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
  41. }
  42. // 8、执行实际的 sync 操作
  43. err = dsc.manage(ds, nodeList, hash)
  44. if err != nil {
  45. return err
  46. }
  47. // 9、判断是否为更新操作,并执行对应的更新操作
  48. if dsc.expectations.SatisfiedExpectations(dsKey) {
  49. switch ds.Spec.UpdateStrategy.Type {
  50. case apps.OnDeleteDaemonSetStrategyType:
  51. case apps.RollingUpdateDaemonSetStrategyType:
  52. err = dsc.rollingUpdate(ds, nodeList, hash)
  53. }
  54. if err != nil {
  55. return err
  56. }
  57. }
  58. // 10、清理过期的 controllerrevision
  59. err = dsc.cleanupHistory(ds, old)
  60. if err != nil {
  61. return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
  62. }
  63. // 11、更新 ds 状态
  64. return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
  65. }

syncDaemonSet 中主要有 managerollingUpdateupdateDaemonSetStatus 三个方法,分别对应创建、更新与状态同步,下面主要来分析这三个方法。

manage

manage 主要是用来保证 ds 的 pod 数正常运行在每一个 node 上,其主要逻辑为:

  • 1、调用 dsc.getNodesToDaemonPods 获取已存在 daemon pod 与 node 的映射关系;
  • 2、遍历所有 node,调用 dsc.podsShouldBeOnNode 方法来确定在给定的节点上需要创建还是删除 daemon pod;
  • 3、判断是否启动了 ScheduleDaemonSetPodsfeature-gates 特性,若启动了则需要删除通过默认调度器已经调度到不存在 node 上的 daemon pod;
  • 4、调用 dsc.syncNodes 为对应的 node 创建 daemon pod 以及删除多余的 pods;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:952

  1. func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
  2. // 1、获取已存在 daemon pod 与 node 的映射关系
  3. nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
  4. ......
  5. // 2、判断每一个 node 是否需要运行 daemon pod
  6. var nodesNeedingDaemonPods, podsToDelete []string
  7. for _, node := range nodeList {
  8. nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
  9. node, nodeToDaemonPods, ds)
  10. if err != nil {
  11. continue
  12. }
  13. nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
  14. podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
  15. }
  16. // 3、判断是否启动了 ScheduleDaemonSetPods feature-gates 特性,若启用了则对不存在 node 上的
  17. // daemon pod 进行删除
  18. if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
  19. podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
  20. }
  21. // 4、为对应的 node 创建 daemon pod 以及删除多余的 pods
  22. if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
  23. return err
  24. }
  25. return nil
  26. }

manage 方法中又调用了 getNodesToDaemonPodspodsShouldBeOnNodesyncNodes 三个方法,继续来看这几种方法的作用。

getNodesToDaemonPods

getNodesToDaemonPods 是用来获取已存在 daemon pod 与 node 的映射关系,并且会通过 adopt/orphan 方法关联以及释放对应的 pod。

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:820

  1. func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
  2. claimedPods, err := dsc.getDaemonPods(ds)
  3. if err != nil {
  4. return nil, err
  5. }
  6. nodeToDaemonPods := make(map[string][]*v1.Pod)
  7. for _, pod := range claimedPods {
  8. nodeName, err := util.GetTargetNodeName(pod)
  9. if err != nil {
  10. klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
  11. pod.Namespace, pod.Name, ds.Namespace, ds.Name)
  12. continue
  13. }
  14. nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
  15. }
  16. return nodeToDaemonPods, nil
  17. }
podsShouldBeOnNode

podsShouldBeOnNode 方法用来确定在给定的节点上需要创建还是删除 daemon pod,主要逻辑为:

  • 1、调用 dsc.nodeShouldRunDaemonPod 判断该 node 是否需要运行 daemon pod 以及 pod 能不能调度成功,该方法返回三个值 wantToRun, shouldSchedule, shouldContinueRunning
  • 2、通过判断 wantToRun, shouldSchedule, shouldContinueRunning 将需要创建 daemon pod 的 node 列表以及需要删除的 pod 列表获取到, wantToRun主要检查的是 selector、taints 等是否匹配,shouldSchedule 主要检查 node 上的资源是否充足,shouldContinueRunning 默认为 true;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:866

  1. func (dsc *DaemonSetsController) podsShouldBeOnNode(...) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
  2. // 1、判断该 node 是否需要运行 daemon pod 以及能不能调度成功
  3. wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
  4. if err != nil {
  5. return
  6. }
  7. // 2、获取该节点上的指定ds的pod列表
  8. daemonPods, exists := nodeToDaemonPods[node.Name]
  9. dsKey, err := cache.MetaNamespaceKeyFunc(ds)
  10. if err != nil {
  11. utilruntime.HandleError(err)
  12. return
  13. }
  14. // 3、从 suspended list 中移除在该节点上 ds 的 pod
  15. dsc.removeSuspendedDaemonPods(node.Name, dsKey)
  16. switch {
  17. // 4、对于需要创建 pod 但是不能调度 pod 的 node,先把 pod 放入到 suspended 队列中
  18. case wantToRun && !shouldSchedule:
  19. dsc.addSuspendedDaemonPods(node.Name, dsKey)
  20. // 5、需要创建 pod 且 pod 未运行,则创建 pod
  21. case shouldSchedule && !exists:
  22. nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
  23. // 6、需要 pod 一直运行
  24. case shouldContinueRunning:
  25. var daemonPodsRunning []*v1.Pod
  26. for _, pod := range daemonPods {
  27. if pod.DeletionTimestamp != nil {
  28. continue
  29. }
  30. // 7、如果 pod 运行状态为 failed,则删除该 pod
  31. if pod.Status.Phase == v1.PodFailed {
  32. backoffKey := failedPodsBackoffKey(ds, node.Name)
  33. now := dsc.failedPodsBackoff.Clock.Now()
  34. inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
  35. if inBackoff {
  36. delay := dsc.failedPodsBackoff.Get(backoffKey)
  37. dsc.enqueueDaemonSetAfter(ds, delay)
  38. continue
  39. }
  40. dsc.failedPodsBackoff.Next(backoffKey, now)
  41. podsToDelete = append(podsToDelete, pod.Name)
  42. } else {
  43. daemonPodsRunning = append(daemonPodsRunning, pod)
  44. }
  45. }
  46. // 8、如果节点上已经运行 daemon pod 数 > 1,保留运行时间最长的 pod,其余的删除
  47. if len(daemonPodsRunning) > 1 {
  48. sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
  49. for i := 1; i < len(daemonPodsRunning); i++ {
  50. podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
  51. }
  52. }
  53. // 9、如果 pod 不需要继续运行但 pod 已存在则需要删除 pod
  54. case !shouldContinueRunning && exists:
  55. for _, pod := range daemonPods {
  56. if pod.DeletionTimestamp != nil {
  57. continue
  58. }
  59. podsToDelete = append(podsToDelete, pod.Name)
  60. }
  61. }
  62. return nodesNeedingDaemonPods, podsToDelete, nil
  63. }

然后继续看 nodeShouldRunDaemonPod 方法的主要逻辑:

  • 1、调用 NewPod 为该 node 构建一个 daemon pod object;
  • 2、判断 ds 是否指定了 .Spec.Template.Spec.NodeName 字段;
  • 3、调用 dsc.simulate 执行 GeneralPredicates 预选算法检查该 node 是否能够调度成功;
  • 4、判断 GeneralPredicates 预选算法执行后的 reasons 确定 wantToRun, shouldSchedule, shouldContinueRunning 的值;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1337

  1. func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
  2. // 1、构建 daemon pod object
  3. newPod := NewPod(ds, node.Name)
  4. wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
  5. // 2、判断 ds 是否指定了 node,若指定了且不为当前 node 直接返回 false
  6. if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
  7. return false, false, false, nil
  8. }
  9. // 3、执行 GeneralPredicates 预选算法
  10. reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
  11. if err != nil {
  12. ......
  13. }
  14. // 4、检查预选算法执行的结果
  15. var insufficientResourceErr error
  16. for _, r := range reasons {
  17. switch reason := r.(type) {
  18. case *predicates.InsufficientResourceError:
  19. insufficientResourceErr = reason
  20. case *predicates.PredicateFailureError:
  21. var emitEvent bool
  22. switch reason {
  23. case
  24. predicates.ErrNodeSelectorNotMatch,
  25. predicates.ErrPodNotMatchHostName,
  26. predicates.ErrNodeLabelPresenceViolated,
  27. predicates.ErrPodNotFitsHostPorts:
  28. return false, false, false, nil
  29. case predicates.ErrTaintsTolerationsNotMatch:
  30. fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
  31. if err != nil {
  32. return false, false, false, err
  33. }
  34. if !fitsNoExecute {
  35. return false, false, false, nil
  36. }
  37. wantToRun, shouldSchedule = false, false
  38. case
  39. predicates.ErrDiskConflict,
  40. predicates.ErrVolumeZoneConflict,
  41. predicates.ErrMaxVolumeCountExceeded,
  42. predicates.ErrNodeUnderMemoryPressure,
  43. predicates.ErrNodeUnderDiskPressure:
  44. shouldSchedule = false
  45. emitEvent = true
  46. case
  47. predicates.ErrPodAffinityNotMatch,
  48. predicates.ErrServiceAffinityViolated:
  49. return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
  50. default:
  51. wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
  52. emitEvent = true
  53. }
  54. ......
  55. }
  56. }
  57. if shouldSchedule && insufficientResourceErr != nil {
  58. dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
  59. shouldSchedule = false
  60. }
  61. return
  62. }
syncNodes

syncNodes 方法主要是为需要 daemon pod 的 node 创建 pod 以及删除多余的 pod,其主要逻辑为:

  • 1、将 createDiffdeleteDiffburstReplicas 进行比较,burstReplicas 默认值为 250 即每个 syncLoop 中创建或者删除的 pod 数最多为 250 个,若超过其值则剩余需要创建或者删除的 pod 在下一个 syncLoop 继续操作;
  • 2、将 createDiffdeleteDiff 写入到 expectations 中;
  • 3、并发创建 pod,创建 pod 有两种方法:(1)创建的 pod 不经过默认调度器,直接指定了 pod 的运行节点(即设定pod.Spec.NodeName);(2)若启用了 ScheduleDaemonSetPods feature-gates 特性,则使用默认调度器进行创建 pod,通过 nodeAffinity来保证每个节点都运行一个 pod;
  • 4、并发删除 deleteDiff 中的所有 pod;

ScheduleDaemonSetPods 是一个 feature-gates 特性,其出现在 v1.11 中,在 v1.12 中处于 Beta 版本,v1.17 为 GA 版。最初 daemonset controller 只有一种创建 pod 的方法,即直接指定 pod 的 spec.NodeName 字段,但是目前这种方式已经暴露了许多问题,在以后的发展中社区还是希望能通过默认调度器进行调度,所以才出现了第二种方式,原因主要有以下五点:

  • 1、DaemonSet 无法感知 node 上资源的变化 (#46935, #58868):当 pod 第一次因资源不够无法创建时,若其他 pod 退出后资源足够时 DaemonSet 无法感知到;
  • 2、Daemonset 无法支持 Pod Affinity 和 Pod AntiAffinity 的功能(#29276);
  • 3、在某些功能上需要实现和 scheduler 重复的代码逻辑, 例如:critical pods (#42028), tolerant/taint;
  • 4、当 DaemonSet 的 Pod 创建失败时难以 debug,例如:资源不足时,对于 pending pod 最好能打一个 event 说明;
  • 5、多个组件同时调度时难以实现抢占机制:这也是无法通过横向扩展调度器提高调度吞吐量的一个原因;

更详细的原因可以参考社区的文档:schedule-DS-pod-by-scheduler.md

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:990

  1. func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
  2. ......
  3. // 1、设置 burstReplicas
  4. createDiff := len(nodesNeedingDaemonPods)
  5. deleteDiff := len(podsToDelete)
  6. if createDiff > dsc.burstReplicas {
  7. createDiff = dsc.burstReplicas
  8. }
  9. if deleteDiff > dsc.burstReplicas {
  10. deleteDiff = dsc.burstReplicas
  11. }
  12. // 2、写入到 expectations 中
  13. dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
  14. errCh := make(chan error, createDiff+deleteDiff)
  15. createWait := sync.WaitGroup{}
  16. generation, err := util.GetTemplateGeneration(ds)
  17. if err != nil {
  18. generation = nil
  19. }
  20. template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
  21. // 3、并发创建 pod,创建的 pod 数依次为 1, 2, 4, 8, ...
  22. batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
  23. for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
  24. errorCount := len(errCh)
  25. createWait.Add(batchSize)
  26. for i := pos; i < pos+batchSize; i++ {
  27. go func(ix int) {
  28. defer createWait.Done()
  29. var err error
  30. podTemplate := template.DeepCopy()
  31. // 4、若启动了 ScheduleDaemonSetPods 功能,则通过 kube-scheduler 创建 pod
  32. if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
  33. podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
  34. podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
  35. err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
  36. ds, metav1.NewControllerRef(ds, controllerKind))
  37. } else {
  38. // 5、否则直接设置 pod 的 .spec.NodeName 创建 pod
  39. err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
  40. ds, metav1.NewControllerRef(ds, controllerKind))
  41. }
  42. // 6、创建 pod 时忽略 timeout err
  43. if err != nil && errors.IsTimeout(err) {
  44. return
  45. }
  46. if err != nil {
  47. dsc.expectations.CreationObserved(dsKey)
  48. errCh <- err
  49. utilruntime.HandleError(err)
  50. }
  51. }(i)
  52. }
  53. createWait.Wait()
  54. // 7、将创建失败的 pod 数记录到 expectations 中
  55. skippedPods := createDiff - (batchSize + pos)
  56. if errorCount < len(errCh) && skippedPods > 0 {
  57. dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
  58. break
  59. }
  60. }
  61. // 8、并发删除 deleteDiff 中的 pod
  62. deleteWait := sync.WaitGroup{}
  63. deleteWait.Add(deleteDiff)
  64. for i := 0; i < deleteDiff; i++ {
  65. go func(ix int) {
  66. defer deleteWait.Done()
  67. if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
  68. dsc.expectations.DeletionObserved(dsKey)
  69. errCh <- err
  70. utilruntime.HandleError(err)
  71. }
  72. }(i)
  73. }
  74. deleteWait.Wait()
  75. errors := []error{}
  76. close(errCh)
  77. for err := range errCh {
  78. errors = append(errors, err)
  79. }
  80. return utilerrors.NewAggregate(errors)
  81. }

RollingUpdate

daemonset update 的方式有两种 OnDeleteRollingUpdate,当为 OnDelete 时需要用户手动删除每一个 pod 后完成更新操作,当为 RollingUpdate 时,daemonset controller 会自动控制升级进度。

当为 RollingUpdate 时,主要逻辑为:

  • 1、获取 daemonset pod 与 node 的映射关系;
  • 2、根据 controllerrevision 的 hash 值获取所有未更新的 pods;
  • 3、获取 maxUnavailable, numUnavailable 的 pod 数值,maxUnavailable 是从 ds 的 rollingUpdate 字段中获取的默认值为 1,numUnavailable 的值是通过 daemonset pod 与 node 的映射关系计算每个 node 下是否有 available pod 得到的;
  • 4、通过 oldPods 获取 oldAvailablePods, oldUnavailablePods 的 pod 列表;
  • 5、遍历 oldUnavailablePods 列表将需要删除的 pod 追加到 oldPodsToDelete 数组中。oldUnavailablePods 列表中的 pod 分为两种,一种处于更新中,即删除状态,一种处于未更新且异常状态,处于异常状态的都需要被删除;
  • 6、遍历 oldAvailablePods 列表,此列表中的 pod 都处于正常运行状态,根据 maxUnavailable 值确定是否需要删除该 pod 并将需要删除的 pod 追加到 oldPodsToDelete 数组中;
  • 7、调用 dsc.syncNodes 删除 oldPodsToDelete 数组中的 pods,syncNodes 方法在 manage 阶段已经分析过,此处不再详述;

rollingUpdate 的结果是找出需要删除的 pods 并进行删除,被删除的 pod 在下一个 syncLoop 中会通过 manage 方法使用最新版本的 daemonset template 进行创建,整个滚动更新的过程是通过先删除再创建的方式一步步完成更新的,每次操作都是严格按照 maxUnavailable 的值确定需要删除的 pod 数。

k8s.io/kubernetes/pkg/controller/daemon/update.go:43

  1. func (dsc *DaemonSetsController) rollingUpdate(......) error {
  2. // 1、获取 daemonset pod 与 node 的映射关系
  3. nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
  4. ......
  5. // 2、获取所有未更新的 pods
  6. _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
  7. // 3、计算 maxUnavailable, numUnavailable 的 pod 数值
  8. maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
  9. if err != nil {
  10. return fmt.Errorf("couldn't get unavailable numbers: %v", err)
  11. }
  12. oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)
  13. // 4、将非 running 状态的 pods 加入到 oldPodsToDelete 中
  14. var oldPodsToDelete []string
  15. for _, pod := range oldUnavailablePods {
  16. if pod.DeletionTimestamp != nil {
  17. continue
  18. }
  19. oldPodsToDelete = append(oldPodsToDelete, pod.Name)
  20. }
  21. // 5、根据 maxUnavailable 值确定是否需要删除 pod
  22. for _, pod := range oldAvailablePods {
  23. if numUnavailable >= maxUnavailable {
  24. break
  25. }
  26. oldPodsToDelete = append(oldPodsToDelete, pod.Name)
  27. numUnavailable++
  28. }
  29. // 6、调用 syncNodes 方法删除 oldPodsToDelete 数组中的 pods
  30. return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
  31. }

总结一下,manage 方法中的主要流程为:

  1. |-> dsc.getNodesToDaemonPods
  2. |
  3. |
  4. manage ---- |-> dsc.podsShouldBeOnNode ---> dsc.nodeShouldRunDaemonPod
  5. |
  6. |
  7. |-> dsc.syncNodes

updateDaemonSetStatus

updateDaemonSetStatussyncDaemonSet 中最后执行的方法,主要是用来计算 ds status subresource 中的值并更新其 status。status 如下所示:

  1. status:
  2. currentNumberScheduled: 1 // 已经运行了 DaemonSet Pod的节点数量
  3. desiredNumberScheduled: 1 // 需要运行该DaemonSet Pod的节点数量
  4. numberMisscheduled: 0 // 不需要运行 DeamonSet Pod 但是已经运行了的节点数量
  5. numberReady: 0 // DaemonSet Pod状态为Ready的节点数量
  6. numberAvailable: 1 // DaemonSet Pod状态为Ready且运行时间超过 // Spec.MinReadySeconds 的节点数量
  7. numberUnavailable: 0 // desiredNumberScheduled - numberAvailable 的节点数量
  8. observedGeneration: 3
  9. updatedNumberScheduled: 1 // 已经完成DaemonSet Pod更新的节点数量

updateDaemonSetStatus 主要逻辑为:

  • 1、调用 dsc.getNodesToDaemonPods 获取已存在 daemon pod 与 node 的映射关系;
  • 2、遍历所有 node,调用 dsc.nodeShouldRunDaemonPod 判断该 node 是否需要运行 daemon pod,然后计算 status 中的部分字段值;
  • 3、调用 storeDaemonSetStatus 更新 ds status subresource;
  • 4、判断 ds 是否需要 resync;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1152

  1. func (dsc *DaemonSetsController) updateDaemonSetStatus(......) error {
  2. // 1、获取已存在 daemon pod 与 node 的映射关系
  3. nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
  4. ......
  5. var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
  6. for _, node := range nodeList {
  7. // 2、判断该 node 是否需要运行 daemon pod
  8. wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
  9. if err != nil {
  10. return err
  11. }
  12. scheduled := len(nodeToDaemonPods[node.Name]) > 0
  13. // 3、计算 status 中的字段值
  14. if wantToRun {
  15. desiredNumberScheduled++
  16. if scheduled {
  17. currentNumberScheduled++
  18. daemonPods, _ := nodeToDaemonPods[node.Name]
  19. sort.Sort(podByCreationTimestampAndPhase(daemonPods))
  20. pod := daemonPods[0]
  21. if podutil.IsPodReady(pod) {
  22. numberReady++
  23. if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
  24. numberAvailable++
  25. }
  26. }
  27. generation, err := util.GetTemplateGeneration(ds)
  28. if err != nil {
  29. generation = nil
  30. }
  31. if util.IsPodUpdated(pod, hash, generation) {
  32. updatedNumberScheduled++
  33. }
  34. }
  35. } else {
  36. if scheduled {
  37. numberMisscheduled++
  38. }
  39. }
  40. }
  41. numberUnavailable := desiredNumberScheduled - numberAvailable
  42. // 4、更新 daemonset status subresource
  43. err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
  44. if err != nil {
  45. return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
  46. }
  47. // 5、判断 ds 是否需要 resync
  48. if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
  49. dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
  50. }
  51. return nil
  52. }

最后,再总结一下 syncDaemonSet 方法的主要流程:

  1. |-> dsc.getNodesToDaemonPods
  2. |
  3. |
  4. |-> manage -->|-> dsc.podsShouldBeOnNode ---> dsc.nodeShouldRunDaemonPod
  5. | |
  6. | |
  7. syncDaemonSet --> | |-> dsc.syncNodes
  8. |
  9. |-> rollingUpdate
  10. |
  11. |
  12. |-> updateDaemonSetStatus

总结

在 daemonset controller 中可以看到许多功能都是 deployment 和 statefulset 已有的。在创建 pod 的流程与 replicaset controller 创建 pod 的流程是相似的,都使用了 expectations 机制并且限制了在一个 syncLoop 中最多创建或删除的 pod 数。更新方式与 statefulset 一样都有 OnDeleteRollingUpdate 两种, OnDelete 方式与 statefulset 相似,都需要手动删除对应的 pod,而 RollingUpdate 方式与 statefulset 和 deployment 都有点区别, RollingUpdate方式更新时不支持暂停操作并且 pod 是先删除再创建的顺序进行。版本控制方式与 statefulset 的一样都是使用 controllerRevision。最后要说的一点是在 v1.12 及以后的版本中,使用 daemonset 创建的 pod 已不再使用直接指定 .spec.nodeName的方式绕过调度器进行调度,而是走默认调度器通过 nodeAffinity 的方式调度到每一个节点上。

参考:

https://yq.aliyun.com/articles/702305