IPVS-mode Proxier

Overview

For background on the ipvs-mode proxier, see the official documentation (English and Chinese versions). It mainly covers the following topics:

  1. An introduction to IPVS and the benefits it brings compared with iptables mode;
  2. Examples of the user-level iptables rules the ipvs-mode proxier generates under different configurations (masquerade-all / cluster-cidr / LoadBalancer / NodePort / externalIPs);
  3. How to run kube-proxy in IPVS mode, its prerequisites, and how to debug and troubleshoot it.

This article focuses on the code-level implementation: how the runtime prerequisite checks are implemented, how IPVS is used to realize the Service layer, what the iptables rule code looks like, how the proxier's overall logic is organized, and so on.

The IPVS proxy mode relies on several underlying technologies: ipvs, ipset, iptables, and netlink (an asynchronous user-space/kernel-space communication mechanism). Getting familiar with their basic purposes and technical details beforehand will make the ipvs-mode proxier implementation much easier to understand in depth.

The ipvs-mode proxier uses IPVS in NAT mode; IPVS operations (creating virtual servers and real servers) are carried out over netlink by exchanging messages in the standard protocol format with the kernel. The ipvs-mode proxier also uses a fixed template of iptables rules combined with ipset sets to handle dynamic updates.

The overall code flow of the ipvs-mode proxier is the same as in iptables mode (see the iptables-mode code-flow diagram): it watches apiserver events and generates the corresponding forwarding rules from the synchronized state. Unlike iptables mode, however, it not only uses ipset to cut down the number of iptables entries and improve performance, but also uses IPVS to provide richer load-balancing policies. Rule generation therefore has to synchronize the ipset sets, the iptables rules, and the IPVS virtual servers together; the key logic lives in syncProxyRules().

IPVS-mode proxier object creation and initialization

When the ProxyServer is instantiated, the proxier mode is initialized: if the proxy mode is set to IPVS, an IPVS proxier object is created and registered as the event handler for both services and endpoints.

!FILENAME cmd/kube-proxy/app/server_others.go:59

  1. func newProxyServer(...) (*ProxyServer, error) {
  2. //...
  3. else if proxyMode == proxyModeIPVS { //当proxy模式指定为IPVS模式(命令参数或配置文件)
  4. klog.V(0).Info("Using ipvs Proxier.")
  5. proxierIPVS, err := ipvs.NewProxier( //创建ipvs-mode proxier对象
  6. //...
  7. )
  8. // proxyServer的proxier对象与事件处理器的指定
  9. proxier = proxierIPVS
  10. serviceEventHandler = proxierIPVS
  11. endpointsEventHandler = proxierIPVS
  12. //...
  13. }

The ipvs-mode proxier object is instantiated by NewProxier(), which also initializes the IPVS environment.

The kernel parameters it adjusts are the following:

  • net/ipv4/conf/all/route_localnet: whether packets from outside may be routed to localhost (127/8) addresses;
  • net/bridge/bridge-nf-call-iptables: makes packets forwarded by an L2 bridge also traverse the iptables FORWARD rules, i.e. L3 iptables rules end up filtering L2 frames;
  • net/ipv4/vs/conntrack: enables NFCT (Netfilter connection tracking) for IPVS connections;
  • net/ipv4/vs/conn_reuse_mode: selects how IPVS handles connection/port reuse;
  • net/ipv4/vs/expire_nodest_conn: when 0, if IPVS forwards a packet and finds the destination RS invalid (deleted), it drops the packet but keeps the connection entry; when 1, the connection is released immediately;
  • net/ipv4/vs/expire_quiescent_template: when 0, if an RS weight is set to 0 (e.g. after a failed health check), new connections covered by session persistence are still scheduled to that RS; when 1, the persistence template is invalidated immediately and a new RS is chosen. For workloads relying on session persistence, 1 is recommended;
  • net/ipv4/ip_forward: whether IPv4 forwarding is enabled;
  • net/ipv4/conf/all/arp_ignore: defines how ARP requests for local IP addresses are answered (modes 0-8); mode 1 answers only when the target IP is configured on the interface that received the request;
  • net/ipv4/conf/all/arp_announce: restricts which local source addresses are announced in ARP messages sent from an interface; 2 means always use the most appropriate local address for the target.
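
As an aside, reading and writing these parameters boils down to manipulating files under /proc/sys. The following standalone Go sketch illustrates the pattern NewProxier follows (only write when the current value differs); it is a simplified illustration, not the kube-proxy sysctl util implementation:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// setSysctl writes an integer value to /proc/sys/<name>, where name uses
// the "net/ipv4/ip_forward" style path.
func setSysctl(name string, value int) error {
	return os.WriteFile(filepath.Join("/proc/sys", name), []byte(strconv.Itoa(value)), 0o644)
}

// getSysctl reads an integer value back from /proc/sys/<name>.
func getSysctl(name string) (int, error) {
	data, err := os.ReadFile(filepath.Join("/proc/sys", name))
	if err != nil {
		return -1, err
	}
	return strconv.Atoi(strings.TrimSpace(string(data)))
}

func main() {
	if v, err := getSysctl("net/ipv4/conf/all/route_localnet"); err == nil && v != 1 {
		if err := setSysctl("net/ipv4/conf/all/route_localnet", 1); err != nil {
			fmt.Println("set failed (needs root):", err)
		}
	}
}
```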

!FILENAME pkg/proxy/ipvs/proxier.go:280

  1. func NewProxier(...) (*Proxier, error) {
  2. // sysctl配置项 "net/ipv4/conf/all/route_localnet" 值为1
  3. if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
  4. if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
  5. return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
  6. }
  7. }
  8. //...
  9. // sysctl配置项 "net/bridge/bridge-nf-call-iptables" 值为1
  10. sysctl.GetSysctl(sysctlBridgeCallIPTables)
  11. // sysctl配置项 "net/ipv4/vs/conntrack" 值为1
  12. sysctl.SetSysctl(sysctlVSConnTrack, 1)
  13. // sysctl配置项 "net/ipv4/vs/conn_reuse_mode" 值为0
  14. sysctl.SetSysctl(sysctlConnReuse, 0)
  15. // sysctl配置项 "net/ipv4/vs/expire_nodest_conn" 值为1
  16. sysctl.SetSysctl(sysctlExpireNoDestConn, 1)
  17. // sysctl配置项 "net/ipv4/vs/expire_quiescent_template" 值为1
  18. sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1)
  19. // sysctl配置项 "net/ipv4/ip_forward" 值为1
  20. sysctl.SetSysctl(sysctlForward, 1)
  21. // sysctl配置项 "net/ipv4/conf/all/arp_ignore" 值为1
  22. sysctl.SetSysctl(sysctlArpIgnore, 1)
  23. // sysctl配置项 "net/ipv4/conf/all/arp_announce" 值为2
  24. sysctl.SetSysctl(sysctlArpAnnounce, 2)
  25. //...
  26. // 生成masquerade标志用于SNAT规则
  27. masqueradeValue := 1 << uint(masqueradeBit)
  28. masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
  29. // node ip检测
  30. if nodeIP == nil {
  31. klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
  32. nodeIP = net.ParseIP("127.0.0.1")
  33. }
  34. isIPv6 := utilnet.IsIPv6(nodeIP)
  35. klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
  36. // 检测是否有为proxier配置clusterCIDR参数
  37. // clusterCIDR指定集群中pod使用的网段,以此来区分内部与外部流量
  38. if len(clusterCIDR) == 0 {
  39. klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
  40. } else if utilnet.IsIPv6CIDR(clusterCIDR) != isIPv6 {
  41. return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, isIPv6)
  42. }
  43. // 检测是否指定了proxy调度器scheduler算法,如果未指定,则为默认"RR"平均负载算法
  44. if len(scheduler) == 0 {
  45. klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
  46. scheduler = DefaultScheduler
  47. }
  48. // healthcheck服务器对象创建
  49. healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil)
  50. // 创建Proxier对象
  51. proxier := &Proxier{
  52. //更新SVC、EP信息存放map和changeTracker
  53. portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
  54. serviceMap: make(proxy.ServiceMap),
  55. serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
  56. endpointsMap: make(proxy.EndpointsMap),
  57. endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
  58. //同步周期
  59. syncPeriod: syncPeriod,
  60. minSyncPeriod: minSyncPeriod,
  61. excludeCIDRs: excludeCIDRs,
  62. iptables: ipt, //iptables执行处理器
  63. masqueradeAll: masqueradeAll, //伪装所有访问Service的ClusterIP流量
  64. masqueradeMark: masqueradeMark, //伪装标志号
  65. exec: exec, // osExec命令执行器
  66. clusterCIDR: clusterCIDR,
  67. hostname: hostname,
  68. nodeIP: nodeIP,
  69. portMapper: &listenPortOpener{},
  70. recorder: recorder,
  71. healthChecker: healthChecker,
  72. healthzServer: healthzServer,
  73. ipvs: ipvs, //ipvs接口
  74. ipvsScheduler: scheduler, //集群调度算法(默认RR)
  75. ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, //node ip获取器
  76. //iptables规则数据存放buffer
  77. iptablesData: bytes.NewBuffer(nil),
  78. filterChainsData: bytes.NewBuffer(nil),
  79. natChains: bytes.NewBuffer(nil),
  80. natRules: bytes.NewBuffer(nil),
  81. filterChains: bytes.NewBuffer(nil),
  82. filterRules: bytes.NewBuffer(nil),
  83. netlinkHandle: NewNetLinkHandle(), //netlink执行处理器
  84. ipset: ipset, //ipset执行处理器
  85. nodePortAddresses: nodePortAddresses,
  86. networkInterfacer: utilproxy.RealNetwork{},
  87. gracefuldeleteManager: NewGracefulTerminationManager(ipvs), // RS清理管理器
  88. }
  89. // 遍历ipsetInfo定义,初始化kubernetes ipset默认集。(后面在ipset默认集创建时有详细介绍)
  90. proxier.ipsetList = make(map[string]*IPSet)
  91. for _, is := range ipsetInfo {
  92. proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
  93. }
  94. burstSyncs := 2
  95. klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
  96. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) //同步runner
  97. proxier.gracefuldeleteManager.Run() //后台线程定时(/分钟)清理RS(realServer记录)
  98. return proxier, nil
  99. }
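
As a quick illustration of the masquerade mark computed above: with the default --iptables-masquerade-bit of 14, the Sprintf call yields the mark/mask string that later appears in the KUBE-MARK-MASQ and KUBE-POSTROUTING rules. A standalone snippet (values illustrative):

```go
package main

import "fmt"

func main() {
	masqueradeBit := 14 // kube-proxy --iptables-masquerade-bit default
	masqueradeValue := 1 << uint(masqueradeBit)
	// Same formatting as NewProxier; prints something like 0x004000/0x004000.
	fmt.Printf("%#08x/%#08x\n", masqueradeValue, masqueradeValue)
}
```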

Proxier service and endpoint update mechanism

The mechanism for synchronizing service and endpoints changes is the same in IPVS mode as in iptables mode (see the iptables-mode proxier article for more detail), but for completeness and so this article can stand on its own, part of the code is walked through briefly here.

When the ipvs-mode proxier object is constructed, a sync runner (async.NewBoundedFrequencyRunner) is registered, and the rule-synchronization callback is syncProxyRules(). Likewise, the proxier has two fields, serviceChanges (ServiceChangeTracker) and endpointsChanges (EndpointChangeTracker), which track service and endpoints changes and record them in the trackers' respective items maps (serviceChange and endpointsChange).

!FILENAME pkg/proxy/ipvs/proxier.go:429

  1. proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

At the second layer of the framework, the final call in the proxy server's run path is s.Proxier.SyncLoop().

!FILENAME pkg/proxy/ipvs/proxier.go:631

  1. func (proxier *Proxier) SyncLoop() {
  2. // Update healthz timestamp at beginning in case Sync() never succeeds.
  3. // ...
  4. proxier.syncRunner.Loop(wait.NeverStop) //执行NewBoundedFrequencyRunner对象Loop
  5. }

!FILENAME pkg/util/async/bounded_frequency_runner.go:169

  1. func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
  2. bfr.timer.Reset(bfr.maxInterval)
  3. for {
  4. select {
  5. case <-stop:
  6. bfr.stop()
  7. return
  8. case <-bfr.timer.C(): //定时器方式执行
  9. bfr.tryRun()
  10. case <-bfr.run: //按需方式执行(发送运行指令信号)
  11. bfr.tryRun()
  12. }
  13. }
  14. }

BoundedFrequencyRunner.tryRun() invokes the callback bfr.fn() at the configured frequency.

!FILENAME pkg/util/async/bounded_frequency_runner.go:211

  1. func (bfr *BoundedFrequencyRunner) tryRun() {
  2. bfr.mu.Lock()
  3. defer bfr.mu.Unlock()
  4. //限制条件允许运行func
  5. if bfr.limiter.TryAccept() {
  6. bfr.fn() // 重点执行部分,调用func,上下文来看此处就是
  7. // 对syncProxyRules()的调用
  8. bfr.lastRun = bfr.timer.Now() // 记录运行时间
  9. bfr.timer.Stop()
  10. bfr.timer.Reset(bfr.maxInterval) // 重设下次运行时间
  11. klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
  12. return
  13. }
  14. //限制条件不允许运行,计算下次运行时间
  15. elapsed := bfr.timer.Since(bfr.lastRun) // elapsed:上次运行时间到现在已过多久
  16. nextPossible := bfr.minInterval - elapsed // nextPossible:下次运行至少差多久(最小周期)
  17. nextScheduled := bfr.maxInterval - elapsed // nextScheduled:下次运行最迟差多久(最大周期)
  18. klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
  19. if nextPossible < nextScheduled {
  20. bfr.timer.Stop()
  21. bfr.timer.Reset(nextPossible)
  22. klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
  23. }
  24. }
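
A minimal usage sketch of this runner outside kube-proxy, assuming the k8s.io/kubernetes/pkg/util/async package can be imported or vendored (name and periods are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/async"
)

func main() {
	sync := func() { fmt.Println("syncProxyRules would run here:", time.Now()) }

	// At most one run per second (minInterval), at least one run every 30s (maxInterval),
	// with a burst of 2 queued on-demand runs -- the same shape as the proxier's sync-runner.
	runner := async.NewBoundedFrequencyRunner("demo-sync", sync, 1*time.Second, 30*time.Second, 2)
	go runner.Loop(wait.NeverStop)

	// An event handler would call Run() to request an immediate (rate-limited) sync.
	runner.Run()
	time.Sleep(5 * time.Second)
}
```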

SyncProxyRules: synchronizing the proxy rules

syncProxyRules() is the core of the proxier. Like the iptables proxier, it consumes the service and endpoints information synchronized from the apiserver and, both initially and whenever something changes, synchronizes the IPVS rules (iptables rules, IPVS virtual servers, and ipset sets), ultimately implementing the Kubernetes "service" mechanism.

syncProxyRules() is rather long, so its key parts are analyzed one by one below.

Synchronizing the IPVS rules consists of the following main steps:

  • synchronize and update services and endpoints;
  • initialize the chains and ipset sets;
  • build the IPVS rules (iptables/ipvs/ipset) for every service; different service types produce different rules;
  • clean up stale rules and state.

Updating service and endpoint change information

The service and endpoint change-update mechanism of the ipvs-mode proxier is exactly the same as in iptables mode; see the "syncProxyRule" section of the iptables-mode article for the details, which are not repeated here.

The Proxier object has two fields, serviceChanges and endpointsChanges, used to track Service and Endpoint updates; their types, ServiceChangeTracker and EndpointChangeTracker, record the change events watched from the apiserver.

UpdateServiceMap() applies the pending service changes: it merges the entries held in serviceChanges into the proxier's serviceMap (adding new items, dropping obsolete ones) and returns the result. UpdateEndpointsMap() does the same for endpoints, merging endpointsChanges into the proxier's endpointsMap and returning the update result.

!FILENAME pkg/proxy/ipvs/proxier.go:730

  1. serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
  2. endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)

Creating the top-level kube chains and jump rules

!FILENAME pkg/proxy/ipvs/proxier.go:748

  1. proxier.natChains.Reset() //nat链
  2. proxier.natRules.Reset() //nat规则
  3. proxier.filterChains.Reset() //filter链
  4. proxier.filterRules.Reset() //filter规则
  5. //写表头
  6. writeLine(proxier.filterChains, "*filter")
  7. writeLine(proxier.natChains, "*nat")
  8. proxier.createAndLinkeKubeChain() //创建kubernetes的表连接链数据

!FILENAME pkg/proxy/ipvs/proxier.go:1418

  1. func (proxier *Proxier) createAndLinkeKubeChain() {
  2. //通过iptables-save获取现有的filter和NAT表存在的链数据
  3. existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
  4. existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
  5. // 顶层链数据的构建
  6. // NAT表链: KUBE-SERVICES / KUBE-POSTROUTING / KUBE-FIREWALL
  7. // KUBE-NODE-PORT / KUBE-LOAD-BALANCER / KUBE-MARK-MASQ
  8. // Filter表链: KUBE-FORWARD
  9. for _, ch := range iptablesChains {
  10. //不存在则创建链,创建顶层链
  11. if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
  12. klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
  13. return
  14. }
  15. //nat表写链
  16. if ch.table == utiliptables.TableNAT {
  17. if chain, ok := existingNATChains[ch.chain]; ok {
  18. writeBytesLine(proxier.natChains, chain) //现存在的链
  19. } else {
  20. // "KUBE-POSTROUTING"
  21. writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
  22. }
  23. } else { // filter表写链
  24. if chain, ok := existingFilterChains[KubeForwardChain]; ok {
  25. writeBytesLine(proxier.filterChains, chain) //现存在的链
  26. } else {
  27. // "KUBE-FORWARD"
  28. writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
  29. }
  30. }
  31. }
  32. // 默认链下创建kubernete服务专用跳转规则
  33. // iptables -I OUTPUT -t nat --comment "kubernetes service portals" -j KUBE-SERVICES
  34. // iptables -I PREROUTING -t nat --comment "kubernetes service portals" -j KUBE-SERVICES
  35. // iptables -I POSTROUTING -t nat --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
  36. // iptables -I FORWARD -t filter --comment "kubernetes forwarding rules" -j KUBE-FORWARD
  37. for _, jc := range iptablesJumpChain {
  38. args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
  39. if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
  40. klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
  41. }
  42. }
  43. // 写kubernetes专用的POSTROUTING nat规则
  44. // -A KUBE-POSTROUTING -m comment --comment "..." -m mark --mark $masqueradeMark -j MASQUERADE
  45. writeLine(proxier.natRules, []string{
  46. "-A", string(kubePostroutingChain),
  47. "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
  48. "-m", "mark", "--mark", proxier.masqueradeMark,
  49. "-j", "MASQUERADE",
  50. }...)
  51. // 写kubernetes专用的masquerade伪装地址标记规则
  52. // -A KUBE-MARK-MASQ -j MARK --set-xmark $masqueradeMark
  53. writeLine(proxier.natRules, []string{
  54. "-A", string(KubeMarkMasqChain),
  55. "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
  56. }...)
  57. }

Creating the dummy interface and the default ipset sets

!FILENAME pkg/proxy/ipvs/proxier.go:760

  1. //为服务地址的绑定,确保已创建虚拟接口kube-ipvs0
  2. _, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  3. if err != nil {
  4. klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
  5. return
  6. }
  7. // 确保kubernetes专用的ipset集已创建
  8. for _, set := range proxier.ipsetList {
  9. if err := ensureIPSet(set); err != nil {
  10. return
  11. }
  12. set.resetEntries()
  13. }
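
What EnsureDummyDevice and the later EnsureAddressBind boil down to can be sketched with the github.com/vishvananda/netlink package (an assumption made for illustration; the proxier wraps equivalent netlink calls in its own NetLinkHandle, and the same effect is achieved by `ip link add kube-ipvs0 type dummy` plus `ip addr add <serviceIP>/32 dev kube-ipvs0`):

```go
package main

import (
	"fmt"

	"github.com/vishvananda/netlink"
)

func main() {
	// Ensure the dummy device exists (an "already exists" error is fine).
	dummy := &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "kube-ipvs0"}}
	if err := netlink.LinkAdd(dummy); err != nil {
		fmt.Println("LinkAdd:", err)
	}

	// Bind a service IP to it, the way syncService() does for each clusterIP.
	link, err := netlink.LinkByName("kube-ipvs0")
	if err != nil {
		fmt.Println("LinkByName:", err)
		return
	}
	addr, _ := netlink.ParseAddr("10.96.0.10/32") // illustrative service IP
	if err := netlink.AddrAdd(link, addr); err != nil {
		fmt.Println("AddrAdd:", err)
	}
}
```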

proxier.ipsetList is defined from the ipsetInfo slice below; the ipsetList map itself is initialized when the proxier object is created.

!FILENAME pkg/proxy/ipvs/proxier.go:113

  1. var ipsetInfo = []struct {
  2. name string //ipset set名称
  3. setType utilipset.Type //set类型{HashIPPortIP|HashIPPort|HashIPPortNet|BitmapPort}
  4. comment string //comment描述信息
  5. }{
  6. {kubeLoopBackIPSet, utilipset.HashIPPortIP, kubeLoopBackIPSetComment},
  7. //...
  8. }

| ipset set name | type | description |
| --- | --- | --- |
| KUBE-LOOP-BACK | hash:ip,port,ip | Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose |
| KUBE-CLUSTER-IP | hash:ip,port | Kubernetes service cluster ip + port for masquerade purpose |
| KUBE-EXTERNAL-IP | hash:ip,port | Kubernetes service external ip + port for masquerade and filter purpose |
| KUBE-LOAD-BALANCER | hash:ip,port | Kubernetes service lb portal |
| KUBE-LOAD-BALANCER-FW | hash:ip,port | Kubernetes service load balancer ip + port for load balancer with sourceRange |
| KUBE-LOAD-BALANCER-LOCAL | hash:ip,port | Kubernetes service load balancer ip + port with externalTrafficPolicy=local |
| KUBE-LOAD-BALANCER-SOURCE-IP | hash:ip,port,ip | Kubernetes service load balancer ip + port + source IP for packet filter purpose |
| KUBE-LOAD-BALANCER-SOURCE-CIDR | hash:ip,port,net | Kubernetes service load balancer ip + port + source cidr for packet filter purpose |
| KUBE-NODE-PORT-TCP | bitmap:port | Kubernetes nodeport TCP port for masquerade purpose |
| KUBE-NODE-PORT-LOCAL-TCP | bitmap:port | Kubernetes nodeport TCP port with externalTrafficPolicy=local |
| KUBE-NODE-PORT-UDP | bitmap:port | Kubernetes nodeport UDP port for masquerade purpose |
| KUBE-NODE-PORT-LOCAL-UDP | bitmap:port | Kubernetes nodeport UDP port with externalTrafficPolicy=local |
| KUBE-NODE-PORT-SCTP | bitmap:port | Kubernetes nodeport SCTP port for masquerade purpose |
| KUBE-NODE-PORT-LOCAL-SCTP | bitmap:port | Kubernetes nodeport SCTP port with externalTrafficPolicy=local |

Generating IPVS rules for each service

This logic sits inside one for loop that iterates over every service in serviceMap and handles each service type (clusterIP / NodePort / externalIPs / LoadBalancer) differently (ipset sets / IPVS virtual servers / IPVS real servers).

In IPVS mode, every virtual server created for a service is bound to the default dummy interface (kube-ipvs0). Creating the IPVS entries for a service IP involves the following operations:

  • ensure the node has the dummy interface kube-ipvs0 and bind the service IP address to it;
  • create an IPVS virtual server for each Kubernetes service IP address;
  • create the real servers (the service's endpoints) for each IPVS virtual server.
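
As a standalone sketch of what those operations look like over netlink, here is a minimal example assuming the github.com/moby/ipvs package (the successor of docker/libnetwork/ipvs, the library family kube-proxy's ipvs util builds on; treat the exact field names and values as illustrative assumptions):

```go
package main

import (
	"fmt"
	"net"
	"syscall"

	"github.com/moby/ipvs"
)

func main() {
	handle, err := ipvs.New("") // operate in the current network namespace
	if err != nil {
		fmt.Println("need root / ip_vs module:", err)
		return
	}
	defer handle.Close()

	// Virtual server: the equivalent of the proxier's VirtualServer for a clusterIP.
	svc := &ipvs.Service{
		Address:       net.ParseIP("10.96.0.10"),
		AddressFamily: syscall.AF_INET,
		Protocol:      syscall.IPPROTO_TCP,
		Port:          53,
		SchedName:     "rr", // DefaultScheduler
	}
	if err := handle.NewService(svc); err != nil {
		fmt.Println("NewService:", err)
	}

	// Real server: one endpoint behind the virtual server, forwarded with NAT (masquerade).
	dst := &ipvs.Destination{
		Address:         net.ParseIP("10.244.0.5"),
		Port:            53,
		Weight:          1,
		ConnectionFlags: ipvs.ConnectionFlagMasq,
	}
	if err := handle.NewDestination(svc, dst); err != nil {
		fmt.Println("NewDestination:", err)
	}
}
```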

!FILENAME pkg/proxy/ipvs/proxier.go:784

  1. for svcName, svc := range proxier.serviceMap {
  2. //...... 后面详细分析
  3. }

Based on the service's valid endpoint list, the KUBE-LOOP-BACK ipset set is updated so that the corresponding iptables rules (SNAT/masquerade for hairpin traffic) can be generated later.

!FILENAME pkg/proxy/ipvs/proxier.go:796

  1. for _, e := range proxier.endpointsMap[svcName] {
  2. ep, ok := e.(*proxy.BaseEndpointInfo)
  3. if !ok {
  4. klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
  5. continue
  6. }
  7. if !ep.IsLocal { //非本地
  8. continue
  9. }
  10. epIP := ep.IP() //端点IP
  11. epPort, err := ep.Port() //端点Port
  12. if epIP == "" || err != nil { //有效IP和端口正常
  13. continue
  14. }
  15. // 构造ipset集的entry记录项
  16. entry := &utilipset.Entry{
  17. IP: epIP,
  18. Port: epPort,
  19. Protocol: protocol,
  20. IP2: epIP,
  21. SetType: utilipset.HashIPPortIP,
  22. }
  23. // 类型校验KUBE-LOOP-BACK集合entry记录项
  24. if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
  25. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
  26. continue
  27. }
  28. // 插入此entry记录至active记录队列
  29. proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
  30. }

Handling clusterIP traffic (clusterIP is the default service type, reachable only from inside the cluster): the KUBE-CLUSTER-IP ipset set is updated so that the corresponding iptables rules can be generated later.

!FILENAME pkg/proxy/ipvs/proxier.go:827

  1. //构建ipset entry
  2. entry := &utilipset.Entry{
  3. IP: svcInfo.ClusterIP.String(),
  4. Port: svcInfo.Port,
  5. Protocol: protocol,
  6. SetType: utilipset.HashIPPort,
  7. }
  8. // 类型校验ipset entry
  9. if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
  10. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
  11. continue
  12. }
  13. // 名为KUBE-CLUSTER-IP的ipset集插入entry,以备后面统一生成IPtables规则
  14. proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
  15. // 构建ipvs虚拟服务器VS服务对象
  16. serv := &utilipvs.VirtualServer{
  17. Address: svcInfo.ClusterIP,
  18. Port: uint16(svcInfo.Port),
  19. Protocol: string(svcInfo.Protocol),
  20. Scheduler: proxier.ipvsScheduler,
  21. }
  22. // 设置IPVS服务的会话保持标志和超时时间
  23. if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
  24. serv.Flags |= utilipvs.FlagPersistent
  25. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
  26. }
  27. // 将clusterIP绑定至dummy虚拟接口上,syncService()处理中需置bindAddr地址为True。
  28. // ipvs为服务创建VS(虚拟主机)
  29. if err := proxier.syncService(svcNameString, serv, true); err == nil {
  30. activeIPVSServices[serv.String()] = true
  31. activeBindAddrs[serv.Address.String()] = true
  32. // 为虚拟主机/服务(vip)同步endpoints信息。
  33. // IPVS为VS更新RS(realServer后端服务器)
  34. if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
  35. klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
  36. }
  37. } else {
  38. klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
  39. }

syncService() creates or updates the IPVS virtual server and, when requested, binds the service IP to the dummy interface.

!FILENAME pkg/proxy/ipvs/proxier.go:1498

  1. func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
  2. //获取IPVS虚拟主机服务信息
  3. appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
  4. //无此虚拟主机服务或此服务信息变更
  5. if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
  6. if appliedVirtualServer == nil {
  7. // 服务未找到,则创建新的服务
  8. klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol)
  9. if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
  10. klog.Errorf("Failed to add IPVS service %q: %v", svcName, err)
  11. return err
  12. }
  13. } else {
  14. // 服务信息改变,则更新存在服务信息,在更新期间服务VIP不会关闭
  15. klog.V(3).Infof("IPVS service %s was changed", svcName)
  16. if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
  17. klog.Errorf("Failed to update IPVS service, err:%v", err)
  18. return err
  19. }
  20. }
  21. // 将服务IP绑定到dummy接口上
  22. if bindAddr {
  23. klog.V(4).Infof("Bind addr %s", vs.Address.String())
  24. _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice) //netlinkHandle处理的实现在文章最后的netlink工具介绍部分详细说明
  25. if err != nil {
  26. klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
  27. return err
  28. }
  29. }
  30. return nil
  31. }

syncEndpoint() synchronizes the endpoints for a virtual server (e.g. a clusterIP), i.e. it updates the IPVS real servers (RS) behind the VS.

!FILENAME pkg/proxy/ipvs/proxier.go:1532

  1. func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
  2. appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
  3. if err != nil || appliedVirtualServer == nil {
  4. klog.Errorf("Failed to get IPVS service, error: %v", err)
  5. return err
  6. }
  7. // curEndpoints表示当前系统IPVS目标列表
  8. curEndpoints := sets.NewString()
  9. // newEndpoints表示从apiServer监听到的Endpoints
  10. newEndpoints := sets.NewString()
  11. // 依据虚拟服务器获取RS(realservers)列表
  12. curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
  13. if err != nil {
  14. klog.Errorf("Failed to list IPVS destinations, error: %v", err)
  15. return err
  16. }
  17. for _, des := range curDests {
  18. curEndpoints.Insert(des.String()) // 写入curEndpoints
  19. }
  20. //迭代endpointsMaps信息,将非本地的enpoints写入newEndpoints
  21. for _, epInfo := range proxier.endpointsMap[svcPortName] {
  22. if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
  23. continue
  24. }
  25. newEndpoints.Insert(epInfo.String())
  26. }
  27. // 创建新的endpoints
  28. for _, ep := range newEndpoints.List() {
  29. ip, port, err := net.SplitHostPort(ep)
  30. if err != nil {
  31. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  32. continue
  33. }
  34. portNum, err := strconv.Atoi(port)
  35. if err != nil {
  36. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  37. continue
  38. }
  39. newDest := &utilipvs.RealServer{
  40. Address: net.ParseIP(ip),
  41. Port: uint16(portNum),
  42. Weight: 1,
  43. }
  44. //判断当前系统ipvs列表是否存在
  45. if curEndpoints.Has(ep) {
  46. //检测是否在gracefulDelete列表,如果是则此处立即删除
  47. uniqueRS := GetUniqueRSName(vs, newDest)
  48. if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  49. continue
  50. }
  51. klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
  52. err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
  53. if err != nil {
  54. klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
  55. continue
  56. }
  57. }
  58. // 不存在则新增RealServer项(对应目标endpoint)
  59. err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
  60. if err != nil {
  61. klog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
  62. continue
  63. }
  64. }
  65. // 删除过旧的endpoints
  66. for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
  67. // 如果curEndpoint在gracefulDelete内,跳过
  68. uniqueRS := vs.String() + "/" + ep
  69. if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
  70. continue
  71. }
  72. ip, port, err := net.SplitHostPort(ep)
  73. if err != nil {
  74. klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
  75. continue
  76. }
  77. portNum, err := strconv.Atoi(port)
  78. if err != nil {
  79. klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
  80. continue
  81. }
  82. delDest := &utilipvs.RealServer{
  83. Address: net.ParseIP(ip),
  84. Port: uint16(portNum),
  85. }
  86. klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
  87. // 删除RS
  88. err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
  89. if err != nil {
  90. klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
  91. continue
  92. }
  93. }
  94. return nil
  95. }

Handling externalIPs traffic: when a service defines externalIPs, the port is opened only on the node that owns the external IP (the code checks whether the IP is local), rather than on every node as with NodePort. The KUBE-EXTERNAL-IP ipset set is updated so that the corresponding iptables rules can be generated later.

!FILENAME pkg/proxy/ipvs/proxier.go:866

  1. for _, externalIP := range svcInfo.ExternalIPs {
  2. if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
  3. klog.Errorf("can't determine if IP is local, assuming not: %v", err)
  4. // 如果指定的externealIP为本地地址且协议不为SCTP
  5. } else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) {
  6. lp := utilproxy.LocalPort{
  7. Description: "externalIP for " + svcNameString,
  8. IP: externalIP,
  9. Port: svcInfo.Port,
  10. Protocol: protocol,
  11. }
  12. if proxier.portsMap[lp] != nil { //端口已存在
  13. klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
  14. replacementPortsMap[lp] = proxier.portsMap[lp]
  15. } else {
  16. socket, err := proxier.portMapper.OpenLocalPort(&lp) //打开本地端口socket
  17. if err != nil {
  18. msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)
  19. proxier.recorder.Eventf( //通知事件
  20. &v1.ObjectReference{
  21. Kind: "Node",
  22. Name: proxier.hostname,
  23. UID: types.UID(proxier.hostname),
  24. Namespace: "",
  25. }, v1.EventTypeWarning, err.Error(), msg)
  26. klog.Error(msg)
  27. continue
  28. }
  29. replacementPortsMap[lp] = socket //存放端口信息
  30. }
  31. }
  32. // 创建ipset entry
  33. entry := &utilipset.Entry{
  34. IP: externalIP,
  35. Port: svcInfo.Port,
  36. Protocol: protocol,
  37. SetType: utilipset.HashIPPort,
  38. }
  39. // We have to SNAT packets to external IPs.
  40. if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
  41. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
  42. continue
  43. }
  44. // 名为KUBE-EXTERNAL-IP的ipset集插入entry,以备后面统一生成IPtables规则
  45. proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
  46. // 为服务定义ipvs虚拟主机信息
  47. serv := &utilipvs.VirtualServer{
  48. Address: net.ParseIP(externalIP),
  49. Port: uint16(svcInfo.Port),
  50. Protocol: string(svcInfo.Protocol),
  51. Scheduler: proxier.ipvsScheduler,
  52. }
  53. if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
  54. serv.Flags |= utilipvs.FlagPersistent
  55. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
  56. }
  57. // 将clusterIP绑定至dummy虚拟接口上,syncService()处理中需置bindAddr地址为True。
  58. // ipvs为服务创建VS(虚拟主机)
  59. // 为虚拟主机/服务同步endpoints信息。
  60. // IPVS为VS更新RS(realServer后端服务器)
  61. //...(同clusterip)
  62. }

Handling LoadBalancer traffic: when the service's LoadBalancerSourceRanges and/or externalTrafficPolicy=local are specified, the KUBE-LOAD-BALANCER-LOCAL, KUBE-LOAD-BALANCER-FW, KUBE-LOAD-BALANCER-SOURCE-CIDR, and KUBE-LOAD-BALANCER-SOURCE-IP ipset sets are updated so that the corresponding iptables rules can be generated later.

!FILENAME pkg/proxy/ipvs/proxier.go:937

  1. for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
  2. if ingress.IP != "" {
  3. // 构建ipset entry
  4. entry = &utilipset.Entry{
  5. IP: ingress.IP,
  6. Port: svcInfo.Port,
  7. Protocol: protocol,
  8. SetType: utilipset.HashIPPort,
  9. }
  10. // 增加SLB(service load balancer)ingressIP:Port与kube服务IP集对应
  11. // KUBE-LOAD-BALANCER ipset集更新
  12. if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
  13. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name))
  14. continue
  15. }
  16. proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
  17. // 服务指定externalTrafficPolicy=local时,KUBE-LOAD-BALANCER-LOCAL ipset集更新
  18. if svcInfo.OnlyNodeLocalEndpoints {
  19. if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
  20. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
  21. continue
  22. }
  23. proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
  24. }
  25. // 服务的LoadBalancerSourceRanges被指定时,基于源IP保护的防火墙策略开启,KUBE-LOAD-BALANCER-FW ipset集更新
  26. if len(svcInfo.LoadBalancerSourceRanges) != 0 {
  27. if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
  28. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name))
  29. continue
  30. }
  31. proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
  32. allowFromNode := false
  33. for _, src := range svcInfo.LoadBalancerSourceRanges {
  34. // 构建ipset entry
  35. entry = &utilipset.Entry{
  36. IP: ingress.IP,
  37. Port: svcInfo.Port,
  38. Protocol: protocol,
  39. Net: src,
  40. SetType: utilipset.HashIPPortNet,
  41. }
  42. // 枚举所有源CIDR白名单列表,KUBE-LOAD-BALANCER-SOURCE-CIDR ipset集更新
  43. if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
  44. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name))
  45. continue
  46. }
  47. proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
  48. // ignore error because it has been validated
  49. _, cidr, _ := net.ParseCIDR(src)
  50. if cidr.Contains(proxier.nodeIP) {
  51. allowFromNode = true
  52. }
  53. }
  54. // 允许来自Node流量(LB对应后端hosts之间交互)
  55. if allowFromNode {
  56. entry = &utilipset.Entry{
  57. IP: ingress.IP,
  58. Port: svcInfo.Port,
  59. Protocol: protocol,
  60. IP2: ingress.IP,
  61. SetType: utilipset.HashIPPortIP,
  62. }
  63. // 枚举所有白名单源IP列表,KUBE-LOAD-BALANCER-SOURCE-IP ipset集更新
  64. if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
  65. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name))
  66. continue
  67. }
  68. proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
  69. }
  70. }
  71. // 构建ipvs 虚拟主机对象
  72. serv := &utilipvs.VirtualServer{
  73. Address: net.ParseIP(ingress.IP), // SLB ip
  74. Port: uint16(svcInfo.Port), // SLB 端口
  75. Protocol: string(svcInfo.Protocol), // 协议
  76. Scheduler: proxier.ipvsScheduler, // RR
  77. }
  78. if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
  79. serv.Flags |= utilipvs.FlagPersistent
  80. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
  81. }
  82. // ipvs为服务创建VS(虚拟主机),LB ingressIP绑定dummy接口
  83. // ipvs为VS更新RS(realServer后端服务器)
  84. //...(同clusterip)
  85. }
  86. }

Handling NodePort traffic: the specified nodePort is opened on every node and the corresponding ipset sets are updated.

!FILENAME pkg/proxy/ipvs/proxier.go:1040

  1. if svcInfo.NodePort != 0 {
  2. addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) // 获取node addresses
  3. if err != nil {
  4. klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
  5. continue
  6. }
  7. var lps []utilproxy.LocalPort
  8. for address := range addresses {
  9. lp := utilproxy.LocalPort{
  10. Description: "nodePort for " + svcNameString,
  11. IP: address,
  12. Port: svcInfo.NodePort,
  13. Protocol: protocol,
  14. }
  15. if utilproxy.IsZeroCIDR(address) {
  16. // Empty IP address means all
  17. lp.IP = ""
  18. lps = append(lps, lp)
  19. break
  20. }
  21. lps = append(lps, lp) //整理与格式化后的lps列表
  22. }
  23. // 为node节点的IPs打开端口并保存持有socket句柄
  24. for _, lp := range lps {
  25. if proxier.portsMap[lp] != nil {
  26. klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
  27. replacementPortsMap[lp] = proxier.portsMap[lp]
  28. } else if svcInfo.GetProtocol() != v1.ProtocolSCTP {
  29. // 打开和监听端口(非SCTP协议)
  30. socket, err := proxier.portMapper.OpenLocalPort(&lp)
  31. if err != nil {
  32. klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
  33. continue
  34. }
  35. if lp.Protocol == "udp" {
  36. // UDP协议,清理udp conntrack记录
  37. isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
  38. conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
  39. }
  40. replacementPortsMap[lp] = socket
  41. } //socket保存
  42. }
  43. // Nodeports无论是否为本地都需要SNAT
  44. // 构建ipset entry
  45. entry = &utilipset.Entry{
  46. // No need to provide ip info
  47. Port: svcInfo.NodePort,
  48. Protocol: protocol,
  49. SetType: utilipset.BitmapPort,
  50. }
  51. var nodePortSet *IPSet
  52. //基于协议类型选择ipset集
  53. switch protocol {
  54. case "tcp": // KUBE-NODE-PORT-TCP
  55. nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
  56. case "udp": // KUBE-NODE-PORT-UDP
  57. nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
  58. case "sctp": // KUBE-NODE-PORT-SCTP
  59. nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
  60. default:
  61. klog.Errorf("Unsupported protocol type: %s", protocol)
  62. }
  63. if nodePortSet != nil {
  64. if valid := nodePortSet.validateEntry(entry); !valid {
  65. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
  66. continue
  67. }
  68. // 更新ipset集
  69. nodePortSet.activeEntries.Insert(entry.String())
  70. }
  71. // 服务externaltrafficpolicy=local指定时,基于协议类型更新ipset集entry
  72. if svcInfo.OnlyNodeLocalEndpoints {
  73. var nodePortLocalSet *IPSet
  74. switch protocol {
  75. case "tcp": //KUBE-NODE-PORT-LOCAL-TCP
  76. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
  77. case "udp": //KUBE-NODE-PORT-LOCAL-UDP
  78. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
  79. case "sctp": //KUBE-NODE-PORT-LOCAL-SCTP
  80. nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
  81. default:
  82. klog.Errorf("Unsupported protocol type: %s", protocol)
  83. }
  84. if nodePortLocalSet != nil {
  85. if valid := nodePortLocalSet.validateEntry(entry); !valid {
  86. klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
  87. continue
  88. }
  89. // 更新ipset集
  90. nodePortLocalSet.activeEntries.Insert(entry.String())
  91. }
  92. }
  93. // 为Node每个ip address创建ipvs路由项(VS/RS)
  94. var nodeIPs []net.IP
  95. for address := range addresses {
  96. if !utilproxy.IsZeroCIDR(address) {
  97. nodeIPs = append(nodeIPs, net.ParseIP(address))
  98. continue
  99. }
  100. // zero cidr
  101. nodeIPs, err = proxier.ipGetter.NodeIPs()
  102. if err != nil {
  103. klog.Errorf("Failed to list all node IPs from host, err: %v", err)
  104. }
  105. }
  106. for _, nodeIP := range nodeIPs {
  107. // 构建ipvs VS对象
  108. serv := &utilipvs.VirtualServer{
  109. Address: nodeIP, //node ip地址
  110. Port: uint16(svcInfo.NodePort), //node端口
  111. Protocol: string(svcInfo.Protocol), //协议
  112. Scheduler: proxier.ipvsScheduler, //RR
  113. }
  114. if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
  115. serv.Flags |= utilipvs.FlagPersistent
  116. serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
  117. }
  118. // 这里不需要将Node IP绑定到dummy接口,参数值为false
  119. // ipvs为服务创建VS(虚拟主机)
  120. // ipvs为VS更新RS(realServer后端服务器)
  121. //...(同clusterip)
  122. }

SyncIPSetEntries: synchronizing ipset entries

!FILENAME pkg/proxy/ipvs/proxier.go:1176

  1. for _, set := range proxier.ipsetList {
  2. set.syncIPSetEntries()
  3. }

!FILENAME pkg/proxy/ipvs/ipset.go:125

  1. func (set *IPSet) syncIPSetEntries() {
  2. appliedEntries, err := set.handle.ListEntries(set.Name)
  3. if err != nil {
  4. klog.Errorf("Failed to list ip set entries, error: %v", err)
  5. return
  6. }
  7. // currentIPSetEntries代表从apiServer上一直监听着的endpoints列表
  8. currentIPSetEntries := sets.NewString()
  9. for _, appliedEntry := range appliedEntries {
  10. currentIPSetEntries.Insert(appliedEntry)
  11. }
  12. // 求差集
  13. // s1 = {a1, a2, a3}
  14. // s2 = {a1, a2, a4, a5}
  15. // s1.Difference(s2) = {a3}
  16. // s2.Difference(s1) = {a4,a5}
  17. if !set.activeEntries.Equal(currentIPSetEntries) {
  18. // 清理过旧记录(取currentIPSetEntries在activeEntries中没有的entries)
  19. for _, entry := range currentIPSetEntries.Difference(set.activeEntries).List() {
  20. if err := set.handle.DelEntry(entry, set.Name); err != nil {
  21. if !utilipset.IsNotFoundError(err) {
  22. klog.Errorf("Failed to delete ip set entry: %s from ip set: %s, error: %v", entry, set.Name, err)
  23. }
  24. } else {
  25. klog.V(3).Infof("Successfully delete legacy ip set entry: %s from ip set: %s", entry, set.Name)
  26. }
  27. }
  28. // 新增记录(取activeEntries在currentIPSetEntries中没有的entries)
  29. for _, entry := range set.activeEntries.Difference(currentIPSetEntries).List() {
  30. if err := set.handle.AddEntry(entry, &set.IPSet, true); err != nil {
  31. klog.Errorf("Failed to add entry: %v to ip set: %s, error: %v", entry, set.Name, err)
  32. } else {
  33. klog.V(3).Infof("Successfully add entry: %v to ip set: %s", entry, set.Name)
  34. }
  35. }
  36. }
  37. }
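
The add/delete decision above is plain set arithmetic over the entry strings. A standalone illustration using the same sets package (the entry values are illustrative):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Entries currently programmed in the kernel ipset (as returned by ListEntries).
	current := sets.NewString("10.96.0.10,tcp:53", "10.96.0.99,tcp:80")
	// Entries the proxier wants after this sync (activeEntries).
	active := sets.NewString("10.96.0.10,tcp:53", "10.96.0.20,tcp:443")

	fmt.Println("to delete:", current.Difference(active).List()) // [10.96.0.99,tcp:80]
	fmt.Println("to add:   ", active.Difference(current).List()) // [10.96.0.20,tcp:443]
}
```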

Building the iptables rule data

!FILENAME pkg/proxy/ipvs/proxier.go:1182

  1. proxier.writeIptablesRules()

Based on the ipset definitions, the fixed Kubernetes chain rules for the iptables NAT table are generated.

!FILENAME pkg/proxy/ipvs/proxier.go:1269

  1. for _, set := range ipsetWithIptablesChain {
  2. if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
  3. args = append(args[:0], "-A", set.from)
  4. if set.protocolMatch != "" {
  5. args = append(args, "-p", set.protocolMatch)
  6. }
  7. args = append(args,
  8. "-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
  9. "-m", "set", "--match-set", set.name,
  10. set.matchType,
  11. )
  12. // -A $setFrom -p $prot -m comment --comment $commentStr
  13. // -m set --match-set $setName $setType -j $setTo
  14. writeLine(proxier.natRules, append(args, "-j", set.to)...)
  15. }
  16. }

According to the ipsetWithIptablesChain definition, the following fixed chain rules are generated.

KUBE-POSTROUTING masquerades traffic that matches the KUBE-LOOP-BACK ipset set:

-A KUBE-POSTROUTING -m comment --comment "Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose" -m set --match-set KUBE-LOOP-BACK dst,dst,src -j MASQUERADE

Rules related to LoadBalancer services:

-A KUBE-SERVICES -m comment --comment "Kubernetes service lb portal" -m set --match-set KUBE-LOAD-BALANCER dst,dst -j KUBE-LOAD-BALANCER
-A KUBE-LOAD-BALANCER -m comment --comment "Kubernetes service load balancer ip + port for load balancer with sourceRange" -m set --match-set KUBE-LOAD-BALANCER-FW dst,dst -j KUBE-FIREWALL
-A KUBE-FIREWALL -m comment --comment "Kubernetes service load balancer ip + port + source cidr for packet filter purpose" -m set --match-set KUBE-LOAD-BALANCER-SOURCE-CIDR dst,dst,src -j RETURN
-A KUBE-FIREWALL -m comment --comment "Kubernetes service load balancer ip + port + source IP for packet filter purpose" -m set --match-set KUBE-LOAD-BALANCER-SOURCE-IP dst,dst,src -j RETURN
-A KUBE-LOAD-BALANCER -m comment --comment "Kubernetes service load balancer ip + port with externalTrafficPolicy=local" -m set --match-set KUBE-LOAD-BALANCER-LOCAL dst,dst -j RETURN

Rules related to NodePort services:

-A KUBE-NODE-PORT -p tcp -m comment --comment "Kubernetes nodeport TCP port with externalTrafficPolicy=local" -m set --match-set KUBE-NODE-PORT-LOCAL-TCP dst -j RETURN
-A KUBE-NODE-PORT -p tcp -m comment --comment "Kubernetes nodeport TCP port for masquerade purpose" -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-MARK-MASQ
-A KUBE-NODE-PORT -p udp -m comment --comment "Kubernetes nodeport UDP port with externalTrafficPolicy=local" -m set --match-set KUBE-NODE-PORT-LOCAL-UDP dst -j RETURN
-A KUBE-NODE-PORT -p udp -m comment --comment "Kubernetes nodeport UDP port for masquerade purpose" -m set --match-set KUBE-NODE-PORT-UDP dst -j KUBE-MARK-MASQ
-A KUBE-SERVICES -p sctp -m comment --comment "Kubernetes nodeport SCTP port for masquerade purpose" -m set --match-set KUBE-NODE-PORT-SCTP dst -j KUBE-NODE-PORT
-A KUBE-NODE-PORT -p sctp -m comment --comment "Kubernetes nodeport SCTP port with externalTrafficPolicy=local" -m set --match-set KUBE-NODE-PORT-LOCAL-SCTP dst -j RETURN

When kube-proxy is started with "--masquerade-all=true", the corresponding KUBE-SERVICES rules are generated in the NAT table for clusterIP services, so that all traffic addressed to service IPs is masqueraded.

!FILENAME pkg/proxy/ipvs/proxier.go:1284

  1. //ipset名称为"KUBE-CLUSTER-IP"不为空,即clusterip类型服务
  2. if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
  3. args = append(args[:0],
  4. "-A", string(kubeServicesChain),
  5. "-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
  6. "-m", "set", "--match-set", kubeClusterIPSet,
  7. )
  8. //当proxy配置为masqueradeAll=true
  9. if proxier.masqueradeAll {
  10. //nat表:-A KUBE-SERVICES -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
  11. writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...)
  12. } else if len(proxier.clusterCIDR) > 0 {
  13. //当指定了clusterCIDR,针对非集群到服务VIP的流量masquerades规则 (dst,dst 目标ip:目标端口)
  14. // nat表:-A KUBE-SERVICES -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst ! -s $clusterCIDR -j KUBE-MARK-MASQ
  15. writeLine(proxier.natRules, append(args, "dst,dst", "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
  16. } else {
  17. // 所有来自服务VIP出流量masquerades规则 (src,dst 源ip:目标端口)
  18. // 如:VIP:<random port> to VIP:<service port>
  19. // nat表:-A KUBE-SERVICES -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP src,dst -j KUBE-MARK-MASQ
  20. writeLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...)
  21. }
  22. }

For services that configure externalIPs, the corresponding iptables NAT rules are generated from the dedicated externalIPs ipset set.

!FILENAME pkg/proxy/ipvs/proxier.go:1311

  1. if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
  2. // 为external IPs添加masquerade规则
  3. args = append(args[:0],
  4. "-A", string(kubeServicesChain),
  5. "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
  6. "-m", "set", "--match-set", kubeExternalIPSet,
  7. "dst,dst",
  8. )
  9. // -A KUBE-SERVICES -m comment --comment "Kubernetes service external ip + port for masquerade and filter purpose" -m set --match-set KUBE-EXTERNAL-IP dst,dst -j KUBE-MARK-MASQ
  10. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
  11. // 允许external ips流量,而非来自本地网桥流量(如来自一个容器流量或本地处理的forward至服务流量)
  12. externalTrafficOnlyArgs := append(args,
  13. "-m", "physdev", "!", "--physdev-is-in",
  14. "-m", "addrtype", "!", "--src-type", "LOCAL")
  15. // -m set match-set KUBE-EXTERNAL-IP dst,dst -m PHYSDEV ! --physdev-is-in -m addrtype ! --src-type LOCAL -j ACCEPT
  16. writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
  17. dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
  18. // 识别与允许本地流量
  19. // -m set match-set KUBE-EXTERNAL-IP dst,dst -m addrtype --dst-type LOCAL -j ACCEPT
  20. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
  21. }

acceptIPVSTraffic appends, at the end of the NAT table's KUBE-SERVICES chain, ACCEPT rules for all traffic whose destination is an IPVS virtual service (these rules must sit at the very bottom of KUBE-SERVICES). For the default clusterIP type the generated rule is -A KUBE-SERVICES -m set --match-set KUBE-CLUSTER-IP dst,dst -j ACCEPT; if any service is of type LoadBalancer, -A KUBE-SERVICES -m set --match-set KUBE-LOAD-BALANCER dst,dst -j ACCEPT is generated as well.

!FILENAME pkg/proxy/ipvs/proxier.go:1397

  1. proxier.acceptIPVSTraffic()
  2. // -A KUBE-SERVICE -m set --match-set KUBE-CLUSTER-IP dst,dst -j ACCEPT
  3. // -A KUBE-SERVICE -m set --match-set KUBE-LOAD-BALANCER dst,dst -j ACCEPT
  4. func (proxier *Proxier) acceptIPVSTraffic() {
  5. sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
  6. for _, set := range sets {
  7. var matchType string
  8. if !proxier.ipsetList[set].isEmpty() {
  9. switch proxier.ipsetList[set].SetType {
  10. case utilipset.BitmapPort:
  11. matchType = "dst"
  12. default:
  13. matchType = "dst,dst" //目标ip,目标端口
  14. }
  15. writeLine(proxier.natRules, []string{
  16. "-A", string(kubeServicesChain),
  17. "-m", "set", "--match-set", set, matchType,
  18. "-j", "ACCEPT",
  19. }...)
  20. }
  21. }
  22. }

A rule matching the masqueradeMark is added so that NodePort traffic can be forwarded even when the default FORWARD policy would drop it.

!FILENAME pkg/proxy/ipvs/proxier.go:1361

  1. // -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000 -j ACCEPT
  2. writeLine(proxier.filterRules,
  3. "-A", string(KubeForwardChain),
  4. "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
  5. "-m", "mark", "--mark", proxier.masqueradeMark,
  6. "-j", "ACCEPT",
  7. )

When clusterCIDR is specified, two KUBE-FORWARD rules are generated in the filter table to accept traffic whose source or destination is a pod. (Note: kube-proxy's --cluster-cidr flag specifies the pod network range used by the cluster.)

!FILENAME pkg/proxy/ipvs/proxier.go:1369

  1. if len(proxier.clusterCIDR) != 0 {
  2. // 两条规则确保kubernetes forward规则定义的初始包被接受(clusterCIDR所指定的源或目标流量)
  3. // -A KUBE-FORWARD -s $clusterCIDR -m -comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  4. writeLine(proxier.filterRules,
  5. "-A", string(KubeForwardChain),
  6. "-s", proxier.clusterCIDR,
  7. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
  8. "-m", "conntrack",
  9. "--ctstate", "RELATED,ESTABLISHED",
  10. "-j", "ACCEPT",
  11. )
  12. // -A KUBE-FORWARD -m -comment --comment "kubernetes forwarding conntrack pod source rule" -d $clusterCIDR -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
  13. writeLine(proxier.filterRules,
  14. "-A", string(KubeForwardChain),
  15. "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
  16. "-d", proxier.clusterCIDR,
  17. "-m", "conntrack",
  18. "--ctstate", "RELATED,ESTABLISHED",
  19. "-j", "ACCEPT",
  20. )
  21. }

Refreshing the iptables rules

!FILENAME pkg/proxy/ipvs/proxier.go:1186

  1. // 合并iptables规则
  2. proxier.iptablesData.Reset()
  3. proxier.iptablesData.Write(proxier.natChains.Bytes())
  4. proxier.iptablesData.Write(proxier.natRules.Bytes())
  5. proxier.iptablesData.Write(proxier.filterChains.Bytes())
  6. proxier.iptablesData.Write(proxier.filterRules.Bytes())
  7. klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
  8. // 基于iptables格式化规则数据,使用iptables-restore刷新iptables规则
  9. err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
  10. if err != nil {
  11. klog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
  12. // Revert new local ports.
  13. utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
  14. return
  15. }
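
For reference, the merged buffer handed to RestoreAll is plain iptables-restore input: each table starts with *&lt;table&gt;, followed by chain declarations and -A rules, and ends with COMMIT. A minimal standalone sketch that assembles such a payload in the same order (chains and mark value are illustrative):

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	natChains, natRules := &bytes.Buffer{}, &bytes.Buffer{}
	filterChains, filterRules := &bytes.Buffer{}, &bytes.Buffer{}

	fmt.Fprintln(natChains, "*nat")
	fmt.Fprintln(natChains, ":KUBE-SERVICES - [0:0]")
	fmt.Fprintln(natChains, ":KUBE-POSTROUTING - [0:0]")
	fmt.Fprintln(natRules, `-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE`)
	fmt.Fprintln(natRules, "COMMIT")

	fmt.Fprintln(filterChains, "*filter")
	fmt.Fprintln(filterChains, ":KUBE-FORWARD - [0:0]")
	fmt.Fprintln(filterRules, `-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT`)
	fmt.Fprintln(filterRules, "COMMIT")

	// Same merge order as the proxier; the result would then be piped to
	// `iptables-restore --noflush --counters`.
	data := &bytes.Buffer{}
	data.Write(natChains.Bytes())
	data.Write(natRules.Bytes())
	data.Write(filterChains.Bytes())
	data.Write(filterRules.Bytes())
	fmt.Print(data.String())
}
```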

This completes the walkthrough of the ipvs-mode proxier; the key logic is the body of syncProxyRules(). Some lower-level details have not been expanded here, such as the runner implementation behind the ipset dependency and the netlink-based messaging used for the IPVS (VS/RS) operations; given the length of this article, they may be covered separately later.

~ END ~