上篇文章 kubernetes service 原理解析 已经分析了 service 原理以 kube-proxy 中三种模式的原理,本篇文章会从源码角度分析 kube-proxy 的设计与实现。

kubernetes 版本: v1.16

kube-proxy 启动流程

前面的文章已经说过 kubernetes 中所有组件都是通过其 run() 方法启动主逻辑的,run() 方法调用之前会进行解析命令行参数、添加默认值等。下面就直接看 kube-proxy 的 run() 方法:

  • 若启动时指定了 --write-config-to 参数,kube-proxy 只将启动的默认参数写到指定的配置文件中,然后退出
  • 初始化 ProxyServer 对象
  • 如果启动参数 --cleanup 设置为 true,则清理 iptables 和 ipvs 规则并退出

k8s.io/kubernetes/cmd/kube-proxy/app/server.go:290

  1. func (o *Options) Run() error {
  2. defer close(o.errCh)
  3. // 1.如果指定了 --write-config-to 参数,则将默认的配置文件写到指定文件并退出
  4. if len(o.WriteConfigTo) > 0 {
  5. return o.writeConfigFile()
  6. }
  7. // 2.初始化 ProxyServer 对象
  8. proxyServer, err := NewProxyServer(o)
  9. if err != nil {
  10. return err
  11. }
  12. // 3.如果启动参数 --cleanup 设置为 true,则清理 iptables 和 ipvs 规则并退出
  13. if o.CleanupAndExit {
  14. return proxyServer.CleanupAndExit()
  15. }
  16. o.proxyServer = proxyServer
  17. return o.runLoop()
  18. }

Run() 方法中主要调用了 NewProxyServer() 方法来初始化 ProxyServer,然后会调用 runLoop() 启动主循环,继续看初始化 ProxyServer 的具体实现:

  • 初始化 iptables、ipvs 相关的 interface
  • 若启用了 ipvs 则检查内核版本、ipvs 依赖的内核模块、ipset 版本,内核模块主要包括:ip_vsip_vs_rr,ip_vs_wrr,ip_vs_sh,nf_conntrack_ipv4,nf_conntrack,若没有相关模块,kube-proxy 会尝试使用 modprobe 自动加载
  • 根据 proxyMode 初始化 proxier,kube-proxy 启动后只运行一种 proxier

