Proxy Service Framework

Program Entry and Initialization

kube-proxy is a standard CLI application built on the widely used Cobra framework. It creates and runs the application the same way as the other Kubernetes core components (e.g. the scheduler), so we will not analyze the command-construction layer in depth; instead, we focus on the creation of the ProxyServer and the analysis of its Run() code.

!FILENAME cmd/kube-proxy/proxy.go:35

    func main() {
        rand.Seed(time.Now().UnixNano())

        command := app.NewProxyCommand() // create the Cobra-style command object
        //......
        // execute the command object instantiated above
        if err := command.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "error: %v\n", err)
            os.Exit(1)
        }
    }

NewProxyCommand() returns the command object; command.Execute() then invokes the Run: func{...} defined on it, which executes opts.Run() internally.

!FILENAME cmd/kube-proxy/app/server.go:370

    func NewProxyCommand() *cobra.Command {
        // create the Options object (the application-wide configuration object)
        opts := NewOptions()
        cmd := &cobra.Command{
            Use:  "kube-proxy",
            Long: `The Kubernetes network proxy runs on each node...(omitted)`,
            Run: func(cmd *cobra.Command, args []string) { // the command's Run func
                verflag.PrintAndExitIfRequested()
                utilflag.PrintFlags(cmd.Flags())
                if err := initForOS(opts.WindowsService); err != nil {
                    klog.Fatalf("failed OS init: %v", err)
                }
                // complete all required options (external config file, hostname
                // override handling, feature-gate settings, etc.)
                if err := opts.Complete(); err != nil {
                    klog.Fatalf("failed complete: %v", err)
                }
                // validate the kube-proxy configuration
                if err := opts.Validate(args); err != nil {
                    klog.Fatalf("failed validate: %v", err)
                }
                klog.Fatal(opts.Run()) // the application's real Run(); analyzed below
            },
        }
        var err error
        opts.config, err = opts.ApplyDefaults(opts.config) // apply defaults to complete config initialization
        if err != nil {
            klog.Fatalf("unable to create flag defaults: %v", err)
        }
        opts.AddFlags(cmd.Flags())
        cmd.MarkFlagFilename("config", "yaml", "yml", "json")
        return cmd // return the command object
    }

NewOptions(): creating the global configuration object

!FILENAME cmd/kube-proxy/app/server.go:184

    func NewOptions() *Options {
        return &Options{
            config:      new(kubeproxyconfig.KubeProxyConfiguration), // instantiate the config object
            healthzPort: ports.ProxyHealthzPort,                      // healthz port
            metricsPort: ports.ProxyStatusPort,                       // metrics port
            scheme:      scheme.Scheme,
            codecs:      scheme.Codecs,
            CleanupIPVS: true,
        }
    }

    // the full configuration struct, corresponding to the options the user
    // passes when running the command
    type KubeProxyConfiguration struct {
        metav1.TypeMeta
        FeatureGates       map[string]bool // feature-gate switches
        BindAddress        string          // defaults to all interfaces, 0.0.0.0
        HealthzBindAddress string          // defaults to 0.0.0.0:10256
        MetricsBindAddress string          // defaults to 127.0.0.1:10249
        EnableProfiling    bool            // "/debug/pprof"
        ClusterCIDR        string          // cluster CIDR
        HostnameOverride   string          // custom hostname
        ClientConnection   apimachineryconfig.ClientConnectionConfiguration // apiserver client
        IPTables           KubeProxyIPTablesConfiguration // iptables options (masquerade, sync period, etc.)
        IPVS               KubeProxyIPVSConfiguration     // IPVS options (sync period, scheduler, etc.)
        OOMScoreAdj        *int32                         // OOM score adjustment
        Mode               ProxyMode                      // proxy mode
        PortRange          string                         // port range
        ResourceContainer  string                         // default: "/kube-proxy"
        UDPIdleTimeout     metav1.Duration                // UDP idle timeout
        Conntrack          KubeProxyConntrackConfiguration // conntrack settings
        ConfigSyncPeriod   metav1.Duration                // config sync period
        NodePortAddresses  []string                       // node addresses for NodePort services
    }

    const (
        ProxyHealthzPort = 10256
        ProxyStatusPort  = 10249
    )
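
The struct above is exactly what a kube-proxy "--config" file deserializes into. As a point of reference, a minimal illustrative config file might look like the following (the field values here are examples only, not recommendations):

    apiVersion: kubeproxy.config.k8s.io/v1alpha1
    kind: KubeProxyConfiguration
    bindAddress: 0.0.0.0
    metricsBindAddress: 127.0.0.1:10249
    clusterCIDR: 10.244.0.0/16
    mode: "iptables"
    iptables:
      masqueradeAll: false
      syncPeriod: 30s
    configSyncPeriod: 15m0s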

opts.Run(): creating and starting the proxy server

!FILENAME cmd/kube-proxy/app/server.go:250

    func (o *Options) Run() error {
        if len(o.WriteConfigTo) > 0 {
            return o.writeConfigFile() // write the config file and exit
        }
        proxyServer, err := NewProxyServer(o) // create the proxy server from the config
        if err != nil {
            return err
        }
        return proxyServer.Run() // run the proxy server; its logic is analyzed in detail below
    }

This completes the analysis of kube-proxy's first layer (CLI creation, option initialization, startup). Next we move into the second layer: the code that creates and runs the proxy server.

Creating the Proxy Server

NewProxyServer(): the non-Windows code

!FILENAME cmd/kube-proxy/app/server_others.go:55

    func NewProxyServer(o *Options) (*ProxyServer, error) {
        return newProxyServer(o.config, o.CleanupAndExit, o.CleanupIPVS, o.scheme, o.master)
    }

    func newProxyServer(
        config *proxyconfigapi.KubeProxyConfiguration,
        cleanupAndExit bool,
        cleanupIPVS bool,
        scheme *runtime.Scheme,
        master string) (*ProxyServer, error) {
        if config == nil {
            return nil, errors.New("config is required")
        }
        if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
            c.Set(config)
        } else {
            return nil, fmt.Errorf("unable to register configz: %s", err)
        }
        // protocol: IPv4 or IPv6
        protocol := utiliptables.ProtocolIpv4
        if net.ParseIP(config.BindAddress).To4() == nil {
            klog.V(0).Infof("IPv6 bind address (%s), assume IPv6 operation", config.BindAddress)
            protocol = utiliptables.ProtocolIpv6
        }
        // API interfaces to the key underlying tools: iptables/ipvs/ipset/dbus.
        // This is where kube-proxy's low-level dependencies become visible
        // (the underlying technology differs per proxy mode, of course).
        var iptInterface utiliptables.Interface
        var ipvsInterface utilipvs.Interface
        var kernelHandler ipvs.KernelHandler
        var ipsetInterface utilipset.Interface
        var dbus utildbus.Interface
        // exec command runner
        execer := exec.New()
        // dbus object (the Linux inter-process communication mechanism)
        dbus = utildbus.New()
        // iptables handle
        iptInterface = utiliptables.New(execer, dbus, protocol)
        // IPVS
        kernelHandler = ipvs.NewLinuxKernelHandler()
        // ipset
        ipsetInterface = utilipset.New(execer)
        // check whether the environment supports IPVS
        canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
        if canUseIPVS {
            ipvsInterface = utilipvs.New(execer)
        }
        // We omit creation of pretty much everything if we run in cleanup mode
        if cleanupAndExit {
            return &ProxyServer{
                execer:         execer,
                IptInterface:   iptInterface,
                IpvsInterface:  ipvsInterface,
                IpsetInterface: ipsetInterface,
                CleanupAndExit: cleanupAndExit,
            }, nil
        }
        // API clients
        client, eventClient, err := createClients(config.ClientConnection, master)
        if err != nil {
            return nil, err
        }
        // hostname
        hostname, err := utilnode.GetHostname(config.HostnameOverride)
        if err != nil {
            return nil, err
        }
        // event broadcaster
        eventBroadcaster := record.NewBroadcaster()
        // Create event recorder
        recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})
        nodeRef := &v1.ObjectReference{
            Kind:      "Node",
            Name:      hostname,
            UID:       types.UID(hostname),
            Namespace: "",
        }
        var healthzServer *healthcheck.HealthzServer
        var healthzUpdater healthcheck.HealthzUpdater
        // create the default healthz server object
        if len(config.HealthzBindAddress) > 0 {
            healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
            healthzUpdater = healthzServer
        }
        var proxier proxy.ProxyProvider
        var serviceEventHandler proxyconfig.ServiceHandler
        var endpointsEventHandler proxyconfig.EndpointsHandler
        // resolve the proxy mode from the configuration
        proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
        // the node's bind IP
        nodeIP := net.ParseIP(config.BindAddress)
        if nodeIP.IsUnspecified() {
            nodeIP = utilnode.GetNodeIP(client, hostname)
        }
        if proxyMode == proxyModeIPTables { // proxyMode is "iptables"
            klog.V(0).Info("Using iptables Proxier.")
            if config.IPTables.MasqueradeBit == nil {
                // MasqueradeBit must be specified or defaulted.
                return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
            }
            // create the iptables proxier
            proxierIPTables, err := iptables.NewProxier(
                iptInterface,
                utilsysctl.New(),
                execer,
                config.IPTables.SyncPeriod.Duration,
                config.IPTables.MinSyncPeriod.Duration,
                config.IPTables.MasqueradeAll,
                int(*config.IPTables.MasqueradeBit),
                config.ClusterCIDR,
                hostname,
                nodeIP,
                recorder,
                healthzUpdater,
                config.NodePortAddresses,
            )
            if err != nil {
                return nil, fmt.Errorf("unable to create proxier: %v", err)
            }
            metrics.RegisterMetrics()
            // the iptables proxier handles both service and endpoints events
            proxier = proxierIPTables
            serviceEventHandler = proxierIPTables
            endpointsEventHandler = proxierIPTables
            // No turning back. Remove artifacts that might still exist from the userspace Proxier.
            klog.V(0).Info("Tearing down inactive rules.")
            // on a mode switch, clean up iptables rules created by the userspace/ipvs modes
            userspace.CleanupLeftovers(iptInterface)
            if canUseIPVS {
                ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface, cleanupIPVS)
            }
        } else if proxyMode == proxyModeIPVS { // proxyMode is "ipvs"
            klog.V(0).Info("Using ipvs Proxier.")
            // create the ipvs proxier
            proxierIPVS, err := ipvs.NewProxier(
                iptInterface,
                ipvsInterface,
                ipsetInterface,
                utilsysctl.New(),
                execer,
                config.IPVS.SyncPeriod.Duration,
                config.IPVS.MinSyncPeriod.Duration,
                config.IPVS.ExcludeCIDRs,
                config.IPTables.MasqueradeAll,
                int(*config.IPTables.MasqueradeBit),
                config.ClusterCIDR,
                hostname,
                nodeIP,
                recorder,
                healthzServer,
                config.IPVS.Scheduler,
                config.NodePortAddresses,
            )
            if err != nil {
                return nil, fmt.Errorf("unable to create proxier: %v", err)
            }
            metrics.RegisterMetrics()
            // the ipvs proxier handles both service and endpoints events
            proxier = proxierIPVS
            serviceEventHandler = proxierIPVS
            endpointsEventHandler = proxierIPVS
            klog.V(0).Info("Tearing down inactive rules.")
            // on a mode switch, clean up rules created by the userspace/iptables modes
            userspace.CleanupLeftovers(iptInterface)
            iptables.CleanupLeftovers(iptInterface)
        } else { // proxyMode is "userspace"
            klog.V(0).Info("Using userspace Proxier.")
            // create a round-robin load balancer
            loadBalancer := userspace.NewLoadBalancerRR()
            // the load balancer is the EndpointsConfigHandler (endpoints events)
            endpointsEventHandler = loadBalancer
            // create the userspace proxier
            proxierUserspace, err := userspace.NewProxier(
                loadBalancer,
                net.ParseIP(config.BindAddress),
                iptInterface,
                execer,
                *utilnet.ParsePortRangeOrDie(config.PortRange),
                config.IPTables.SyncPeriod.Duration,
                config.IPTables.MinSyncPeriod.Duration,
                config.UDPIdleTimeout.Duration,
                config.NodePortAddresses,
            )
            if err != nil {
                return nil, fmt.Errorf("unable to create proxier: %v", err)
            }
            // the userspace proxier handles service events
            serviceEventHandler = proxierUserspace
            proxier = proxierUserspace
            klog.V(0).Info("Tearing down inactive rules.")
            // on a mode switch, clean up iptables rules created by the iptables/ipvs modes
            iptables.CleanupLeftovers(iptInterface)
            if canUseIPVS {
                ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface, cleanupIPVS)
            }
        }
        // register the proxier's Sync() as the iptables reload func
        iptInterface.AddReloadFunc(proxier.Sync)
        // build the ProxyServer object
        return &ProxyServer{
            Client:                 client,             // apiserver client
            EventClient:            eventClient,        // event client
            IptInterface:           iptInterface,       // iptables interface
            IpvsInterface:          ipvsInterface,      // ipvs interface
            IpsetInterface:         ipsetInterface,     // ipset interface
            execer:                 execer,             // exec command runner
            Proxier:                proxier,            // the proxier created above
            Broadcaster:            eventBroadcaster,   // event broadcaster
            Recorder:               recorder,           // event recorder
            ConntrackConfiguration: config.Conntrack,   // conntrack configuration
            Conntracker:            &realConntracker{}, // conntrack object
            ProxyMode:              proxyMode,          // proxy mode
            NodeRef:                nodeRef,            // node reference
            MetricsBindAddress:     config.MetricsBindAddress,        // metrics server address
            EnableProfiling:        config.EnableProfiling,           // debug/pprof
            OOMScoreAdj:            config.OOMScoreAdj,               // OOM score adjustment
            ResourceContainer:      config.ResourceContainer,         // resource container
            ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration, // config sync period
            ServiceEventHandler:    serviceEventHandler,   // proxier object handling service events
            EndpointsEventHandler:  endpointsEventHandler, // proxier object handling endpoints events
            HealthzServer:          healthzServer,         // health-check server
        }, nil
    }
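
getProxyMode() (called above but not shown) resolves the effective mode: a requested ipvs mode falls back to iptables when the kernel cannot support IPVS, and iptables falls back to userspace on kernels that are too old. The sketch below is a simplified illustration of that decision chain, not the real implementation; chooseProxyMode and its boolean inputs are stand-ins for the real kernel/ipset probes:

    package main

    import "fmt"

    // chooseProxyMode illustrates getProxyMode's fallback chain
    // (ipvs -> iptables -> userspace). The real code probes kernel
    // modules, the ipset version and iptables support instead of
    // taking booleans.
    func chooseProxyMode(requested string, canUseIPVS, canUseIPTables bool) string {
        switch requested {
        case "ipvs":
            if canUseIPVS {
                return "ipvs"
            }
            fallthrough // IPVS unusable: fall back to the iptables path
        case "iptables", "":
            if canUseIPTables {
                return "iptables"
            }
            return "userspace" // last resort on very old kernels
        case "userspace":
            return "userspace"
        }
        return "iptables" // unknown values end up on the default path
    }

    func main() {
        fmt.Println(chooseProxyMode("ipvs", false, true)) // prints "iptables"
    }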

Running the Proxy Server

!FILENAME cmd/kube-proxy/app/server.go:481

    func (s *ProxyServer) Run() error {
        klog.Infof("Version: %+v", version.Get())
        // If CleanupAndExit is set to true, remove all existing iptables rules
        // and exit the application.
        if s.CleanupAndExit {
            encounteredError := userspace.CleanupLeftovers(s.IptInterface)
            encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
            encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface, s.CleanupIPVS) || encounteredError
            if encounteredError {
                return errors.New("encountered an error while tearing down rules.")
            }
            return nil
        }
        // adjust the OOM score according to the "oom-score-adj" flag;
        // the valid range is [-1000, 1000]
        var oomAdjuster *oom.OOMAdjuster
        if s.OOMScoreAdj != nil {
            oomAdjuster = oom.NewOOMAdjuster()
            if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
                klog.V(2).Info(err)
            }
        }
        // "resource-container" controls whether to run in a resource-only container
        if len(s.ResourceContainer) != 0 {
            if err := resourcecontainer.RunInResourceContainer(s.ResourceContainer); err != nil {
                klog.Warningf("Failed to start in resource-only container %q: %v", s.ResourceContainer, err)
            } else {
                klog.V(2).Infof("Running in resource-only container %q", s.ResourceContainer)
            }
        }
        // start the event broadcaster
        if s.Broadcaster != nil && s.EventClient != nil {
            s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
        }
        // start the healthz server if configured
        if s.HealthzServer != nil {
            s.HealthzServer.Run()
        }
        // start the metrics server if configured; URIs: "/proxyMode" and "/metrics"
        if len(s.MetricsBindAddress) > 0 {
            mux := mux.NewPathRecorderMux("kube-proxy")
            healthz.InstallHandler(mux)
            mux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
                fmt.Fprintf(w, "%s", s.ProxyMode)
            })
            mux.Handle("/metrics", prometheus.Handler())
            if s.EnableProfiling {
                routes.Profiling{}.Install(mux)
            }
            configz.InstallHandler(mux)
            go wait.Until(func() {
                err := http.ListenAndServe(s.MetricsBindAddress, mux)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
                }
            }, 5*time.Second, wait.NeverStop)
        }
        // tune the conntrack values if requested (via flags or config)
        if s.Conntracker != nil {
            max, err := getConntrackMax(s.ConntrackConfiguration)
            if err != nil {
                return err
            }
            if max > 0 {
                err := s.Conntracker.SetMax(max)
                if err != nil {
                    if err != readOnlySysFSError {
                        return err
                    }
                    const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: " +
                        "cannot modify conntrack limits, problems may arise later."
                    s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
                }
            }
            // set the conntracker's TCPEstablishedTimeout
            if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
                timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
                if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
                    return err
                }
            }
            // set the conntracker's TCPCloseWaitTimeout
            if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
                timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
                if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
                    return err
                }
            }
        }
        // use the informer mechanism to fetch and watch the configuration and
        // events of Services and Endpoints, registering ServiceEventHandler
        // for service events and EndpointsEventHandler for endpoints events
        informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)
        serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
        serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
        go serviceConfig.Run(wait.NeverStop)
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
        go endpointsConfig.Run(wait.NeverStop)
        go informerFactory.Start(wait.NeverStop)
        // birthCry — "a newborn's first cry"; the author named this with a
        // touch of humor ^_^. It broadcasts an event announcing that the
        // server started successfully:
        //   s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")
        s.birthCry()
        // The Proxier (the proxy-service provider) now loops, synchronizing its
        // configuration and handling the proxy logic (this article covers the
        // main framework; dedicated articles analyze the Proxier). By default
        // this is the iptables-mode proxier.
        s.Proxier.SyncLoop()
        return nil
    }
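
getConntrackMax() (called above) computes the target nf_conntrack_max from the conntrack configuration: roughly, the per-core limit scaled by the CPU count, with a configured floor. A simplified sketch under that assumption (the real function takes the KubeProxyConntrackConfiguration struct, whose fields are *int32 pointers, and can also return an error for conflicting settings):

    package main

    import (
        "fmt"
        "runtime"
    )

    // conntrackMax mirrors the shape of getConntrackMax: scale the
    // per-core limit by the number of CPUs, but never go below the
    // configured floor. A result of 0 means "leave the kernel value alone".
    func conntrackMax(maxPerCore, min int) int {
        if maxPerCore > 0 {
            scaled := maxPerCore * runtime.NumCPU()
            if scaled < min {
                return min // honor the floor on small machines
            }
            return scaled
        }
        return 0
    }

    func main() {
        // with the historical defaults (32768 per core, floor 131072)
        fmt.Println(conntrackMax(32768, 131072))
    }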

Running s.Proxier.SyncLoop() enters kube-proxy's third layer (the service implementation layer). The Proxier instance is chosen when the proxy server object is created, via the config file or the "--proxy-mode" flag (userspace / iptables / ipvs mode); the iptables-mode proxier is the default. The third layer will be analyzed in a dedicated article per mode; this call site is the key hand-off point, so keep it in mind for the later proxier articles.
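
All three proxiers satisfy the small proxy.ProxyProvider interface (pkg/proxy/types.go), which at this version is essentially:

    type ProxyProvider interface {
        // Sync immediately synchronizes the proxier's current state to proxy rules.
        Sync()
        // SyncLoop runs periodic work; it is expected to run as a goroutine
        // or as the main loop of the app, and it does not return.
        SyncLoop()
    }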

At the second (framework) layer we also need to pay attention to the informer mechanism kube-proxy uses to synchronize information with the Kubernetes cluster. In the proxy server's Run(), kube-proxy creates informers for Services and Endpoints, lists their data, watches for events (add, delete, update), and invokes the registered proxier handlers to process them (this is what calls into and triggers the third-layer proxier synchronization logic).

Rule Synchronization Mechanism (Informer)

kube-proxy uses the standard client-go approach to synchronize with the API server: create an informer, register event handlers, keep watching for add/update/delete events, and call the handlers to process them; the backend processing is implemented by the proxier (userspace/iptables/ipvs). Because endpoints synchronization works exactly like service synchronization, only the service code is shown below.
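
As a standalone illustration of this client-go pattern (independent of the kube-proxy sources; the kubeconfig path and the printed messages are assumptions made for the example), a minimal service watcher looks like this:

    package main

    import (
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // build a client from a local kubeconfig (the path is illustrative)
        cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
        if err != nil {
            panic(err)
        }
        client := kubernetes.NewForConfigOrDie(cfg)

        // shared informer factory with a resync period, mirroring
        // informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod) above
        factory := informers.NewSharedInformerFactory(client, 15*time.Minute)

        factory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                svc := obj.(*v1.Service)
                fmt.Println("service added:", svc.Namespace+"/"+svc.Name)
            },
            UpdateFunc: func(oldObj, newObj interface{}) { /* diff old vs new, then resync rules */ },
            DeleteFunc: func(obj interface{}) { /* tear down rules */ },
        })

        stopCh := make(chan struct{})
        factory.Start(stopCh)            // starts the list+watch goroutines
        factory.WaitForCacheSync(stopCh) // wait for the initial list to complete
        <-stopCh
    }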

!FILENAME pkg/proxy/config/config.go:174

    func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
        result := &ServiceConfig{
            lister:       serviceInformer.Lister(),             // lister
            listerSynced: serviceInformer.Informer().HasSynced, // lister sync status
        }
        // add the resource event handler funcs (add/update/delete) on the service informer
        serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    result.handleAddService,
                UpdateFunc: result.handleUpdateService,
                DeleteFunc: result.handleDeleteService,
            },
            resyncPeriod,
        )
        return result
    }

!FILENAME pkg/proxy/config/config.go:119

    func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        klog.Info("Starting service config controller")
        defer klog.Info("Shutting down service config controller")
        if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
            return
        }
        for i := range c.eventHandlers {
            klog.V(3).Info("Calling handler.OnServiceSynced()")
            c.eventHandlers[i].OnServiceSynced() // synchronize services with the proxier
        }
        <-stopCh
    }
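
The eventHandlers iterated here implement the proxyconfig.ServiceHandler interface (OnServiceAdd / OnServiceUpdate / OnServiceDelete / OnServiceSynced), which every proxier satisfies. As a toy sketch of that contract — loggingServiceHandler is hypothetical, and a real proxier records the change and schedules a rule sync instead of printing — such a handler could be registered via serviceConfig.RegisterEventHandler():

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    // loggingServiceHandler is a hypothetical stand-in for a proxier,
    // shown only to illustrate the ServiceHandler contract.
    type loggingServiceHandler struct{}

    func (h *loggingServiceHandler) OnServiceAdd(svc *v1.Service) {
        fmt.Println("service added:", svc.Namespace+"/"+svc.Name)
    }

    func (h *loggingServiceHandler) OnServiceUpdate(oldSvc, svc *v1.Service) {
        fmt.Println("service updated:", svc.Namespace+"/"+svc.Name)
    }

    func (h *loggingServiceHandler) OnServiceDelete(svc *v1.Service) {
        fmt.Println("service deleted:", svc.Namespace+"/"+svc.Name)
    }

    func (h *loggingServiceHandler) OnServiceSynced() {
        fmt.Println("initial service list synced; safe to program rules")
    }

    func main() {
        var h loggingServiceHandler
        h.OnServiceSynced()
    }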

handleAddService(): handling service add events

!FILENAME pkg/proxy/config/config.go:217

    func (c *ServiceConfig) handleAddService(obj interface{}) {
        service, ok := obj.(*v1.Service)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
            return
        }
        for i := range c.eventHandlers {
            klog.V(4).Info("Calling handler.OnServiceAdd")
            c.eventHandlers[i].OnServiceAdd(service) // hand the add event to the proxier
        }
    }

handleUpdateService(): handling service update events

!FILENAME pkg/proxy/config/config.go:229

    func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
        oldService, ok := oldObj.(*v1.Service)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
            return
        }
        service, ok := newObj.(*v1.Service)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
            return
        }
        for i := range c.eventHandlers {
            klog.V(4).Info("Calling handler.OnServiceUpdate")
            c.eventHandlers[i].OnServiceUpdate(oldService, service) // hand the update event to the proxier
        }
    }

handleDeleteService(): handling service delete events

!FILENAME pkg/proxy/config/config.go:246

    func (c *ServiceConfig) handleDeleteService(obj interface{}) {
        service, ok := obj.(*v1.Service)
        if !ok {
            tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
            if !ok {
                utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
                return
            }
            if service, ok = tombstone.Obj.(*v1.Service); !ok {
                utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
                return
            }
        }
        for i := range c.eventHandlers {
            klog.V(4).Info("Calling handler.OnServiceDelete")
            c.eventHandlers[i].OnServiceDelete(service) // hand the delete event to the proxier
        }
    }

For the third-layer proxier analysis, see the dedicated articles on the iptables-, ipvs- and userspace-mode proxiers.

~ END ~