kube-scheduler源码分析(一)之 NewSchedulerCommand

以下代码分析基于 Kubernetes v1.12.0 版本。


  1. kube-scheduler
  2. ├── BUILD
  3. ├── OWNERS
  4. ├── app # app的目录下主要为运行scheduler相关的对象
  5. │   ├── BUILD
  6. │   ├── config
  7. │   │   ├── BUILD
  8. │   │   └── config.go # Scheduler的配置对象config
  9. │   ├── options # options主要记录 Scheduler 使用到的参数
  10. │   │   ├── BUILD
  11. │   │   ├── configfile.go
  12. │   │   ├── deprecated.go
  13. │   │   ├── deprecated_test.go
  14. │   │   ├── insecure_serving.go
  15. │   │   ├── insecure_serving_test.go
  16. │   │   ├── options.go # 主要包括Options、NewOptions、AddFlags、Config等函数
  17. │   │   └── options_test.go
  18. │   └── server.go # 主要包括 NewSchedulerCommand、NewSchedulerConfig、Run等函数
  19. └── scheduler.go # main入口函数

1. Main函数



  1. func main() {
  2. rand.Seed(time.Now().UTC().UnixNano())
  3. command := app.NewSchedulerCommand()
  4. // TODO: once we switch everything over to Cobra commands, we can go back to calling
  5. // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
  6. // normalize func and add the go flag set by hand.
  7. pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
  8. pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
  9. // utilflag.InitFlags()
  10. logs.InitLogs()
  11. defer logs.FlushLogs()
  12. if err := command.Execute(); err != nil {
  13. fmt.Fprintf(os.Stderr, "%v\n", err)
  14. os.Exit(1)
  15. }
  16. }


  1. // 初始化scheduler命令结构体
  2. command := app.NewSchedulerCommand()
  3. // 执行Execute
  4. err := command.Execute()