k8s.io/kubernetes/cmd/kube-proxy/app/server_others.go:57

  1. func NewProxyServer(o *Options) (*ProxyServer, error) {
  2. return newProxyServer(o.config, o.CleanupAndExit, o.master)
  3. }
  4. func newProxyServer(
  5. config *proxyconfigapi.KubeProxyConfiguration,
  6. cleanupAndExit bool,
  7. master string) (*ProxyServer, error) {
  8. ......
  9. if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
  10. c.Set(config)
  11. } else {
  12. return nil, fmt.Errorf("unable to register configz: %s", err)
  13. }
  14. ......
  15. // 1.关键依赖工具 iptables/ipvs/ipset/dbus
  16. var iptInterface utiliptables.Interface
  17. var ipvsInterface utilipvs.Interface
  18. var kernelHandler ipvs.KernelHandler
  19. var ipsetInterface utilipset.Interface
  20. var dbus utildbus.Interface
  21. // 2.执行 linux 命令行的工具
  22. execer := exec.New()
  23. // 3.初始化 iptables/ipvs/ipset/dbus 对象
  24. dbus = utildbus.New()
  25. iptInterface = utiliptables.New(execer, dbus, protocol)
  26. kernelHandler = ipvs.NewLinuxKernelHandler()
  27. ipsetInterface = utilipset.New(execer)
  28. // 4.检查该机器是否支持使用 ipvs 模式
  29. canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
  30. if canUseIPVS {
  31. ipvsInterface = utilipvs.New(execer)
  32. }
  33. if cleanupAndExit {
  34. return &ProxyServer{
  35. ......
  36. }, nil
  37. }
  38. // 5.初始化 kube client 和 event client
  39. client, eventClient, err := createClients(config.ClientConnection, master)
  40. if err != nil {
  41. return nil, err
  42. }
  43. ......
  44. // 6.初始化 healthzServer
  45. var healthzServer *healthcheck.HealthzServer
  46. var healthzUpdater healthcheck.HealthzUpdater
  47. if len(config.HealthzBindAddress) > 0 {
  48. healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
  49. healthzUpdater = healthzServer
  50. }
  51. // 7.proxier 是一个 interface,每种模式都是一个 proxier
  52. var proxier proxy.Provider
  53. // 8.根据 proxyMode 初始化 proxier
  54. proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
  55. ......
  56. if proxyMode == proxyModeIPTables {
  57. klog.V(0).Info("Using iptables Proxier.")
  58. if config.IPTables.MasqueradeBit == nil {
  59. return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
  60. }
  61. // 9.初始化 iptables 模式的 proxier
  62. proxier, err = iptables.NewProxier(
  63. .......
  64. )
  65. if err != nil {
  66. return nil, fmt.Errorf("unable to create proxier: %v", err)
  67. }
  68. metrics.RegisterMetrics()
  69. } else if proxyMode == proxyModeIPVS {
  70. // 10.判断是够启用了 ipv6 双栈
  71. if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
  72. ......
  73. // 11.初始化 ipvs 模式的 proxier
  74. proxier, err = ipvs.NewDualStackProxier(
  75. ......
  76. )
  77. } else {
  78. proxier, err = ipvs.NewProxier(
  79. ......
  80. )
  81. }
  82. if err != nil {
  83. return nil, fmt.Errorf("unable to create proxier: %v", err)
  84. }
  85. metrics.RegisterMetrics()
  86. } else {
  87. // 12.初始化 userspace 模式的 proxier
  88. proxier, err = userspace.NewProxier(
  89. ......
  90. )
  91. if err != nil {
  92. return nil, fmt.Errorf("unable to create proxier: %v", err)
  93. }
  94. }
  95. iptInterface.AddReloadFunc(proxier.Sync)
  96. return &ProxyServer{
  97. ......
  98. }, nil
  99. }

runLoop() 方法主要是启动 proxyServer。

k8s.io/kubernetes/cmd/kube-proxy/app/server.go:311

  1. func (o *Options) runLoop() error {
  2. // 1.watch 配置文件变化
  3. if o.watcher != nil {
  4. o.watcher.Run()
  5. }
  6. // 2.以 goroutine 方式启动 proxyServer
  7. go func() {
  8. err := o.proxyServer.Run()
  9. o.errCh <- err
  10. }()
  11. for {
  12. err := <-o.errCh
  13. if err != nil {
  14. return err
  15. }
  16. }
  17. }

o.proxyServer.Run() 中会启动已经初始化好的所有服务:

  • 设定进程 OOMScore,可通过命令行配置,默认值为 --oom-score-adj="-999"
  • 启动 metric server 和 healthz server,两者分别监听 10256 和 10249 端口
  • 设置内核参数 nf_conntrack_tcp_timeout_establishednf_conntrack_tcp_timeout_close_wait
  • 将 proxier 注册到 serviceEventHandler、endpointsEventHandler 中
  • 启动 informer 监听 service 和 endpoints 变化
  • 执行 s.Proxier.SyncLoop(),启动 proxier 主循环

