Custom Controller 之 Informer (一)

概述

本节标题写的是 Informer,不过我们的内容不局限于狭义的 Informer 部分,只是 Informer 最有代表性,其他的 Reflector 等也不好独立开来讲。

Informer 在很多组件的源码中可以看到,尤其是 kube-controller-manager (写这篇文章时我已经基本写完 kube-scheduler 的源码分析,着手写 kube-controller-manager 了,鉴于 controlelr 和 client-go 关联比较大,跳过来先讲讲典型的控制器工作流程中涉及到的 client-go 部分).

Informer 是 client-go 中一个比较核心的工具,通过 Informer(实际我们用到的都不是单纯的 informer,而是组合了各种工具的 sharedInformerFactory) 我们可以轻松 List/Get 某个资源对象,可以监听资源对象的各种事件(比如创建和删除)然后触发回调函数,让我们能够在各种事件发生的时候能够作出相应的逻辑处理。举个例字,当 pod 数量变化的时候 deployment 是不是需要判断自己名下的 pod 数量是否还和预期的一样?如果少了是不是要考虑创建?

架构概览

自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。(图片来自https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md)

1555996411720

我们开发自定义控制器的时候用到的“机制”主要定义在 client-go 的 tool/cache下:

1556075198766

先关注一下第一幅图中涉及到的一些 components:

client-go 相关模块

  • Reflector: Reflector 类型定义在 cache 包中(tools/cache/reflector.go:47),它的作用是向 apiserver watch 特定的资源类型。这个功能通过其绑定的 ListAndWatch 方法实现。Watch 的资源可以是 in-build 的资源也可以是 custom 的资源。当 Reflector 通过 watch API 接收到存在新的资源对象实例的通知后,它使用相应的 list API 获取新创建的资源对象,然后 put 进 Delta Fifo 队列。这个步骤在 watchHandler 函数(tools/cache/reflector.go:268)中完成。
  • Informer: 一个定义在 cache 包中的基础 controller(tools/cache/controller.go:75) (一个 informer) 从 Delta Fifo 队列中 pop 出来资源对象实例(这个功能在 processLoop 中实现(tools/cache/controller.go:148))。这个 base controller 做的工作是保存这个对象用于后续检索处理用的,然后触发我们自己的控制器来处理这个对象。
  • Indexer: Indexer 提供的是 objects 之上的检索能力。Indexer 也定义在 cache 包中(tools/cache/index.go:27). 一个典型的检索使用方式是基于一个对象的 labels 创建索引。Indexer 可以基于各种索引函数维护索引。Indexer 使用一个线程安全的 store 来存储对象和其对应的 key. 还有一个默认函数 MetaNamespaceKeyFunc(tools/cache/store.go:76) 可以生成对象的 key,类似 <namespace>/<name> 格式来关联对应的对象。

自定义控制器相关模块

  • Informer reference: 这是一个知道如何处理自定义资源对象的 Informer 实例的引用。自定义控制器需要创建合适的 Informer.
  • Indexer reference: 这是一个知道如何处理自定义资源对象的 Indexer 实例的引用. 自定义控制器代码需要创建这个引用对象,然后用于检索资源对象用于后续的处理。

Base controller 提供了 NewIndexerInformer(tools/cache/controller.go:345) 函数来创建 Informer 和 Indexer. 在代码里我们可以直接调用这个函数或者使用工厂方法来创建 informer.

  • Resource Event Handlers: 这是一个回调函数,在 Informer 想要分发一个对象给控制器的时候会调用这个函数。典型的用法是写一个函数来获取分发过来的对象的 key,将 key 放入队列中等待进一步的处理。
  • Work queue: 这个队列是在自己的控制器代码中创建的,用来解耦一个对象的分发和处理过程。Resource event handler 函数会被写成提取分发来的对象的 key,然后将这个 key 添加到 work queue 里面。
  • Process Item 这是我们在自己代码中实现的用来处理 work queue 中拿到的 items 的函数。这里可以有一个或多个函数来处理具体的过程,这个函数的典型用法是使用 Indexer 索引或者一个 Listing wrapper 来根据相应的 key 检索对象。

