当集群中的 node 或 pod 异常时,大部分用户会使用 kubectl 查看对应的 events,那么 events 是从何而来的?其实 k8s 中的各个组件会将运行时产生的各种事件汇报到 apiserver,对于 k8s 中的可描述资源,使用 kubectl describe 都可以看到其相关的 events,那 k8s 中又有哪几个组件都上报 events 呢?

只要在 k8s.io/kubernetes/cmd 目录下暴力搜索一下就能知道哪些组件会产生 events:

  1. $ grep -R -n -i "EventRecorder" .

可以看出,controller-manage、kube-proxy、kube-scheduler、kubelet 都使用了 EventRecorder,本文只讲述 kubelet 中对 Events 的使用。

1、Events 的定义

events 在 k8s.io/api/core/v1/types.go 中进行定义,结构体如下所示:

  1. type Event struct {
  2. metav1.TypeMeta `json:",inline"`
  3. metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
  4. InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
  5. Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
  6. Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
  7. Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
  8. FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
  9. LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
  10. Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
  11. Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
  12. EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
  13. Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
  14. Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
  15. Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
  16. ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
  17. ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
  18. ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
  19. }

其中 InvolvedObject 代表和事件关联的对象,source 代表事件源,使用 kubectl 看到的事件一般包含 Type、Reason、Age、From、Message 几个字段。

k8s 中 events 目前只有两种类型:”Normal” 和 “Warning”:

events 的两种类型

2、EventBroadcaster 的初始化

events 的整个生命周期都与 EventBroadcaster 有关,kubelet 中对 EventBroadcaster 的初始化在k8s.io/kubernetes/cmd/kubelet/app/server.go中:

  1. func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
  2. ...
  3. // event 初始化
  4. makeEventRecorder(kubeDeps, nodeName)
  5. ...
  6. }
  7. func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
  8. if kubeDeps.Recorder != nil {
  9. return
  10. }
  11. // 初始化 EventBroadcaster
  12. eventBroadcaster := record.NewBroadcaster()
  13. // 初始化 EventRecorder
  14. kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
  15. // 记录 events 到本地日志
  16. eventBroadcaster.StartLogging(glog.V(3).Infof)
  17. if kubeDeps.EventClient != nil {
  18. glog.V(4).Infof("Sending events to api server.")
  19. // 上报 events 到 apiserver
  20. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
  21. } else {
  22. glog.Warning("No api server defined - no events will be sent to API server.")
  23. }
  24. }

Kubelet 在启动的时候会初始化一个 EventBroadcaster,它主要是对接收到的 events 做一些后续的处理(保存、上报等),EventBroadcaster 也会被 kubelet 中的其他模块使用,以下是相关的定义,对 events 生成和处理的函数都定义在 k8s.io/client-go/tools/record/event.go 中:

  1. type eventBroadcasterImpl struct {
  2. *watch.Broadcaster
  3. sleepDuration time.Duration
  4. }
  5. // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
  6. type EventBroadcaster interface {
  7. StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
  8. StartRecordingToSink(sink EventSink) watch.Interface
  9. StartLogging(logf func(format string, args ...interface{})) watch.Interface
  10. NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
  11. }

EventBroadcaster 是个接口类型,该接口有以下四个方法:

  • StartEventWatcher() : EventBroadcaster 中的核心方法,接收各模块产生的 events,参数为一个处理 events 的函数,用户可以使用 StartEventWatcher() 接收 events 然后使用自定义的 handle 进行处理
  • StartRecordingToSink() : 调用 StartEventWatcher() 接收 events,并将收到的 events 发送到 apiserver
  • StartLogging() :也是调用 StartEventWatcher() 接收 events,然后保存 events 到日志
  • NewRecorder() :会创建一个指定 EventSource 的 EventRecorder,EventSource 指明了哪个节点的哪个组件

