JobSink: triggering long-running background jobs when events occur

Event processing with a Knative Service is usually expected to complete in a relatively short period of time (minutes), because the HTTP connection must stay open for the whole duration of the processing; otherwise the service is scaled down.

Keeping connections open for a long time increases the chance that they fail, in which case the request is retried and the processing has to start over.

To work around this limitation, you can use JobSink, a resource for creating long-running asynchronous jobs and tasks.

JobSink supports the full Kubernetes batch/v1 Job resource and its features, as well as Kubernetes Job queuing systems such as Kueue.

Prerequisites

You must have access to a Kubernetes cluster with Knative Eventing installed.

Usage

When an event is sent to a JobSink, Eventing creates a Job and mounts the received event as a JSON file at /etc/jobsink-event/event.

  1. Create a JobSink

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-logger
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "cat" ]
                  args:
                    - "/etc/jobsink-event/event"
  2. Apply the JobSink resource:

    kubectl apply -f <job-sink-file.yaml>
  3. Verify that the JobSink is ready:

    kubectl get jobsinks.sinks.knative.dev

    Example output:

    NAME              URL                                                                          AGE   READY   REASON
    job-sink-logger   http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger   5s    True
  4. Trigger the JobSink by sending it a CloudEvent over HTTP, here from a temporary curl pod inside the cluster:

    kubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
      -H "content-type: application/json" \
      -H "ce-specversion: 1.0" \
      -H "ce-source: my/curl/command" \
      -H "ce-type: my.demo.event" \
      -H "ce-id: 123" \
      -d '{"details":"JobSinkDemo"}' \
      http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
  5. Verify that a Job is created and that it prints the event:

    kubectl logs job-sink-loggerszoi6-dqbtq

    The Job and pod names are generated, so yours will differ; kubectl get pods lists the pod created for the Job.

    Example output:

    {"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
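The curl command in step 4 uses the CloudEvents HTTP binary content mode: the event attributes travel as ce-* headers and the request body is the event data. The Go sketch below builds the same request with net/http. Both helpers are hypothetical: jobSinkURL assumes every JobSink is served at the /<namespace>/<name> path shown in the kubectl get jobsinks output, and newEventRequest is an illustration, not part of any Knative client library.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
	"time"
)

// jobSinkURL is a hypothetical helper that follows the URL pattern reported
// by `kubectl get jobsinks`: <receiver host>/<namespace>/<name>.
func jobSinkURL(namespace, name string) string {
	return fmt.Sprintf("http://job-sink.knative-eventing.svc.cluster.local/%s/%s", namespace, name)
}

// newEventRequest builds a POST carrying a CloudEvent in binary content mode:
// attributes go in ce-* headers (case-insensitive, canonicalized by net/http)
// and the JSON payload is the body, like the curl command above.
func newEventRequest(url, source, eventType, id, jsonData string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(jsonData))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Ce-Specversion", "1.0")
	req.Header.Set("Ce-Source", source)
	req.Header.Set("Ce-Type", eventType)
	req.Header.Set("Ce-Id", id)
	return req, nil
}

func main() {
	req, err := newEventRequest(
		jobSinkURL("default", "job-sink-logger"),
		"my/curl/command", "my.demo.event", "123",
		`{"details":"JobSinkDemo"}`,
	)
	if err != nil {
		panic(err)
	}
	// The *.svc.cluster.local host only resolves inside the cluster,
	// so run this from a pod, as with the curl pod above.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("send failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```

Run it from a pod inside the cluster, just like the curl example; outside the cluster the request fails at name resolution.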

JobSink idempotency

JobSink creates a Job for each distinct event it receives.

An event is uniquely identified by the combination of its source and id attributes.

If an event arrives with the same source and id as an event for which a Job already exists, no new Job is created.
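To make the idempotency rule concrete, the Go sketch below keys Jobs by the (source, id) pair. The jobTracker type and the job-N names are hypothetical: this in-memory map only models the rule, whereas JobSink itself checks whether a Job for the event is already present in the cluster.

```go
package main

import "fmt"

// eventKey is the event identity used for deduplication:
// the CloudEvents source and id attributes together.
type eventKey struct {
	Source string
	ID     string
}

// jobTracker is a hypothetical in-memory model of the rule; it is not
// JobSink's implementation.
type jobTracker struct {
	jobs map[eventKey]string // event identity -> Job name
	next int
}

func newJobTracker() *jobTracker {
	return &jobTracker{jobs: make(map[eventKey]string)}
}

// handle returns the Job name for an event and whether a new Job was created.
func (t *jobTracker) handle(source, id string) (string, bool) {
	key := eventKey{Source: source, ID: id}
	if name, ok := t.jobs[key]; ok {
		return name, false // same source and id: keep the existing Job
	}
	t.next++
	name := fmt.Sprintf("job-%d", t.next)
	t.jobs[key] = name
	return name, true
}

func main() {
	t := newJobTracker()
	fmt.Println(t.handle("my/curl/command", "123")) // first delivery: new Job
	fmt.Println(t.handle("my/curl/command", "123")) // redelivery: no new Job
	fmt.Println(t.handle("other/source", "123"))    // same id, other source: new Job
}
```

In practice this means a retried delivery of the same event does not start a duplicate Job, while two events that merely share an id but come from different sources each get their own Job.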

Reading the event file

You can read the file and deserialize it using any CloudEvents JSON deserializer.

For example, the following snippet reads an event using the CloudEvents Go SDK and processes it.

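  package mytask

  import (
  	"encoding/json"
  	"fmt"
  	"os"

  	cloudevents "github.com/cloudevents/sdk-go/v2"
  )

  // handleEvent reads the CloudEvent that JobSink mounted into the Job's
  // container and processes it.
  func handleEvent() error {
  	// JobSink writes the received event as JSON to this file.
  	eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
  	if err != nil {
  		return fmt.Errorf("reading event file: %w", err)
  	}

  	// cloudevents.Event implements json.Unmarshaler, so encoding/json
  	// can decode the file directly into it.
  	event := &cloudevents.Event{}
  	if err := json.Unmarshal(eventBytes, event); err != nil {
  		return fmt.Errorf("decoding event: %w", err)
  	}

  	// Process event ...
  	fmt.Println(event)
  	return nil
  }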

Trigger a Job from different event sources

A JobSink can be triggered by any event source or trigger.

For example, you can trigger a Job when a Kafka record is sent to a Kafka topic using a KafkaSource:

  apiVersion: sources.knative.dev/v1beta1
  kind: KafkaSource
  metadata:
    name: kafka-source
  spec:
    bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
    topics:
      - knative-demo-topic
    sink:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger

You can also trigger a Job when a Knative Broker receives an event, using a Trigger:

  apiVersion: eventing.knative.dev/v1
  kind: Trigger
  metadata:
    name: my-job-sink-trigger
  spec:
    broker: my-broker
    filter:
      attributes:
        type: dev.knative.foo.bar
        myextension: my-extension-value
    subscriber:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger
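The Trigger above forwards an event to the JobSink only when every attribute listed under filter.attributes is present on the event with exactly the given value, here type and the myextension extension. The Go sketch below is a simplified model of that exact-match filter, assuming non-empty filter values like the ones above; matches is a hypothetical function, not Knative's implementation.

```go
package main

import "fmt"

// matches reports whether an event passes a Trigger's filter.attributes:
// each listed attribute must exist on the event with exactly that value,
// so the conditions are combined with AND.
func matches(filter, eventAttrs map[string]string) bool {
	for name, want := range filter {
		got, ok := eventAttrs[name]
		if !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	filter := map[string]string{
		"type":        "dev.knative.foo.bar",
		"myextension": "my-extension-value",
	}
	event := map[string]string{
		"type":        "dev.knative.foo.bar",
		"source":      "my/source",
		"myextension": "my-extension-value",
	}
	fmt.Println(matches(filter, event)) // both attributes match: a Job is created

	event["myextension"] = "other"
	fmt.Println(matches(filter, event)) // extension differs: event is not delivered
}
```

Attributes that the filter does not mention, such as source here, do not affect the result.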

You can even use a JobSink as the dead letter sink for a Knative Broker:

  apiVersion: eventing.knative.dev/v1
  kind: Broker
  metadata:
    name: my-broker
  spec:
    # ...
    delivery:
      deadLetterSink:
        ref:
          apiVersion: sinks.knative.dev/v1alpha1
          kind: JobSink
          name: job-sink-logger
      retry: 5
      backoffPolicy: exponential
      backoffDelay: "PT1S"
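The delivery settings above control how often the Broker retries a failed delivery before handing the event to the dead letter sink, and thus to a new Job. The Go sketch below evaluates the backoff formulas from the Knative delivery spec: backoffDelay*n for linear and backoffDelay*2^n for exponential, where PT1S is an ISO 8601 duration of one second. The backoff function is hypothetical, and so is the numbering of n: the sketch does not settle whether Knative counts the first retry as n = 0 or n = 1, so it simply prints n = 1 to 5.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the delay before a retry: backoffDelay*n for "linear"
// and backoffDelay*2^n for "exponential", where n is the retry number.
func backoff(policy string, backoffDelay time.Duration, n int) time.Duration {
	if policy == "linear" {
		return backoffDelay * time.Duration(n)
	}
	return backoffDelay * time.Duration(1<<uint(n)) // exponential: 2^n
}

func main() {
	// Values from the Broker above: retry: 5, backoffDelay: "PT1S" (1 second).
	for n := 1; n <= 5; n++ {
		fmt.Printf("n=%d exponential=%v linear=%v\n",
			n, backoff("exponential", time.Second, n), backoff("linear", time.Second, n))
	}
}
```

Exponential backoff grows quickly, so a handful of retries already spans tens of seconds before the event reaches the JobSink.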

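Customizing the event file directory

By default, the event file is mounted at /etc/jobsink-event/event. To mount it in a different directory, add a volumeMount named jobsink-event with the desired mountPath to the container: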

  apiVersion: sinks.knative.dev/v1alpha1
  kind: JobSink
  metadata:
    name: job-sink-custom-mount-path
  spec:
    job:
      spec:
        completions: 1
        parallelism: 1
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: main
                image: docker.io/library/bash:5
                command: [ "bash" ]
                args:
                  - -c
                  - echo "Hello world!" && sleep 5
                # The event will be available in a file at `/etc/custom-path/event`
                volumeMounts:
                  - name: "jobsink-event"
                    mountPath: "/etc/custom-path"
                    readOnly: true
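Because the mount path is configurable, task code is easier to reuse if it does not hardcode /etc/jobsink-event. The Go sketch below takes the mount directory as a parameter and decodes the event with only the standard library, as an alternative to the CloudEvents SDK used earlier. The cloudEvent struct is a hypothetical minimal type covering the attributes from the example output; it handles JSON data only and does not decode data_base64 or extension attributes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// cloudEvent is a hypothetical minimal type for the JSON event file;
// encoding/json silently ignores fields it does not declare.
type cloudEvent struct {
	SpecVersion     string          `json:"specversion"`
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	Type            string          `json:"type"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	Data            json.RawMessage `json:"data,omitempty"`
}

// decodeEvent parses the JSON-encoded CloudEvent written by JobSink.
func decodeEvent(b []byte) (*cloudEvent, error) {
	var ev cloudEvent
	if err := json.Unmarshal(b, &ev); err != nil {
		return nil, fmt.Errorf("decoding event: %w", err)
	}
	return &ev, nil
}

// readEvent loads the file named "event" from the directory where the
// jobsink-event volume is mounted, for example /etc/custom-path.
func readEvent(mountDir string) (*cloudEvent, error) {
	b, err := os.ReadFile(filepath.Join(mountDir, "event"))
	if err != nil {
		return nil, fmt.Errorf("reading event file: %w", err)
	}
	return decodeEvent(b)
}

func main() {
	// Must match the mountPath of the jobsink-event volumeMount.
	ev, err := readEvent("/etc/custom-path")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("id=%s source=%s type=%s data=%s\n", ev.ID, ev.Source, ev.Type, ev.Data)
}
```

Keeping the data as json.RawMessage lets each task decode the payload into its own type later.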

Cleaning up finished jobs

To clean up finished Jobs automatically, set the spec.job.spec.ttlSecondsAfterFinished field. For example, with ttlSecondsAfterFinished: 600, Kubernetes deletes each Job 600 seconds (10 minutes) after it finishes.
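Kubernetes' TTL-after-finished controller counts the TTL from the moment the Job finishes, whether it completed or failed, and deletes the Job together with its pods. The small Go sketch below does that arithmetic for the value of 600 seconds; exampleFinished is a hypothetical finish time and both helpers are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// exampleFinished is a hypothetical time at which a Job finished.
var exampleFinished = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

// ttlDuration converts ttlSecondsAfterFinished into a time.Duration.
func ttlDuration(ttlSecondsAfterFinished int32) time.Duration {
	return time.Duration(ttlSecondsAfterFinished) * time.Second
}

// deletionTime is the earliest time the finished Job becomes eligible
// for automatic deletion: its finish time plus the TTL.
func deletionTime(finished time.Time, ttlSecondsAfterFinished int32) time.Time {
	return finished.Add(ttlDuration(ttlSecondsAfterFinished))
}

func main() {
	fmt.Println(ttlDuration(600))                                        // 10m0s
	fmt.Println(deletionTime(exampleFinished, 600).Format(time.RFC3339)) // 2024-01-01T12:10:00Z
}
```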

JobSink examples

JobSink success example

  apiVersion: sinks.knative.dev/v1alpha1
  kind: JobSink
  metadata:
    name: job-sink-success
  spec:
    job:
      metadata:
        labels:
          my-label: my-value
      spec:
        completions: 12
        parallelism: 3
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: main
                image: docker.io/library/bash:5
                command: [ "bash" ]
                args:
                  - -c
                  - echo "Hello world!" && sleep 5
        backoffLimit: 6
        podFailurePolicy:
          rules:
            - action: FailJob
              onExitCodes:
                containerName: main # optional
                operator: In # one of: In, NotIn
                values: [ 42 ]
            - action: Ignore # one of: Ignore, FailJob, Count
              onPodConditions:
                - type: DisruptionTarget # indicates Pod disruption
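With completions: 12 and parallelism: 3, the Job needs 12 successful pods and runs at most 3 of them at a time. The Go sketch below estimates the minimum number of rounds and, assuming each pod takes roughly the 5 seconds of its sleep and ignoring scheduling and image pulls, a lower bound on how long the Job runs. minWaves is a hypothetical helper, not part of any Kubernetes API.

```go
package main

import (
	"fmt"
	"time"
)

// minWaves is the smallest number of rounds needed to reach `completions`
// successful pods when at most `parallelism` pods run at once.
func minWaves(completions, parallelism int) int {
	return (completions + parallelism - 1) / parallelism // ceiling division
}

func main() {
	// Values from job-sink-success; each pod sleeps about 5 seconds.
	waves := minWaves(12, 3)
	fmt.Println(waves, time.Duration(waves)*5*time.Second) // 4 20s
}
```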

JobSink failure example

  apiVersion: sinks.knative.dev/v1alpha1
  kind: JobSink
  metadata:
    name: job-sink-failure
  spec:
    job:
      metadata:
        labels:
          my-label: my-value
      spec:
        completions: 12
        parallelism: 3
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: main
                image: docker.io/library/bash:5
                command: [ "bash" ] # example command simulating a bug which triggers the FailJob action
                args:
                  - -c
                  - echo "Hello world!" && sleep 5 && exit 42
        backoffLimit: 6
        podFailurePolicy:
          rules:
            - action: FailJob
              onExitCodes:
                containerName: main # optional
                operator: In # one of: In, NotIn
                values: [ 42 ]
            - action: Ignore # one of: Ignore, FailJob, Count
              onPodConditions:
                - type: DisruptionTarget # indicates Pod disruption
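The success and failure examples differ only in the container's exit code. The simplified Go model below evaluates podFailurePolicy rules as the Kubernetes Job documentation describes them: the first matching rule decides the action, and a failure that matches no rule counts against backoffLimit. Exit code 42 from the main container matches the FailJob rule, so job-sink-failure is marked failed immediately instead of retrying up to backoffLimit: 6 times. The rule and podFailure types are hypothetical simplifications; for example, they ignore the status field of onPodConditions.

```go
package main

import "fmt"

// podFailure describes a failed pod in this simplified model.
type podFailure struct {
	Container  string
	ExitCode   int
	Conditions []string // pod condition types, e.g. "DisruptionTarget"
}

// rule is a simplified podFailurePolicy rule that matches either on
// exit codes (operator "In" or "NotIn") or on pod condition types.
type rule struct {
	Action        string // "FailJob", "Ignore" or "Count"
	ContainerName string // optional; empty matches any container
	Operator      string
	Values        []int
	PodConditions []string
}

func contains(values []int, v int) bool {
	for _, x := range values {
		if x == v {
			return true
		}
	}
	return false
}

func (r rule) matches(f podFailure) bool {
	if len(r.Values) > 0 { // onExitCodes rule
		if r.ContainerName != "" && r.ContainerName != f.Container {
			return false
		}
		if f.ExitCode == 0 { // successful containers are excluded from the check
			return false
		}
		in := contains(r.Values, f.ExitCode)
		return (r.Operator == "In" && in) || (r.Operator == "NotIn" && !in)
	}
	for _, want := range r.PodConditions { // onPodConditions rule
		for _, got := range f.Conditions {
			if want == got {
				return true
			}
		}
	}
	return false
}

// decide returns the action of the first matching rule, or "Count"
// (the failure counts against backoffLimit) when no rule matches.
func decide(rules []rule, f podFailure) string {
	for _, r := range rules {
		if r.matches(f) {
			return r.Action
		}
	}
	return "Count"
}

func main() {
	// The rules shared by job-sink-success and job-sink-failure.
	rules := []rule{
		{Action: "FailJob", ContainerName: "main", Operator: "In", Values: []int{42}},
		{Action: "Ignore", PodConditions: []string{"DisruptionTarget"}},
	}
	fmt.Println(decide(rules, podFailure{Container: "main", ExitCode: 42}))
	fmt.Println(decide(rules, podFailure{Container: "main", ExitCode: 137, Conditions: []string{"DisruptionTarget"}}))
	fmt.Println(decide(rules, podFailure{Container: "main", ExitCode: 1}))
}
```

The three calls correspond to the simulated bug (FailJob), a pod evicted by a disruption (Ignore, so it is not counted), and an ordinary failure (Count).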