下面我们根据图中这几个步骤来跟源码。

第一步:reflector - List & Watch API Server

Reflector 会监视特定的资源,将变化写入给定的存储中,也就是 Delta FIFO queue.

Reflector 对象定义

Reflector 的中文含义是反射器,我们先看一下类型定义:

!FILENAME tools/cache/reflector.go:47

  1. type Reflector struct {
  2. name string
  3. metrics *reflectorMetrics
  4. expectedType reflect.Type
  5. store Store
  6. listerWatcher ListerWatcher
  7. period time.Duration
  8. resyncPeriod time.Duration
  9. ShouldResync func() bool
  10. clock clock.Clock
  11. lastSyncResourceVersion string
  12. lastSyncResourceVersionMutex sync.RWMutex
  13. }

reflector.go中主要就 Reflector 这个 struct 和相关的一些函数:

1556075898739

ListAndWatch

ListAndWatch 首先 list 所有 items,获取当前的资源版本信息,然后使用这个版本信息来 watch(也就是从这个版本开始的所有资源变化会被关注)。我们看一下这里的 ListAndWatch 方法主要逻辑:

!FILENAME tools/cache/reflector.go:168

  1. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  2. // list 资源
  3. list, err := r.listerWatcher.List(options)
  4. // 提取 items
  5. items, err := meta.ExtractList(list)
  6. // 更新存储(Delta FIFO)中的 items
  7. if err := r.syncWith(items, resourceVersion); err != nil {
  8. return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
  9. }
  10. r.setLastSyncResourceVersion(resourceVersion)
  11. // ……
  12. for {
  13. select {
  14. case <-stopCh:
  15. return nil
  16. default:
  17. }
  18. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  19. options = metav1.ListOptions{
  20. ResourceVersion: resourceVersion,
  21. TimeoutSeconds: &timeoutSeconds,
  22. }
  23. r.metrics.numberOfWatches.Inc()
  24. // 开始 watch
  25. w, err := r.listerWatcher.Watch(options)
  26. // ……
  27. // w 交给 watchHandler 处理,这里的逻辑后面分析
  28. if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
  29. if err != errorStopRequested {
  30. klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
  31. }
  32. return nil
  33. }
  34. }
  35. }

第二步:watchHandler - add obj to delta fifo

前面讲到 ListAndWatch 函数的最后一步逻辑是 watchHandler,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中,我们具体来看。

!FILENAME tools/cache/reflector.go:287

  1. func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
  2. // ……
  3. loop:
  4. // 这里进入一个无限循环
  5. for {
  6. select {
  7. case <-stopCh:
  8. return errorStopRequested
  9. case err := <-errc:
  10. return err
  11. // watch 返回值中的一个 channel
  12. case event, ok := <-w.ResultChan():
  13. // ……
  14. newResourceVersion := meta.GetResourceVersion()
  15. // 根据事件类型处理,有 Added Modified Deleted 3种
  16. // 3 种事件分别对应 store 中的增改删操作
  17. switch event.Type {
  18. case watch.Added:
  19. err := r.store.Add(event.Object)
  20. case watch.Modified:
  21. err := r.store.Update(event.Object)
  22. case watch.Deleted:
  23. err := r.store.Delete(event.Object)
  24. default:
  25. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  26. }
  27. *resourceVersion = newResourceVersion
  28. r.setLastSyncResourceVersion(newResourceVersion)
  29. eventCount++
  30. }
  31. }
  32. // ……
  33. return nil
  34. }

第三、四、五步:Informer - pop obj from delta fifo、Add obj to store

先看 Controller 是什么

Controller

Informer 会实现 Controller 接口,这个接口长这样:

!FILENAME tools/cache/controller.go:82

  1. type Controller interface {
  2. Run(stopCh <-chan struct{})
  3. HasSynced() bool
  4. LastSyncResourceVersion() string
  5. }

和这个 Controller 对应的有一个基础 controller 实现:

!FILENAME tools/cache/controller.go:75

  1. type controller struct {
  2. config Config
  3. reflector *Reflector
  4. reflectorMutex sync.RWMutex
  5. clock clock.Clock
  6. }