k8s.io/kubernetes/cmd/kube-proxy/app/server.go:527

  1. func (s *ProxyServer) Run() error {
  2. ......
  3. // 1.进程 OOMScore,避免进程因 oom 被杀掉,此处默认值为 -999
  4. var oomAdjuster *oom.OOMAdjuster
  5. if s.OOMScoreAdj != nil {
  6. oomAdjuster = oom.NewOOMAdjuster()
  7. if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
  8. klog.V(2).Info(err)
  9. }
  10. }
  11. ......
  12. // 2.启动 healthz server
  13. if s.HealthzServer != nil {
  14. s.HealthzServer.Run()
  15. }
  16. // 3.启动 metrics server
  17. if len(s.MetricsBindAddress) > 0 {
  18. ......
  19. go wait.Until(func() {
  20. err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
  21. if err != nil {
  22. utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
  23. }
  24. }, 5*time.Second, wait.NeverStop)
  25. }
  26. // 4.配置 conntrack,设置内核参数 nf_conntrack_tcp_timeout_established 和 nf_conntrack_tcp_timeout_close_wait
  27. if s.Conntracker != nil {
  28. max, err := getConntrackMax(s.ConntrackConfiguration)
  29. if err != nil {
  30. return err
  31. }
  32. if max > 0 {
  33. err := s.Conntracker.SetMax(max)
  34. ......
  35. }
  36. if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
  37. timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
  38. if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
  39. return err
  40. }
  41. }
  42. if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
  43. timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
  44. if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
  45. return err
  46. }
  47. }
  48. }
  49. ......
  50. // 5.启动 informer 监听 Services 和 Endpoints 或者 EndpointSlices 信息
  51. informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
  52. informers.WithTweakListOptions(func(options *metav1.ListOptions) {
  53. options.LabelSelector = labelSelector.String()
  54. }))
  55. // 6.将 proxier 注册到 serviceConfig、endpointsConfig 中
  56. serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
  57. serviceConfig.RegisterEventHandler(s.Proxier)
  58. go serviceConfig.Run(wait.NeverStop)
  59. if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
  60. endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod)
  61. endpointSliceConfig.RegisterEventHandler(s.Proxier)
  62. go endpointSliceConfig.Run(wait.NeverStop)
  63. } else {
  64. endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
  65. endpointsConfig.RegisterEventHandler(s.Proxier)
  66. go endpointsConfig.Run(wait.NeverStop)
  67. }
  68. // 7.启动 informer
  69. informerFactory.Start(wait.NeverStop)
  70. s.birthCry()
  71. // 8.启动 proxier 主循环
  72. s.Proxier.SyncLoop()
  73. return nil
  74. }

回顾一下整个启动逻辑:

  1. o.Run() --> o.runLoop() --> o.proxyServer.Run() --> s.Proxier.SyncLoop()

o.Run() 中调用了 NewProxyServer() 来初始化 proxyServer 对象,其中包括初始化每种模式对应的 proxier,该方法最终会调用 s.Proxier.SyncLoop() 执行 proxier 的主循环。

proxier 的初始化

看完了启动流程的逻辑代码,接着再看一下各代理模式的初始化,上文已经提到每种模式都是一个 proxier,即要实现 proxy.Provider 对应的 interface,如下所示:

  1. type Provider interface {
  2. config.EndpointsHandler
  3. config.EndpointSliceHandler
  4. config.ServiceHandler
  5. Sync()
  6. SyncLoop()
  7. }

首先要实现 service、endpoints 和 endpointSlice 对应的 handler,也就是对 OnAddOnUpdateOnDeleteOnSynced 四种方法的处理,详细的代码在下文进行讲解。EndpointSlice 是在 v1.16 中新加入的一个 API。Sync()SyncLoop() 是主要用来处理iptables 规则的方法。

iptables proxier 初始化

首先看 iptables 模式的 NewProxier()方法,其函数的具体执行逻辑为:

  • 设置相关的内核参数route_localnetbridge-nf-call-iptables
  • 生成 masquerade 标记
  • 设置默认调度算法 rr
  • 初始化 proxier 对象
  • 使用 BoundedFrequencyRunner 初始化 proxier.syncRunner,将 proxier.syncProxyRules 方法注入,BoundedFrequencyRunner 是一个管理器用于执行用户注入的函数,可以指定运行的时间策略。