2. NewSchedulerCommand



  1. // NewSchedulerCommand creates a *cobra.Command object with default parameters
  2. func NewSchedulerCommand() *cobra.Command {
  3. opts, err := options.NewOptions()
  4. if err != nil {
  5. glog.Fatalf("unable to initialize command options: %v", err)
  6. }
  7. cmd := &cobra.Command{
  8. Use: "kube-scheduler",
  9. Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
  10. workload-specific function that significantly impacts availability, performance,
  11. and capacity. The scheduler needs to take into account individual and collective
  12. resource requirements, quality of service requirements, hardware/software/policy
  13. constraints, affinity and anti-affinity specifications, data locality, inter-workload
  14. interference, deadlines, and so on. Workload-specific requirements will be exposed
  15. through the API as necessary.`,
  16. Run: func(cmd *cobra.Command, args []string) {
  17. verflag.PrintAndExitIfRequested()
  18. utilflag.PrintFlags(cmd.Flags())
  19. if len(args) != 0 {
  20. fmt.Fprint(os.Stderr, "arguments are not supported\n")
  21. }
  22. if errs := opts.Validate(); len(errs) > 0 {
  23. fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
  24. os.Exit(1)
  25. }
  26. if len(opts.WriteConfigTo) > 0 {
  27. if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
  28. fmt.Fprintf(os.Stderr, "%v\n", err)
  29. os.Exit(1)
  30. }
  31. glog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
  32. return
  33. }
  34. c, err := opts.Config()
  35. if err != nil {
  36. fmt.Fprintf(os.Stderr, "%v\n", err)
  37. os.Exit(1)
  38. }
  39. stopCh := make(chan struct{})
  40. if err := Run(c.Complete(), stopCh); err != nil {
  41. fmt.Fprintf(os.Stderr, "%v\n", err)
  42. os.Exit(1)
  43. }
  44. },
  45. }
  46. opts.AddFlags(cmd.Flags())
  47. cmd.MarkFlagFilename("config", "yaml", "yml", "json")
  48. return cmd
  49. }


  1. // 构造option
  2. opts, err := options.NewOptions()
  3. // 初始化config对象
  4. c, err := opts.Config()
  5. // 执行run函数
  6. err := Run(c.Complete(), stopCh)
  7. // 添加参数
  8. opts.AddFlags(cmd.Flags())

2.1. NewOptions


  1. opts, err := options.NewOptions()


  1. // NewOptions returns default scheduler app options.
  2. func NewOptions() (*Options, error) {
  3. cfg, err := newDefaultComponentConfig()
  4. if err != nil {
  5. return nil, err
  6. }
  7. hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
  8. if err != nil {
  9. return nil, err
  10. }
  11. o := &Options{
  12. ComponentConfig: *cfg,
  13. SecureServing: nil, // TODO: enable with apiserveroptions.NewSecureServingOptions()
  14. CombinedInsecureServing: &CombinedInsecureServingOptions{
  15. Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{
  16. BindNetwork: "tcp",
  17. },
  18. Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{
  19. BindNetwork: "tcp",
  20. },
  21. BindPort: hport,
  22. BindAddress: hhost,
  23. },
  24. Authentication: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthenticationOptions()
  25. Authorization: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthorizationOptions()
  26. Deprecated: &DeprecatedOptions{
  27. UseLegacyPolicyConfig: false,
  28. PolicyConfigMapNamespace: metav1.NamespaceSystem,
  29. },
  30. }
  31. return o, nil
  32. }

2.2. Options.Config


  1. c, err := opts.Config()


  • 构建scheduler client、leaderElectionClient、eventClient。
  • 创建event recorder
  • 设置leader选举
  • 创建informer对象,主要函数有NewSharedInformerFactory和NewPodInformer


  1. // Config return a scheduler config object
  2. func (o *Options) Config() (*schedulerappconfig.Config, error) {
  3. c := &schedulerappconfig.Config{}
  4. if err := o.ApplyTo(c); err != nil {
  5. return nil, err
  6. }
  7. // prepare kube clients.
  8. client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
  9. if err != nil {
  10. return nil, err
  11. }
  12. // Prepare event clients.
  13. eventBroadcaster := record.NewBroadcaster()
  14. recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
  15. // Set up leader election if enabled.
  16. var leaderElectionConfig *leaderelection.LeaderElectionConfig
  17. if c.ComponentConfig.LeaderElection.LeaderElect {
  18. leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
  19. if err != nil {
  20. return nil, err
  21. }
  22. }
  23. c.Client = client
  24. c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
  25. c.PodInformer = factory.NewPodInformer(client, 0)
  26. c.EventClient = eventClient
  27. c.Recorder = recorder
  28. c.Broadcaster = eventBroadcaster
  29. c.LeaderElection = leaderElectionConfig
  30. return c, nil
  31. }

2.3. AddFlags


  1. opts.AddFlags(cmd.Flags())


  1. // AddFlags adds flags for the scheduler options.
  2. func (o *Options) AddFlags(fs *pflag.FlagSet) {
  3. fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
  4. fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
  5. fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
  6. o.SecureServing.AddFlags(fs)
  7. o.CombinedInsecureServing.AddFlags(fs)
  8. o.Authentication.AddFlags(fs)
  9. o.Authorization.AddFlags(fs)
  10. o.Deprecated.AddFlags(fs, &o.ComponentConfig)
  11. leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, fs)
  12. utilfeature.DefaultFeatureGate.AddFlag(fs)
  13. }

3. Run


  1. err := Run(c.Complete(), stopCh)



  • 通过scheduler config来创建scheduler的结构体。
  • 运行event broadcaster、healthz server、metrics server。
  • 运行所有的informer并在调度前等待cache的同步(重点)。
  • 执行sched.Run()来运行scheduler的调度逻辑。
  • 如果存在多个scheduler并开启了LeaderElect,则执行leader选举。


3.1. NewSchedulerConfig


  1. // Build a scheduler config from the provided algorithm source.
  2. schedulerConfig, err := NewSchedulerConfig(c)
  3. if err != nil {
  4. return err
  5. }
  6. // Create the scheduler.
  7. sched := scheduler.NewFromConfig(schedulerConfig)

3.2. InformerFactory.Start


  1. // Start all informers.
  2. go c.PodInformer.Informer().Run(stopCh)
  3. c.InformerFactory.Start(stopCh)

3.3. WaitForCacheSync


  1. // Wait for all caches to sync before scheduling.
  2. c.InformerFactory.WaitForCacheSync(stopCh)
  3. controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)

3.3.1. InformerFactory.WaitForCacheSync


  1. // WaitForCacheSync waits for all started informers' cache were synced.
  2. func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
  3. informers := func() map[reflect.Type]cache.SharedIndexInformer {
  4. f.lock.Lock()
  5. defer f.lock.Unlock()
  6. informers := map[reflect.Type]cache.SharedIndexInformer{}
  7. for informerType, informer := range f.informers {
  8. if f.startedInformers[informerType] {
  9. informers[informerType] = informer
  10. }
  11. }
  12. return informers
  13. }()
  14. res := map[reflect.Type]bool{}
  15. for informType, informer := range informers {
  16. res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
  17. }
  18. return res
  19. }


  1. // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
  2. // if the controller should shutdown
  3. func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  4. err := wait.PollUntil(syncedPollPeriod,
  5. func() (bool, error) {
  6. for _, syncFunc := range cacheSyncs {
  7. if !syncFunc() {
  8. return false, nil
  9. }
  10. }
  11. return true, nil
  12. },
  13. stopCh)
  14. if err != nil {
  15. glog.V(2).Infof("stop requested")
  16. return false
  17. }
  18. glog.V(4).Infof("caches populated")
  19. return true
  20. }

3.3.2. controller.WaitForCacheSync


  1. controller.WaitForCacheSync("scheduler", stop, s.PodInformer.Informer().HasSynced)


  1. // WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
  2. // indicating that the controller identified by controllerName is waiting for syncs, followed by
  3. // either a successful or failed sync.
  4. func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
  5. glog.Infof("Waiting for caches to sync for %s controller", controllerName)
  6. if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
  7. utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
  8. return false
  9. }
  10. glog.Infof("Caches are synced for %s controller", controllerName)
  11. return true
  12. }

3.4. LeaderElection


  1. // If leader election is enabled, run via LeaderElector until done and exit.
  2. if c.LeaderElection != nil {
  3. c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
  4. OnStartedLeading: run,
  5. OnStoppedLeading: func() {
  6. utilruntime.HandleError(fmt.Errorf("lost master"))
  7. },
  8. }
  9. leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)
  10. if err != nil {
  11. return fmt.Errorf("couldn't create leader elector: %v", err)
  12. }
  13. leaderElector.Run(ctx)
  14. return fmt.Errorf("lost lease")
  15. }

3.5. Scheduler.Run

  1. // Prepare a reusable run function.
  2. run := func(ctx context.Context) {
  3. sched.Run()
  4. <-ctx.Done()
  5. }
  6. ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
  7. defer cancel()
  8. go func() {
  9. select {
  10. case <-stopCh:
  11. cancel()
  12. case <-ctx.Done():
  13. }
  14. }()
  15. ...
  16. run(ctx)



  1. // Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
  2. func (sched *Scheduler) Run() {
  3. if !sched.config.WaitForCacheSync() {
  4. return
  5. }
  6. go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
  7. }