controller 类型结构如下:

1556088003902

可以看到主要对外暴露的逻辑是 Run() 方法,还有一个重点 processLoop() 其实也在 Run() 里面被调用,我们看一下 Run() 中的逻辑:

!FILENAME tools/cache/controller.go:100

  1. func (c *controller) Run(stopCh <-chan struct{}) {
  2. defer utilruntime.HandleCrash()
  3. go func() {
  4. <-stopCh
  5. c.config.Queue.Close()
  6. }()
  7. // 内部 Reflector 创建
  8. r := NewReflector(
  9. c.config.ListerWatcher,
  10. c.config.ObjectType,
  11. c.config.Queue,
  12. c.config.FullResyncPeriod,
  13. )
  14. r.ShouldResync = c.config.ShouldResync
  15. r.clock = c.clock
  16. c.reflectorMutex.Lock()
  17. c.reflector = r
  18. c.reflectorMutex.Unlock()
  19. var wg wait.Group
  20. defer wg.Wait()
  21. wg.StartWithChannel(stopCh, r.Run)
  22. // 循环调用 processLoop
  23. wait.Until(c.processLoop, time.Second, stopCh)
  24. }

processLoop

!FILENAME tools/cache/controller.go:148

  1. func (c *controller) processLoop() {
  2. for {
  3. // 主要逻辑
  4. obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
  5. // 异常处理
  6. }
  7. }

这里的 Queue 就是 Delta FIFO,Pop 是个阻塞方法,内部实现时会逐个 pop 队列中的数据,交给 PopProcessFunc 处理。我们先不看 Pop 的实现,关注一下 PopProcessFunc 是如何处理 Pop 中从队列拿出来的 item 的。

PopProcessFunc 是一个类型,如下:

type PopProcessFunc func(interface{}) error

所以这里只是一个类型转换,我们关注c.config.Process就行:

!FILENAME tools/cache/controller.go:367

  1. Process: func(obj interface{}) error {
  2. for _, d := range obj.(Deltas) {
  3. switch d.Type {
  4. // 更新、添加、同步、删除等操作
  5. case Sync, Added, Updated:
  6. if old, exists, err := clientState.Get(d.Object); err == nil && exists {
  7. if err := clientState.Update(d.Object); err != nil {
  8. return err
  9. }
  10. h.OnUpdate(old, d.Object)
  11. } else {
  12. if err := clientState.Add(d.Object); err != nil {
  13. return err
  14. }
  15. h.OnAdd(d.Object)
  16. }
  17. case Deleted:
  18. if err := clientState.Delete(d.Object); err != nil {
  19. return err
  20. }
  21. h.OnDelete(d.Object)
  22. }
  23. }
  24. return nil
  25. },

这里涉及到2个点:

  • clientState
  • ResourceEventHandler (h)

我们后面会一一分析到。

clientState

前面说到 clientState,这个变量的初始化是clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

NewIndexer 代码如下:

!FILENAME tools/cache/store.go:239

  1. func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
  2. return &cache{
  3. cacheStorage: NewThreadSafeStore(indexers, Indices{}),
  4. keyFunc: keyFunc,
  5. }
  6. }

!FILENAME tools/cache/index.go:27

  1. type Indexer interface {
  2. Store
  3. Index(indexName string, obj interface{}) ([]interface{}, error)
  4. IndexKeys(indexName, indexKey string) ([]string, error)
  5. ListIndexFuncValues(indexName string) []string
  6. ByIndex(indexName, indexKey string) ([]interface{}, error)
  7. GetIndexers() Indexers
  8. AddIndexers(newIndexers Indexers) error
  9. }

顺带看一下 NewThreadSafeStore()

!FILENAME tools/cache/thread_safe_store.go:298

  1. func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
  2. return &threadSafeMap{
  3. items: map[string]interface{}{},
  4. indexers: indexers,
  5. indices: indices,
  6. }
  7. }

然后关注一下 Process 中的err := clientState.Add(d.Object)的 Add() 方法:

!FILENAME tools/cache/store.go:123

  1. func (c *cache) Add(obj interface{}) error {
  2. // 计算key;一般是namespace/name
  3. key, err := c.keyFunc(obj)
  4. if err != nil {
  5. return KeyError{obj, err}
  6. }
  7. // Add
  8. c.cacheStorage.Add(key, obj)
  9. return nil
  10. }

cacheStorage 是一个 ThreadSafeStore 实例,这个 Add() 代码如下:

!FILENAME tools/cache/thread_safe_store.go:68

  1. func (c *threadSafeMap) Add(key string, obj interface{}) {
  2. c.lock.Lock()
  3. defer c.lock.Unlock()
  4. // 拿出 old obj
  5. oldObject := c.items[key]
  6. // 写入 new obj
  7. c.items[key] = obj
  8. // 更新索引,有一堆逻辑
  9. c.updateIndices(oldObject, obj, key)
  10. }

这块逻辑先分析到这里,后面关注 threadSafeMap 实现的时候再继续深入。

第六步:Dispatch Event Handler functions

我们先看一个接口 SharedInformer

sharedIndexInformer

!FILENAME tools/cache/shared_informer.go:43

  1. type SharedInformer interface {
  2. AddEventHandler(handler ResourceEventHandler)
  3. AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
  4. GetStore() Store
  5. GetController() Controller
  6. Run(stopCh <-chan struct{})
  7. HasSynced() bool
  8. LastSyncResourceVersion() string
  9. }

SharedInformer 有一个共享的 data cache,能够分发 changes 通知到缓存,到通过 AddEventHandler 注册了的 listerners. 当你接收到一个通知,缓存的内容能够保证至少和通知中的一样新。

再看一下 SharedIndexInformer 接口:

!FILENAME tools/cache/shared_informer.go:66

  1. type SharedIndexInformer interface {
  2. SharedInformer
  3. // AddIndexers add indexers to the informer before it starts.
  4. AddIndexers(indexers Indexers) error
  5. GetIndexer() Indexer
  6. }

相比 SharedInformer 增加了一个 Indexer. 然后看具体的实现 sharedIndexInformer 吧:

!FILENAME tools/cache/shared_informer.go:127

  1. type sharedIndexInformer struct {
  2. indexer Indexer
  3. controller Controller
  4. processor *sharedProcessor
  5. cacheMutationDetector CacheMutationDetector
  6. listerWatcher ListerWatcher
  7. objectType runtime.Object
  8. resyncCheckPeriod time.Duration
  9. defaultEventHandlerResyncPeriod time.Duration
  10. clock clock.Clock
  11. started, stopped bool
  12. startedLock sync.Mutex
  13. blockDeltas sync.Mutex
  14. }

这个类型内包了很多我们前面看到过的对象,indexer、controller、listeratcher 都不陌生,我们看这里的 processor 是做什么的:

sharedProcessor

类型定义如下:

!FILENAME tools/cache/shared_informer.go:375

  1. type sharedProcessor struct {
  2. listenersStarted bool
  3. listenersLock sync.RWMutex
  4. listeners []*processorListener
  5. syncingListeners []*processorListener
  6. clock clock.Clock
  7. wg wait.Group
  8. }

这里的重点明显是 listeners 属性了,我们继续看 listeners 的类型中的 processorListener:

processorListener

!FILENAME tools/cache/shared_informer.go:466

  1. type processorListener struct {
  2. nextCh chan interface{}
  3. addCh chan interface{}
  4. handler ResourceEventHandler
  5. // 一个 ring buffer,保存未分发的通知
  6. pendingNotifications buffer.RingGrowing
  7. // ……
  8. }

processorListener 主要有2个方法:

  • run()
  • pop()

processorListener.run()

先看一下这个 run 做了什么:

!FILENAME tools/cache/shared_informer.go:540

  1. func (p *processorListener) run() {
  2. stopCh := make(chan struct{})
  3. wait.Until(func() { // 一分钟执行一次这个 func()
  4. // 一分钟内的又有几次重试
  5. err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
  6. // 等待信号 nextCh
  7. for next := range p.nextCh {
  8. // notification 是 next 的实际类型
  9. switch notification := next.(type) {
  10. // update
  11. case updateNotification:
  12. p.handler.OnUpdate(notification.oldObj, notification.newObj)
  13. // add
  14. case addNotification:
  15. p.handler.OnAdd(notification.newObj)
  16. // delete
  17. case deleteNotification:
  18. p.handler.OnDelete(notification.oldObj)
  19. default:
  20. utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
  21. }
  22. }
  23. return true, nil
  24. })
  25. if err == nil {
  26. close(stopCh)
  27. }
  28. }, 1*time.Minute, stopCh)
  29. }

这个 run 过程不复杂,等待信号然后调用 handler 的增删改方法做对应的处理逻辑。case 里的 Notification 再看一眼:

!FILENAME tools/cache/shared_informer.go:176

  1. type updateNotification struct {
  2. oldObj interface{}
  3. newObj interface{}
  4. }
  5. type addNotification struct {
  6. newObj interface{}
  7. }
  8. type deleteNotification struct {
  9. oldObj interface{}
  10. }

另外注意到for next := range p.nextCh是下面的 case 执行的前提,也就是说触发点是 p.nextCh,我们接着看 pop 过程(这里的逻辑不简单,可能得多花点精力)

processorListener.pop()

!FILENAME tools/cache/shared_informer.go:510

  1. func (p *processorListener) pop() {
  2. defer utilruntime.HandleCrash()
  3. defer close(p.nextCh) // Tell .run() to stop
  4. // 这个 chan 是没有初始化的
  5. var nextCh chan<- interface{}
  6. // 可以接收任意类型,其实是对应前面提到的 addNotification 等
  7. var notification interface{}
  8. // for 循环套 select 是比较常规的写法
  9. for {
  10. select {
  11. //第一遍执行到这里的时候由于 nexth 没有初始化,所以这里会阻塞(和notification有没有值没有关系,notification哪怕是nil也可以写入 chan interface{} 类型的 channel)
  12. case nextCh <- notification:
  13. var ok bool
  14. // 第二次循环,下面一个case运行过之后才有这里的逻辑
  15. notification, ok = p.pendingNotifications.ReadOne()
  16. if !ok {
  17. // 将 channel 指向 nil 相当于初始化的逆操作,会使得这个 case 条件阻塞
  18. nextCh = nil
  19. }
  20. // 这里是 for 首次执行逻辑的入口
  21. case notificationToAdd, ok := <-p.addCh:
  22. if !ok {
  23. return
  24. }
  25. // 如果是 nil,也就是第一个通知过来的时候,这时不需要用到缓存(和下面else相对)
  26. if notification == nil {
  27. // 赋值给 notification,这样上面一个 case 在接下来的一轮循化中就可以读到了
  28. notification = notificationToAdd
  29. // 相当于复制引用,nextCh 就指向了 p.nextCh,使得上面 case 写 channel 的时候本质上操作了 p.nextCh,从而 run 能够读到 p.nextCh 中的信号
  30. nextCh = p.nextCh
  31. } else {
  32. // 处理到这里的时候,其实第一个 case 已经有了首个 notification,这里的逻辑是一下子来了太多 notification 就往 pendingNotifications 缓存,在第一个 case 中 有对应的 ReadOne()操作
  33. p.pendingNotifications.WriteOne(notificationToAdd)
  34. }
  35. }
  36. }
  37. }

这里的 pop 逻辑的入口是<-p.addCh,我们继续向上找一下这个 addCh 的来源:

processorListener.add()

!FILENAME tools/cache/shared_informer.go:506

  1. func (p *processorListener) add(notification interface{}) {
  2. p.addCh <- notification
  3. }

这个 add() 方法又在哪里被调用呢?

sharedProcessor.distribute()

!FILENAME tools/cache/shared_informer.go:400

  1. func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  2. p.listenersLock.RLock()
  3. defer p.listenersLock.RUnlock()
  4. if sync {
  5. for _, listener := range p.syncingListeners {
  6. listener.add(obj)
  7. }
  8. } else {
  9. for _, listener := range p.listeners {
  10. listener.add(obj)
  11. }
  12. }
  13. }