eventBroadcasterImpl 是 eventBroadcaster 实际的对象,初始化 EventBroadcaster 对象的时候会初始化一个 Broadcaster,Broadcaster 会启动一个 goroutine 接收各组件产生的 events 并广播到每一个 watcher。

  1. func NewBroadcaster() EventBroadcaster {
  2. return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
  3. }

可以看到,kubelet 在初始化完 EventBroadcaster 后会调用 StartRecordingToSink() 和 StartLogging() 两个方法,StartRecordingToSink() 处理函数会将收到的 events 进行缓存、过滤、聚合而后发送到 apiserver,StartLogging() 仅将 events 保存到 kubelet 的日志中。

3、Events 的生成

从初始化 EventBroadcaster 的代码中可以看到 kubelet 在初始化完 EventBroadcaster 后紧接着初始化了 EventRecorder,并将已经初始化的 Broadcaster 对象作为参数传给了 EventRecorder,至此,EventBroadcaster、EventRecorder、Broadcaster 三个对象产生了关联。EventRecorder 的主要功能是生成指定格式的 events,以下是相关的定义:

  1. type recorderImpl struct {
  2. scheme *runtime.Scheme
  3. source v1.EventSource
  4. *watch.Broadcaster
  5. clock clock.Clock
  6. }
  7. type EventRecorder interface {
  8. Event(object runtime.Object, eventtype, reason, message string)
  9. Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
  10. PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
  11. AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
  12. }

EventRecorder 中包含的几个方法都是产生指定格式的 events,Event() 和 Eventf() 的功能类似 fmt.Println() 和 fmt.Printf(),kubelet 中的各个模块会调用 EventRecorder 生成 events。recorderImpl 是 EventRecorder 实际的对象。EventRecorder 的每个方法会调用 generateEvent,在 generateEvent 中初始化 events 。

以下是生成 events 的函数:

  1. func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
  2. ref, err := ref.GetReference(recorder.scheme, object)
  3. if err != nil {
  4. glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
  5. return
  6. }
  7. if !validateEventType(eventtype) {
  8. glog.Errorf("Unsupported event type: '%v'", eventtype)
  9. return
  10. }
  11. event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
  12. event.Source = recorder.source
  13. go func() {
  14. // NOTE: events should be a non-blocking operation
  15. defer utilruntime.HandleCrash()
  16. // 发送事件
  17. recorder.Action(watch.Added, event)
  18. }()
  19. }
  20. func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
  21. t := metav1.Time{Time: recorder.clock.Now()}
  22. namespace := ref.Namespace
  23. if namespace == "" {
  24. namespace = metav1.NamespaceDefault
  25. }
  26. return &v1.Event{
  27. ObjectMeta: metav1.ObjectMeta{
  28. Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
  29. Namespace: namespace,
  30. Annotations: annotations,
  31. },
  32. InvolvedObject: *ref,
  33. Reason: reason,
  34. Message: message,
  35. FirstTimestamp: t,
  36. LastTimestamp: t,
  37. Count: 1,
  38. Type: eventtype,
  39. }
  40. }

初始化完 events 后会调用 recorder.Action() 将 events 发送到 Broadcaster 的事件接收队列中, Action() 是 Broadcaster 中的方法。

以下是 Action() 方法的实现:

  1. func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
  2. m.incoming <- Event{action, obj}
  3. }
4、Events 的广播

上面已经说了,EventBroadcaster 初始化时会初始化一个 Broadcaster,Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 的实现在 k8s.io/apimachinery/pkg/watch/mux.go 中,Broadcaster 初始化完成后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发送过来的 events,Broadcaster 中有一个 map 会保存每一个注册的 watcher, 接着将 events 广播给所有的 watcher,每个 watcher 都有一个接收消息的 channel,watcher 可以通过它的 ResultChan() 方法从 channel 中读取数据进行消费。