k8s.io/kubernetes/pkg/proxy/iptables/proxier.go:249

  1. func NewProxier(ipt utiliptables.Interface,
  2. ......
  3. ) (*Proxier, error) {
  4. // 1.设置相关的内核参数
  5. if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
  6. ......
  7. }
  8. if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
  9. ......
  10. }
  11. // 2.设置 masqueradeMark,默认为 0x00004000/0x00004000
  12. // 用来标记 k8s 管理的报文,masqueradeBit 默认为 14
  13. // 标记 0x4000 的报文(即 POD 发出的报文),在离开 Node 的时候需要进行 SNAT 转换
  14. masqueradeValue := 1 << uint(masqueradeBit)
  15. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  16. ......
  17. endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
  18. healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil)
  19. // 3.初始化 proxier
  20. isIPv6 := ipt.IsIpv6()
  21. proxier := &Proxier{
  22. ......
  23. }
  24. burstSyncs := 2
  25. // 4.初始化 syncRunner,BoundedFrequencyRunner 是一个定时执行器,会定时执行
  26. // proxier.syncProxyRules 方法,syncProxyRules 是每个 proxier 实际刷新iptables 规则的方法
  27. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  28. return proxier, nil
  29. }

ipvs proxier 初始化

ipvs NewProxier() 方法主要逻辑为:

  • 设定内核参数,route_localnetbr_netfilterbridge-nf-call-iptablesconntrackconn_reuse_modeip_forwardarp_ignorearp_announce
  • 和 iptables 一样,对于 SNAT iptables 规则生成 masquerade 标记
  • 设置默认调度算法 rr
  • 初始化 proxier 对象
  • 初始化 ipset 规则
  • 初始化 syncRunner 将 proxier.syncProxyRules 方法注入
  • 启动 gracefuldeleteManager 定时清理 RS (realServer) 记录

k8s.io/kubernetes/pkg/proxy/ipvs/proxier.go:316

  1. func NewProxier(ipt utiliptables.Interface,
  2. ......
  3. ) (*Proxier, error) {
  4. // 1.设定内核参数
  5. if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
  6. ......
  7. }
  8. ......
  9. // 2.生成 masquerade 标记
  10. masqueradeValue := 1 << uint(masqueradeBit)
  11. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  12. // 3.设置默认调度算法 rr
  13. if len(scheduler) == 0 {
  14. scheduler = DefaultScheduler
  15. }
  16. healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
  17. endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
  18. // 4.初始化 proxier
  19. proxier := &Proxier{
  20. ......
  21. }
  22. // 5.初始化 ipset 规则
  23. proxier.ipsetList = make(map[string]*IPSet)
  24. for _, is := range ipsetInfo {
  25. proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
  26. }
  27. burstSyncs := 2
  28. // 6.初始化 syncRunner
  29. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  30. // 7.启动 gracefuldeleteManager
  31. proxier.gracefuldeleteManager.Run()
  32. return proxier, nil
  33. }

userspace proxier 初始化

userspace NewProxier() 方法主要逻辑为:

  • 初始化 iptables 规则
  • 初始化 proxier
  • 初始化 syncRunner 将 proxier.syncProxyRules 方法注入

k8s.io/kubernetes/pkg/proxy/userspace/proxier.go:187

  1. func NewProxier(......) (*Proxier, error) {
  2. return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, nodePortAddresses, newProxySocket)
  3. }
  4. func NewCustomProxier(......) (*Proxier, error) {
  5. ......
  6. // 1.设置打开文件数
  7. err = setRLimit(64 * 1000)
  8. if err != nil {
  9. return nil, fmt.Errorf("failed to set open file handler limit: %v", err)
  10. }
  11. proxyPorts := newPortAllocator(pr)
  12. return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
  13. }
  14. func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
  15. if proxyPorts == nil {
  16. proxyPorts = newPortAllocator(utilnet.PortRange{})
  17. }
  18. // 2.初始化 iptables 规则
  19. if err := iptablesInit(iptables); err != nil {
  20. return nil, fmt.Errorf("failed to initialize iptables: %v", err)
  21. }
  22. if err := iptablesFlush(iptables); err != nil {
  23. return nil, fmt.Errorf("failed to flush iptables: %v", err)
  24. }
  25. // 3.初始化 proxier
  26. proxier := &Proxier{
  27. ......
  28. }
  29. // 4.初始化 syncRunner
  30. proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs)
  31. return proxier, nil
  32. }