这个方法逻辑比较简洁,分发对象。我们继续看哪里进入的 distribute:

sharedIndexInformer.HandleDeltas()

!FILENAME tools/cache/shared_informer.go:344

  1. func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  2. s.blockDeltas.Lock()
  3. defer s.blockDeltas.Unlock()
  4. // from oldest to newest
  5. for _, d := range obj.(Deltas) {
  6. switch d.Type { // 根据 DeltaType 选择 case
  7. case Sync, Added, Updated:
  8. isSync := d.Type == Sync
  9. s.cacheMutationDetector.AddObject(d.Object)
  10. if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
  11. // indexer 更新的是本地 store
  12. if err := s.indexer.Update(d.Object); err != nil {
  13. return err
  14. }
  15. // 前面分析的 distribute;update
  16. s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
  17. } else {
  18. if err := s.indexer.Add(d.Object); err != nil {
  19. return err
  20. }
  21. // 前面分析的 distribute;add
  22. s.processor.distribute(addNotification{newObj: d.Object}, isSync)
  23. }
  24. case Deleted:
  25. if err := s.indexer.Delete(d.Object); err != nil {
  26. return err
  27. }
  28. // 前面分析的 distribute;delete
  29. s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
  30. }
  31. }
  32. return nil
  33. }

继续往前看代码逻辑。

sharedIndexInformer.Run()

!FILENAME tools/cache/shared_informer.go:189

  1. func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  2. defer utilruntime.HandleCrash()
  3. // new DeltaFIFO
  4. fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
  5. cfg := &Config{
  6. // DeltaFIFO
  7. Queue: fifo,
  8. ListerWatcher: s.listerWatcher,
  9. ObjectType: s.objectType,
  10. FullResyncPeriod: s.resyncCheckPeriod,
  11. RetryOnError: false,
  12. ShouldResync: s.processor.shouldResync,
  13. // 前面分析的 HandleDeltas()
  14. Process: s.HandleDeltas,
  15. }
  16. func() {
  17. s.startedLock.Lock()
  18. defer s.startedLock.Unlock()
  19. // 创建 Informer
  20. s.controller = New(cfg)
  21. s.controller.(*controller).clock = s.clock
  22. s.started = true
  23. }()
  24. processorStopCh := make(chan struct{})
  25. var wg wait.Group
  26. defer wg.Wait() // Wait for Processor to stop
  27. defer close(processorStopCh) // Tell Processor to stop
  28. wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
  29. // 关注一下 s.processor.run
  30. wg.StartWithChannel(processorStopCh, s.processor.run)
  31. defer func() {
  32. s.startedLock.Lock()
  33. defer s.startedLock.Unlock()
  34. s.stopped = true
  35. }()
  36. // Run informer
  37. s.controller.Run(stopCh)
  38. }

看到这里已经挺和谐了,在 sharedIndexInformer 的 Run() 方法中先是创建一个 DeltaFIFO,然后和 lw 一起初始化 cfg,利用 cfg 创建 controller,最后 Run 这个 controller,也就是最基础的 informer.

在这段代码里我们还注意到有一步是s.processor.run,我们看一下这个 run 的逻辑。

sharedProcessor.run()

!FILENAME tools/cache/shared_informer.go:415

  1. func (p *sharedProcessor) run(stopCh <-chan struct{}) {
  2. func() {
  3. p.listenersLock.RLock()
  4. defer p.listenersLock.RUnlock()
  5. for _, listener := range p.listeners {
  6. // 前面详细讲过 listener.run
  7. p.wg.Start(listener.run)
  8. // 前面详细讲过 listener.pop
  9. p.wg.Start(listener.pop)
  10. }
  11. p.listenersStarted = true
  12. }()
  13. <-stopCh
  14. // ……
  15. }

撇开细节,可以看到这里调用了内部所有 listener 的 run() 和 pop() 方法,和前面的分析呼应上了。

到这里,我们基本讲完了自定义 controller 的时候 client-go 里相关的逻辑,也就是图中的上半部分:

1556161315850