以下是 Broadcaster 广播 events 的实现:

  1. func (m *Broadcaster) loop() {
  2. for event := range m.incoming {
  3. if event.Type == internalRunFunctionMarker {
  4. event.Object.(functionFakeRuntimeObject)()
  5. continue
  6. }
  7. m.distribute(event)
  8. }
  9. m.closeAll()
  10. m.distributing.Done()
  11. }
  12. // distribute sends event to all watchers. Blocking.
  13. func (m *Broadcaster) distribute(event Event) {
  14. m.lock.Lock()
  15. defer m.lock.Unlock()
  16. if m.fullChannelBehavior == DropIfChannelFull {
  17. for _, w := range m.watchers {
  18. select {
  19. case w.result <- event:
  20. case <-w.stopped:
  21. default: // Don't block if the event can't be queued.
  22. }
  23. }
  24. } else {
  25. for _, w := range m.watchers {
  26. select {
  27. case w.result <- event:
  28. case <-w.stopped:
  29. }
  30. }
  31. }
  32. }
5、Events 的处理

那么 watcher 是从何而来呢?每一个要处理 events 的 client 都需要初始化一个 watcher,处理 events 的方法是在 EventBroadcaster 中定义的,以下是 EventBroadcaster 中对 events 处理的三个函数:

  1. func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
  2. watcher := eventBroadcaster.Watch()
  3. go func() {
  4. defer utilruntime.HandleCrash()
  5. for watchEvent := range watcher.ResultChan() {
  6. event, ok := watchEvent.Object.(*v1.Event)
  7. if !ok {
  8. // This is all local, so there's no reason this should
  9. // ever happen.
  10. continue
  11. }
  12. eventHandler(event)
  13. }
  14. }()
  15. return watcher
  16. }

StartEventWatcher() 首先实例化一个 watcher,每个 watcher 都会被塞入到 Broadcaster 的 watcher 列表中,watcher 从 Broadcaster 提供的 channel 中读取 events,然后再调用 eventHandler 进行处理,StartLogging() 和 StartRecordingToSink() 都是对 StartEventWatcher() 的封装,都会传入自己的处理函数。

  1. func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
  2. return eventBroadcaster.StartEventWatcher(
  3. func(e *v1.Event) {
  4. logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
  5. })
  6. }

StartLogging() 传入的 eventHandler 仅将 events 保存到日志中。

  1. func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
  2. // The default math/rand package functions aren't thread safe, so create a
  3. // new Rand object for each StartRecording call.
  4. randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
  5. eventCorrelator := NewEventCorrelator(clock.RealClock{})
  6. return eventBroadcaster.StartEventWatcher(
  7. func(event *v1.Event) {
  8. recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
  9. })
  10. }
  11. func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
  12. eventCopy := *event
  13. event = &eventCopy
  14. result, err := eventCorrelator.EventCorrelate(event)
  15. if err != nil {
  16. utilruntime.HandleError(err)
  17. }
  18. if result.Skip {
  19. return
  20. }
  21. tries := 0
  22. for {
  23. if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
  24. break
  25. }
  26. tries++
  27. if tries >= maxTriesPerEvent {
  28. glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
  29. break
  30. }
  31. // 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件
  32. if tries == 1 {
  33. time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
  34. } else {
  35. time.Sleep(sleepDuration)
  36. }
  37. }
  38. }

StartRecordingToSink() 方法先根据当前时间生成一个随机数发生器 randGen,增加随机数是为了在重试时增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件,接着实例化一个EventCorrelator,EventCorrelator 会对事件做一些预处理的工作,其中包括过滤、聚合、缓存等操作,具体代码不做详细分析,最后将 recordToSink() 函数作为处理函数,recordToSink() 会将处理后的 events 发送到 apiserver,这是 StartEventWatcher() 的整个工作流程。

6、Events 简单实现

