Implementing a controller

The basic logic of our CronJob controller is this:

  • Load the named CronJob

  • List all active jobs, and update the status

  • Clean up old jobs according to the history limits

  • Check if we’re suspended (and don’t do anything else if we are)

  • Get the next scheduled run

  • Run a new job if it’s on schedule, not past the deadline, and not blocked by our concurrency policy

  • Requeue when we either see a running job (done automatically) or it’s time for the next scheduled run.

Apache License

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

We’ll start out with some imports. You’ll see below that we’ll need a few more imports than those scaffolded for us. We’ll talk about each one when we use it.

```go
package controllers

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/go-logr/logr"
	"github.com/robfig/cron"
	kbatch "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ref "k8s.io/client-go/tools/reference"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	batch "tutorial.kubebuilder.io/project/api/v1"
)
```

Next, we’ll need a few more bits in our Reconciler:

We’ll need the Scheme, in order to call some helpers that set owner references, and we’ll need a Clock, which will allow us to fake timing in our tests.

```go
// CronJobReconciler reconciles a CronJob object
type CronJobReconciler struct {
	client.Client
	Log    logr.Logger
	Scheme *runtime.Scheme
	Clock
}
```

Clock

We’ll mock out the clock to make it easier to jump around in time while testing; the “real” clock just calls time.Now.

```go
type realClock struct{}

func (_ realClock) Now() time.Time { return time.Now() }

// clock knows how to get the current time.
// It can be used to fake out timing for testing.
type Clock interface {
	Now() time.Time
}
```
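
In tests we can then drop a fake clock in next to realClock and hand it to the reconciler. A minimal sketch (the fakeClock type and the someFixedTime value mentioned below are hypothetical, not part of the scaffolding):

```go
// fakeClock is a hypothetical test double: it always reports the same fixed
// instant, letting a test pin "now" wherever it likes.
type fakeClock struct {
	t time.Time
}

func (f fakeClock) Now() time.Time { return f.t }
```

A test would then construct the reconciler with something like &CronJobReconciler{..., Clock: fakeClock{t: someFixedTime}} instead of letting SetupWithManager fall back to realClock.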

ignoreNotFound

We generally want to ignore (not requeue) NotFound errors, since we’ll get a reconciliation request once the object exists, and requeuing in the meantime won’t help.

```go
func ignoreNotFound(err error) error {
	if apierrs.IsNotFound(err) {
		return nil
	}
	return err
}
```

Notice that we need a few more RBAC permissions — since we’re creating and managing jobs now, we’ll need permissions for those.

```go
// +kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get
```

Now, we get to the heart of the controller — the reconciler logic.

```go
var (
	scheduledTimeAnnotation = "batch.tutorial.kubebuilder.io/scheduled-at"
)

func (r *CronJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("cronjob", req.NamespacedName)
```

1: Load the CronJob by name

We’ll fetch the CronJob using our client. All client methods take a context (to allow for cancellation) as their first argument, and the object in question as their last. Get is a bit special, in that it takes a NamespacedName as the middle argument (most don’t have a middle argument, as we’ll see below).

Many client methods also take variadic options at the end.

```go
var cronJob batch.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
	log.Error(err, "unable to fetch CronJob")
	// we'll ignore not-found errors, since they can't be fixed by an immediate
	// requeue (we'll need to wait for a new notification), and we can get them
	// on deleted requests.
	return ctrl.Result{}, ignoreNotFound(err)
}
```
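
For reference, req.NamespacedName is just a types.NamespacedName from k8s.io/apimachinery/pkg/types. A hand-built lookup (purely illustrative, with a made-up namespace and name, and requiring that extra import) would look like this:

```go
// Purely illustrative: fetching a specific CronJob by a hand-built key.
key := types.NamespacedName{Namespace: "default", Name: "my-cronjob"}
var cj batch.CronJob
if err := r.Get(ctx, key, &cj); err != nil {
	// handle the error (or ignore not-found, as above)
}
```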

2: List all active jobs, and update the status

To fully update our status, we’ll need to list all child jobs in this namespace that belong to this CronJob. Similarly to Get, we can use the List method to list the child jobs. Notice that we use variadic options to set the namespace and field match (which is actually an index lookup that we set up below).

```go
var childJobs kbatch.JobList
if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingField(jobOwnerKey, req.Name)); err != nil {
	log.Error(err, "unable to list child Jobs")
	return ctrl.Result{}, err
}
```

Once we have all the jobs we own, we’ll split them into active, successful, and failed jobs, keeping track of the most recent run so that we can record it in status. Remember, status should be able to be reconstituted from the state of the world, so it’s generally not a good idea to read from the status of the root object. Instead, you should reconstruct it every run. That’s what we’ll do here.

We can check if a job is “finished” and whether it succeeded or failed using status conditions. We’ll put that logic in a helper to make our code cleaner.

```go
// find the active list of jobs
var activeJobs []*kbatch.Job
var successfulJobs []*kbatch.Job
var failedJobs []*kbatch.Job
var mostRecentTime *time.Time // find the last run so we can update the status
```

isJobFinished

We consider a job “finished” if it has a “succeeded” or “failed” condition marked as true. Status conditions allow us to add extensible status information to our objects that other humans and controllers can examine to check things like completion and health.

```go
isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
	for _, c := range job.Status.Conditions {
		if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
			return true, c.Type
		}
	}

	return false, ""
}
```

getScheduledTimeForJob

We’ll use a helper to extract the scheduled time from the annotation that we added during job creation.

```go
getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
	timeRaw := job.Annotations[scheduledTimeAnnotation]
	if len(timeRaw) == 0 {
		return nil, nil
	}

	timeParsed, err := time.Parse(time.RFC3339, timeRaw)
	if err != nil {
		return nil, err
	}
	return &timeParsed, nil
}
```

```go
for i, job := range childJobs.Items {
	_, finishedType := isJobFinished(&job)
	switch finishedType {
	case "": // ongoing
		activeJobs = append(activeJobs, &childJobs.Items[i])
	case kbatch.JobFailed:
		failedJobs = append(failedJobs, &childJobs.Items[i])
	case kbatch.JobComplete:
		successfulJobs = append(successfulJobs, &childJobs.Items[i])
	}

	// We'll store the launch time in an annotation, so we'll reconstitute that from
	// the active jobs themselves.
	scheduledTimeForJob, err := getScheduledTimeForJob(&job)
	if err != nil {
		log.Error(err, "unable to parse schedule time for child job", "job", &job)
		continue
	}
	if scheduledTimeForJob != nil {
		if mostRecentTime == nil {
			mostRecentTime = scheduledTimeForJob
		} else if mostRecentTime.Before(*scheduledTimeForJob) {
			mostRecentTime = scheduledTimeForJob
		}
	}
}

if mostRecentTime != nil {
	cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
	cronJob.Status.LastScheduleTime = nil
}
cronJob.Status.Active = nil
for _, activeJob := range activeJobs {
	jobRef, err := ref.GetReference(r.Scheme, activeJob)
	if err != nil {
		log.Error(err, "unable to make reference to active job", "job", activeJob)
		continue
	}
	cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}
```

Here, we’ll log how many jobs we observed at a slightly higher logging level, for debugging. Notice how instead of using a format string, we use a fixed message, and attach key-value pairs with the extra information. This makes it easier to filter and query log lines.

```go
log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
```

Using the data we’ve gathered, we’ll update the status of our CRD. Just like before, we use our client. To specifically update the status subresource, we’ll use the Status part of the client, with the Update method.

The status subresource ignores changes to spec, so it’s less likely to conflict with any other updates, and can have separate permissions.

```go
if err := r.Status().Update(ctx, &cronJob); err != nil {
	log.Error(err, "unable to update CronJob status")
	return ctrl.Result{}, err
}
```

Once we’ve updated our status, we can move on to ensuring that the status of the world matches what we want in our spec.

3: Clean up old jobs according to the history limit

First, we’ll try to clean up old jobs, so that we don’t leave too many lying around.

```go
// NB: deleting these is "best effort" -- if we fail on a particular one,
// we won't requeue just to finish the deleting.
if cronJob.Spec.FailedJobsHistoryLimit != nil {
	sort.Slice(failedJobs, func(i, j int) bool {
		if failedJobs[i].Status.StartTime == nil {
			return failedJobs[j].Status.StartTime != nil
		}
		return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
	})
	// the jobs are sorted oldest-first, so delete from the front until only
	// the newest FailedJobsHistoryLimit jobs remain.
	for i, job := range failedJobs {
		if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
			break
		}
		if err := r.Delete(ctx, job); err != nil {
			log.Error(err, "unable to delete old failed job", "job", job)
		}
	}
}

if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
	sort.Slice(successfulJobs, func(i, j int) bool {
		if successfulJobs[i].Status.StartTime == nil {
			return successfulJobs[j].Status.StartTime != nil
		}
		return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
	})
	// same approach: keep only the newest SuccessfulJobsHistoryLimit jobs.
	for i, job := range successfulJobs {
		if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
			break
		}
		if err := r.Delete(ctx, job); err != nil {
			log.Error(err, "unable to delete old successful job", "job", job)
		}
	}
}
```

4: Check if we’re suspended

If this object is suspended, we don’t want to run any jobs, so we’ll stop now. This is useful if something’s broken with the job we’re running and we want to pause runs to investigate or putz with the cluster, without deleting the object.

```go
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
	log.V(1).Info("cronjob suspended, skipping")
	return ctrl.Result{}, nil
}
```

5: Get the next scheduled run

If we’re not paused, we’ll need to calculate the next scheduled run, and whether or not we’ve got a run that we haven’t processed yet.

getNextSchedule

We’ll calculate the next scheduled time using our helpful cron library. We’ll start calculating appropriate times from our last run, or the creation of the CronJob if we can’t find a last run.

If there are too many missed runs and we don’t have any deadlines set, we’ll bail so that we don’t cause issues on controller restarts or wedges.

Otherwise, we’ll just return the missed runs (of which we’ll just use the latest), and the next run, so that we can know when it’s time to reconcile again.
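
As a standalone point of reference, here’s a tiny sketch of the cron library calls that the getNextSchedule helper below builds on (the schedule string is just an example):

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// parse a standard five-field cron expression: "every five minutes"
	sched, err := cron.ParseStandard("*/5 * * * *")
	if err != nil {
		panic(err)
	}
	// Next returns the first activation time strictly after the given time;
	// getNextSchedule walks this repeatedly to collect missed runs.
	fmt.Println(sched.Next(time.Now()))
}
```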

```go
getNextSchedule := func(cronJob *batch.CronJob, now time.Time) (lastMissed *time.Time, next time.Time, err error) {
	sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
	if err != nil {
		return nil, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
	}

	// for optimization purposes, cheat a bit and start from our last observed run time
	// we could reconstitute this here, but there's not much point, since we've
	// just updated it.
	var earliestTime time.Time
	if cronJob.Status.LastScheduleTime != nil {
		earliestTime = cronJob.Status.LastScheduleTime.Time
	} else {
		earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
	}
	if cronJob.Spec.StartingDeadlineSeconds != nil {
		// controller is not going to schedule anything below this point
		schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))

		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}
	if earliestTime.After(now) {
		return nil, sched.Next(now), nil
	}

	starts := 0
	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		// take a copy, so lastMissed doesn't end up pointing at the loop
		// variable after it advances past "now"
		missed := t
		lastMissed = &missed
		// An object might miss several starts. For example, if
		// controller gets wedged on Friday at 5:01pm when everyone has
		// gone home, and someone comes in on Tuesday AM and discovers
		// the problem and restarts the controller, then all the hourly
		// jobs, more than 80 of them for one hourly scheduledJob, should
		// all start running with no further intervention (if the scheduledJob
		// allows concurrency and late starts).
		//
		// However, if there is a bug somewhere, or incorrect clock
		// on controller's server or apiservers (for setting creationTimestamp)
		// then there could be so many missed start times (it could be off
		// by decades or more), that it would eat up all the CPU and memory
		// of this controller. In that case, we want to not try to list
		// all the missed start times.
		starts++
		if starts > 100 {
			// We can't get the most recent times so just return an empty slice
			return nil, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
		}
	}
	return lastMissed, sched.Next(now), nil
}
```

```go
// figure out the next times that we need to create
// jobs at (or anything we missed).
missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
	log.Error(err, "unable to figure out CronJob schedule")
	// we don't really care about requeuing until we get an update that
	// fixes the schedule, so don't return an error
	return ctrl.Result{}, nil
}
```

We’ll prep our eventual request to requeue until the next job, and then figure out if we actually need to run.

```go
scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
log = log.WithValues("now", r.Now(), "next run", nextRun)
```

6: Run a new job if it’s on schedule, not past the deadline, and not blocked by our concurrency policy

If we’ve missed a run, and we’re still within the deadline to start it, we’ll need to run a job.

```go
if missedRun == nil {
	log.V(1).Info("no upcoming scheduled times, sleeping until next")
	return scheduledResult, nil
}

// make sure we're not too late to start the run
log = log.WithValues("current run", missedRun)
tooLate := false
if cronJob.Spec.StartingDeadlineSeconds != nil {
	tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
}
if tooLate {
	log.V(1).Info("missed starting deadline for last run, sleeping till next")
	// TODO(directxman12): events
	return scheduledResult, nil
}
```

If we actually have to run a job, we’ll need to either wait till existing ones finish, replace the existing ones, or just add new ones. If our information is out of date due to cache delay, we’ll get a requeue when we get up-to-date information.

```go
// figure out how to run this job -- concurrency policy might forbid us from running
// multiple at the same time...
if cronJob.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(activeJobs) > 0 {
	log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
	return scheduledResult, nil
}

// ...or instruct us to replace existing ones...
if cronJob.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
	for _, activeJob := range activeJobs {
		// we don't care if the job was already deleted
		if err := r.Delete(ctx, activeJob); ignoreNotFound(err) != nil {
			log.Error(err, "unable to delete active job", "job", activeJob)
			return ctrl.Result{}, err
		}
	}
}
```

Once we’ve figured out what to do with existing jobs, we’ll actually create our desired job.

constructJobForCronJob

We need to construct a job based on our CronJob’s template. We’ll copy over the spec from the template and copy some basic object meta.

Then, we’ll set the “scheduled time” annotation so that we can reconstitute our LastScheduleTime field each reconcile.

Finally, we’ll need to set an owner reference. This allows the Kubernetes garbage collector to clean up jobs when we delete the CronJob, and allows controller-runtime to figure out which cronjob needs to be reconciled when a given job changes (is added, deleted, completes, etc).

```go
constructJobForCronJob := func(cronJob *batch.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
	// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
	name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

	job := &kbatch.Job{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      make(map[string]string),
			Annotations: make(map[string]string),
			Name:        name,
			Namespace:   cronJob.Namespace,
		},
		Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
	}
	for k, v := range cronJob.Spec.JobTemplate.Annotations {
		job.Annotations[k] = v
	}
	job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
	for k, v := range cronJob.Spec.JobTemplate.Labels {
		job.Labels[k] = v
	}
	if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
		return nil, err
	}

	return job, nil
}
```

```go
// actually make the job...
job, err := constructJobForCronJob(&cronJob, *missedRun)
if err != nil {
	log.Error(err, "unable to construct job from template")
	// don't bother requeuing until we get a change to the spec
	return scheduledResult, nil
}

// ...and create it on the cluster
if err := r.Create(ctx, job); err != nil {
	log.Error(err, "unable to create Job for CronJob", "job", job)
	return ctrl.Result{}, err
}

log.V(1).Info("created Job for CronJob run", "job", job)
```

7: Requeue when we either see a running job or it’s time for the next scheduled run

Finally, we’ll return the result that we prepped above, that says we want to requeue when our next run would need to occur. This is taken as a maximum deadline — if something else changes in between, like our job starts or finishes, we get modified, etc, we might reconcile again sooner.

```go
	// we'll requeue once we see the running job, and update our status
	return scheduledResult, nil
}
```

Setup

Finally, we’ll update our setup. In order to allow our reconciler to quickly look up Jobs by their owner, we’ll need an index. We declare an index key that we can later use with the client as a pseudo-field name, and then describe how to extract the indexed value from the Job object. The indexer will automatically take care of namespaces for us, so we just have to extract the owner name if the Job has a CronJob owner.

Additionally, we’ll inform the manager that this controller owns some Jobs, so that it will automatically call Reconcile on the underlying CronJob when a Job changes, is deleted, etc.

```go
var (
	jobOwnerKey = ".metadata.controller"
	apiGVStr    = batch.GroupVersion.String()
)

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// set up a real clock, since we're not in a test
	if r.Clock == nil {
		r.Clock = realClock{}
	}

	if err := mgr.GetFieldIndexer().IndexField(&kbatch.Job{}, jobOwnerKey, func(rawObj runtime.Object) []string {
		// grab the job object, extract the owner...
		job := rawObj.(*kbatch.Job)
		owner := metav1.GetControllerOf(job)
		if owner == nil {
			return nil
		}
		// ...make sure it's a CronJob...
		if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
			return nil
		}

		// ...and if so, return it
		return []string{owner.Name}
	}); err != nil {
		return err
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&batch.CronJob{}).
		Owns(&kbatch.Job{}).
		Complete(r)
}
```
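
For completeness, the scaffolded main.go registers this reconciler roughly like the snippet below (a sketch of the standard scaffolding; setupLog, mgr, and the controllers import all come from the generated main.go, and Clock is left unset so SetupWithManager falls back to the real clock):

```go
if err := (&controllers.CronJobReconciler{
	Client: mgr.GetClient(),
	Log:    ctrl.Log.WithName("controllers").WithName("CronJob"),
	Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
	setupLog.Error(err, "unable to create controller", "controller", "CronJob")
	os.Exit(1)
}
```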

That was a doozy, but now we’ve got a working controller. Let’s test against the cluster, then, if we don’t have any issues, deploy it!