proxier 接口实现

handler 的实现

上文已经提到过每种 proxier 都需要实现 interface 中的几个方法,首先看一下 ServiceHandlerEndpointsHandlerEndpointSliceHandler 相关的,对于 service、endpoints 和 endpointSlices 三种对象都实现了 OnAddOnUpdateOnDeleteOnSynced 方法。

  1. // 1.service 相关的方法
  2. func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  3. proxier.OnServiceUpdate(nil, service)
  4. }
  5. func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  6. if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
  7. proxier.syncRunner.Run()
  8. }
  9. }
  10. func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  11. proxier.OnServiceUpdate(service, nil)
  12. }
  13. func (proxier *Proxier) OnServiceSynced(){
  14. ......
  15. proxier.syncProxyRules()
  16. }
  17. // 2.endpoints 相关的方法
  18. func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
  19. proxier.OnEndpointsUpdate(nil, endpoints)
  20. }
  21. func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
  22. if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
  23. proxier.Sync()
  24. }
  25. }
  26. func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
  27. proxier.OnEndpointsUpdate(endpoints, nil)
  28. }
  29. func (proxier *Proxier) OnEndpointsSynced() {
  30. ......
  31. proxier.syncProxyRules()
  32. }
  33. // 3.endpointSlice 相关的方法
  34. func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
  35. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
  36. proxier.Sync()
  37. }
  38. }
  39. func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
  40. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
  41. proxier.Sync()
  42. }
  43. }
  44. func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
  45. if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
  46. proxier.Sync()
  47. }
  48. }
  49. func (proxier *Proxier) OnEndpointSlicesSynced() {
  50. ......
  51. proxier.syncProxyRules()
  52. }

在启动逻辑的 Run() 方法中 proxier 已经被注册到了 serviceConfig、endpointsConfig、endpointSliceConfig 中,当启动 informer,cache 同步完成后会调用 OnSynced() 方法,之后当 watch 到变化后会调用 proxier 中对应的 OnUpdate() 方法进行处理,OnSynced() 会直接调用 proxier.syncProxyRules() 来刷新iptables 规则,而 OnUpdate() 会调用 proxier.syncRunner.Run() 方法,其最终也是调用 proxier.syncProxyRules() 方法刷新规则的,这种转换是在 BoundedFrequencyRunner 中体现出来的,下面看一下具体实现。

Sync() 以及 SyncLoop() 的实现

每种 proxier 的 Sync() 以及 SyncLoop() 方法如下所示,都是调用 syncRunner 中的相关方法,而 syncRunner 在前面的 NewProxier() 中已经说过了,syncRunner 是调用 async.NewBoundedFrequencyRunner() 方法初始化,至此,基本上可以确定了所有的核心都是在 BoundedFrequencyRunner 中实现的。

  1. func NewProxier() (*Proxier, error) {
  2. ......
  3. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  4. ......
  5. }
  6. // Sync()
  7. func (proxier *Proxier) Sync() {
  8. proxier.syncRunner.Run()
  9. }
  10. // SyncLoop()
  11. func (proxier *Proxier) SyncLoop() {
  12. if proxier.healthzServer != nil {
  13. proxier.healthzServer.UpdateTimestamp()
  14. }
  15. proxier.syncRunner.Loop(wait.NeverStop)
  16. }