了解完 events 的整个处理流程后,可以参考其实现方式写一个 demo,要实现一个完整的 events 需要包含以下几个功能:

  • 1、事件的产生
  • 2、事件的发送
  • 3、事件广播
  • 4、事件缓存
  • 5、事件过滤和聚合
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. // watcher queue
  8. const queueLength = int64(1)
  9. // Events xxx
  10. type Events struct {
  11. Reason string
  12. Message string
  13. Source string
  14. Type string
  15. Count int64
  16. Timestamp time.Time
  17. }
  18. // EventBroadcaster xxx
  19. type EventBroadcaster interface {
  20. Event(etype, reason, message string)
  21. StartLogging() Interface
  22. Stop()
  23. }
  24. // eventBroadcaster xxx
  25. type eventBroadcasterImpl struct {
  26. *Broadcaster
  27. }
  28. func NewEventBroadcaster() EventBroadcaster {
  29. return &eventBroadcasterImpl{NewBroadcaster(queueLength)}
  30. }
  31. func (eventBroadcaster *eventBroadcasterImpl) Stop() {
  32. eventBroadcaster.Shutdown()
  33. }
  34. // generate event
  35. func (eventBroadcaster *eventBroadcasterImpl) Event(etype, reason, message string) {
  36. events := &Events{Type: etype, Reason: reason, Message: message}
  37. // send event to broadcast
  38. eventBroadcaster.Action(events)
  39. }
  40. // 仅实现 StartLogging() 的功能,将日志打印
  41. func (eventBroadcaster *eventBroadcasterImpl) StartLogging() Interface {
  42. // register a watcher
  43. watcher := eventBroadcaster.Watch()
  44. go func() {
  45. for watchEvent := range watcher.ResultChan() {
  46. fmt.Printf("%v\n", watchEvent)
  47. }
  48. }()
  49. go func() {
  50. time.Sleep(time.Second * 4)
  51. watcher.Stop()
  52. }()
  53. return watcher
  54. }
  55. // --------------------
  56. // Broadcaster 定义与实现
  57. // 接收 events channel 的长度
  58. const incomingQueuLength = 100
  59. type Broadcaster struct {
  60. lock sync.Mutex
  61. incoming chan Events
  62. watchers map[int64]*broadcasterWatcher
  63. watchersQueue int64
  64. watchQueueLength int64
  65. distributing sync.WaitGroup
  66. }
  67. func NewBroadcaster(queueLength int64) *Broadcaster {
  68. m := &Broadcaster{
  69. incoming: make(chan Events, incomingQueuLength),
  70. watchers: map[int64]*broadcasterWatcher{},
  71. watchQueueLength: queueLength,
  72. }
  73. m.distributing.Add(1)
  74. // 后台启动一个 goroutine 广播 events
  75. go m.loop()
  76. return m
  77. }
  78. // Broadcaster 接收所产生的 events
  79. func (m *Broadcaster) Action(event *Events) {
  80. m.incoming <- *event
  81. }
  82. // 广播 events 到每个 watcher
  83. func (m *Broadcaster) loop() {
  84. // 从 incoming channel 中读取所接收到的 events
  85. for event := range m.incoming {
  86. // 发送 events 到每一个 watcher
  87. for _, w := range m.watchers {
  88. select {
  89. case w.result <- event:
  90. case <-w.stopped:
  91. default:
  92. }
  93. }
  94. }
  95. m.closeAll()
  96. m.distributing.Done()
  97. }
  98. func (m *Broadcaster) Shutdown() {
  99. close(m.incoming)
  100. m.distributing.Wait()
  101. }
  102. func (m *Broadcaster) closeAll() {
  103. // TODO
  104. m.lock.Lock()
  105. defer m.lock.Unlock()
  106. for _, w := range m.watchers {
  107. close(w.result)
  108. }
  109. m.watchers = map[int64]*broadcasterWatcher{}
  110. }
  111. func (m *Broadcaster) stopWatching(id int64) {
  112. m.lock.Lock()
  113. defer m.lock.Unlock()
  114. w, ok := m.watchers[id]
  115. if !ok {
  116. return
  117. }
  118. delete(m.watchers, id)
  119. close(w.result)
  120. }
  121. // 调用 Watch()方法注册一个 watcher
  122. func (m *Broadcaster) Watch() Interface {
  123. watcher := &broadcasterWatcher{
  124. result: make(chan Events, incomingQueuLength),
  125. stopped: make(chan struct{}),
  126. id: m.watchQueueLength,
  127. m: m,
  128. }
  129. m.watchers[m.watchersQueue] = watcher
  130. m.watchQueueLength++
  131. return watcher
  132. }
  133. // watcher 实现
  134. type Interface interface {
  135. Stop()
  136. ResultChan() <-chan Events
  137. }
  138. type broadcasterWatcher struct {
  139. result chan Events
  140. stopped chan struct{}
  141. stop sync.Once
  142. id int64
  143. m *Broadcaster
  144. }
  145. // 每个 watcher 通过该方法读取 channel 中广播的 events
  146. func (b *broadcasterWatcher) ResultChan() <-chan Events {
  147. return b.result
  148. }
  149. func (b *broadcasterWatcher) Stop() {
  150. b.stop.Do(func() {
  151. close(b.stopped)
  152. b.m.stopWatching(b.id)
  153. })
  154. }
  155. // --------------------
  156. func main() {
  157. eventBroadcast := NewEventBroadcaster()
  158. var wg sync.WaitGroup
  159. wg.Add(1)
  160. // producer event
  161. go func() {
  162. defer wg.Done()
  163. time.Sleep(time.Second)
  164. eventBroadcast.Event("add", "test", "1")
  165. time.Sleep(time.Second * 2)
  166. eventBroadcast.Event("add", "test", "2")
  167. time.Sleep(time.Second * 3)
  168. eventBroadcast.Event("add", "test", "3")
  169. //eventBroadcast.Stop()
  170. }()
  171. eventBroadcast.StartLogging()
  172. wg.Wait()
  173. }