NewBoundedFrequencyRunner()是其初始化的函数,其中的参数 minIntervalmaxInterval 分别对应 proxier 中的 minSyncPeriodsyncPeriod,两者的默认值分别为 0s 和 30s,其值可以使用 --iptables-min-sync-period--iptables-sync-period 启动参数来指定。

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:134

  1. func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
  2. timer := realTimer{Timer: time.NewTimer(0)}
  3. // 执行定时器
  4. <-timer.C()
  5. // 调用 construct() 函数
  6. return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
  7. }
  8. func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
  9. if maxInterval < minInterval {
  10. panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
  11. }
  12. if timer == nil {
  13. panic(fmt.Sprintf("%s: timer must be non-nil", name))
  14. }
  15. bfr := &BoundedFrequencyRunner{
  16. name: name,
  17. fn: fn, // 被调用的函数,proxier.syncProxyRules
  18. minInterval: minInterval,
  19. maxInterval: maxInterval,
  20. run: make(chan struct{}, 1),
  21. timer: timer,
  22. }
  23. // 由于默认的 minInterval = 0,此处使用 nullLimiter
  24. if minInterval == 0 {
  25. bfr.limiter = nullLimiter{}
  26. } else {
  27. // 采用“令牌桶”算法实现流控机制
  28. qps := float32(time.Second) / float32(minInterval)
  29. bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
  30. }
  31. return bfr
  32. }

在启动流程 Run() 方法最后调用的 s.Proxier.SyncLoop() 最终调用的是 BoundedFrequencyRunnerLoop()方法,如下所示:

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:169

  1. func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
  2. bfr.timer.Reset(bfr.maxInterval)
  3. for {
  4. select {
  5. case <-stop:
  6. bfr.stop()
  7. return
  8. case <-bfr.timer.C(): // 定时器
  9. bfr.tryRun()
  10. case <-bfr.run: // 接收 channel
  11. bfr.tryRun()
  12. }
  13. }
  14. }

proxier 的 OnUpdate() 中调用的 syncRunner.Run() 其实只是在 bfr.run 这个带 buffer 的 channel 中发送了一条数据,在 BoundedFrequencyRunnerLoop()方法中接收到该数据后会调用 bfr.tryRun() 进行处理:

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:191

  1. func (bfr *BoundedFrequencyRunner) Run() {
  2. select {
  3. case bfr.run <- struct{}{}: // 向 channel 发送信号
  4. default:
  5. }
  6. }

tryRun() 方法才是最终调用 syncProxyRules() 刷新iptables 规则的。

k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go:211

  1. func (bfr *BoundedFrequencyRunner) tryRun() {
  2. bfr.mu.Lock()
  3. defer bfr.mu.Unlock()
  4. if bfr.limiter.TryAccept() {
  5. // 执行 fn() 即 syncProxyRules() 刷新iptables 规则
  6. bfr.fn()
  7. bfr.lastRun = bfr.timer.Now()
  8. bfr.timer.Stop()
  9. bfr.timer.Reset(bfr.maxInterval)
  10. return
  11. }
  12. elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
  13. nextPossible := bfr.minInterval - elapsed // time to next possible run
  14. nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
  15. if nextPossible < nextScheduled {
  16. bfr.timer.Stop()
  17. bfr.timer.Reset(nextPossible)
  18. }
  19. }

通过以上分析可知,syncProxyRules() 是每个 proxier 的核心方法,启动 informer cache 同步完成后会直接调用 proxier.syncProxyRules() 刷新iptables 规则,之后如果 informer watch 到相关对象的变化后会调用 BoundedFrequencyRunnertryRun()来刷新iptables 规则,定时器每 30s 会执行一次iptables 规则的刷新。

总结

本文主要介绍了 kube-proxy 的启动逻辑以及三种模式 proxier 的初始化,还有最终调用刷新iptables 规则的 BoundedFrequencyRunner,可以看到其中的代码写的很巧妙。而每种模式下的iptables 规则是如何创建、刷新以及转发的是如何实现的会在后面的文章中进行分析。