此处仅简单实现,将 EventRecorder 处理 events 的功能直接放在了 EventBroadcaster 中实现,对 events 的处理方法仅实现了 StartLogging(),Broadcaster 中的部分功能是直接复制 k8s 中的代码,有一定的精简,其实现值得学习,此处对 EventCorrelator 并没有进行实现。

代码请参考:https://github.com/gosoon/k8s-learning-notes/tree/master/k8s-package/events

7、总结

本文讲述了 k8s 中 events 从产生到展示的一个完整过程,最后也实现了一个简单的 demo,在此将 kubelet 对 events 的整个处理过程再梳理下,其中主要有三个对象 EventBroadcaster、EventRecorder、Broadcaster:

  • 1、kubelet 首先会初始化 EventBroadcaster 对象,同时会初始化一个 Broadcaster 对象。
  • 2、kubelet 通过 EventBroadcaster 对象的 NewRecorder() 方法初始化 EventRecorder 对象,EventRecorder 对象提供的几个方法会生成 events 并通过 Action() 方法发送 events 到 Broadcaster 的 channel 队列中。
  • 3、Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 初始化后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发来的 events。
  • 4、EventBroadcaster 对 events 有三个处理方法:StartEventWatcher()、StartRecordingToSink()、StartLogging(),StartEventWatcher() 是其中的核心方法,会初始化一个 watcher 注册到 Broadcaster,其余两个处理函数对 StartEventWatcher() 进行了封装,并实现了自己的处理函数。
  • 5、 Broadcaster 中有一个 map 会保存每一个注册的 watcher,其会将所有的 events 广播给每一个 watcher,每个 watcher 通过它的 ResultChan() 方法从 channel 接收 events。
  • 6、kubelet 会使用 StartRecordingToSink() 和 StartLogging() 对 events 进行处理,StartRecordingToSink() 处理函数收到 events 后会进行缓存、过滤、聚合而后发送到 apiserver,apiserver 会将 events 保存到 etcd 中,使用 kubectl 或其他客户端可以查看。StartLogging() 仅将 events 保存到 kubelet 的日志中。