JobSink: triggering long-running background jobs when events occur
Event processing combined with a Knative Service is usually expected to complete in a relatively short period of time (minutes), because it requires the HTTP connection to stay open; otherwise the service is scaled down.
Keeping long-running connections open increases the likelihood of a failure, in which case processing has to restart when the request is retried.
To work around this limitation, JobSink is a resource you can use to create long-running asynchronous jobs and tasks.
JobSink supports the full Kubernetes batch/v1 Job resource and its features, as well as Kubernetes Job queuing systems like Kueue.
Prerequisites
You must have access to a Kubernetes cluster with Knative Eventing installed.
Usage
When an event is sent to a JobSink, Eventing creates a Job and mounts the received event as a JSON file at /etc/jobsink-event/event.
Create a JobSink:
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-logger
spec:
  job:
    spec:
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "cat" ]
              args:
                - "/etc/jobsink-event/event"
Apply the JobSink resource:
kubectl apply -f <job-sink-file.yaml>
Verify that the JobSink is ready:
kubectl get jobsinks.sinks.knative.dev
Example output:
NAME URL AGE READY REASON
job-sink-logger http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger 5s True
Trigger a JobSink:
kubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: my/curl/command" \
-H "ce-type: my.demo.event" \
-H "ce-id: 123" \
-d '{"details":"JobSinkDemo"}' \
http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
Verify that a Job is created and prints the event:
kubectl logs job-sink-loggerszoi6-dqbtq
Example output:
{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
JobSink idempotency
JobSink creates a Job for each distinct event it receives.
An event is uniquely identified by the combination of its source and id attributes.
If an event with the same source and id attributes is received and a Job for it is already present, another Job is not created.
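The deduplication rule above can be illustrated with a small Go sketch. It is not JobSink's actual implementation: the `eventKey` helper, the `seenEvents` type, and the hashing scheme are hypothetical and only show how a (source, id) pair can act as a unique key.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// eventKey derives a deterministic key from the two attributes that identify
// an event. The length prefix keeps ("ab", "c") distinct from ("a", "bc").
// Hypothetical helper: JobSink's internal naming scheme may differ.
func eventKey(source, id string) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%d:%s|%s", len(source), source, id)))
	return hex.EncodeToString(sum[:])
}

// seenEvents is an in-memory stand-in for "a Job is already present".
type seenEvents map[string]struct{}

// shouldCreateJob reports whether the (source, id) pair is new and records it.
func (s seenEvents) shouldCreateJob(source, id string) bool {
	key := eventKey(source, id)
	if _, ok := s[key]; ok {
		return false
	}
	s[key] = struct{}{}
	return true
}

func main() {
	seen := seenEvents{}
	fmt.Println(seen.shouldCreateJob("my/curl/command", "123")) // true: first delivery
	fmt.Println(seen.shouldCreateJob("my/curl/command", "123")) // false: same source and id
	fmt.Println(seen.shouldCreateJob("my/other/source", "123")) // true: same id, different source
}
```

In practice this means that a producer which retries a delivery should reuse the original id and source, so that the retry does not start a second Job.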
Reading the event file
You can read the file and deserialize it using any CloudEvents JSON deserializer.
For example, the following snippet reads an event using the CloudEvents Go SDK and processes it.
package mytask

import (
	"encoding/json"
	"fmt"
	"os"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func handleEvent() error {
	// Read the event that JobSink mounted into the Job's container.
	eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
	if err != nil {
		return err
	}

	// Deserialize the JSON file into a CloudEvent.
	event := &cloudevents.Event{}
	if err := json.Unmarshal(eventBytes, event); err != nil {
		return err
	}

	// Process event ...
	fmt.Println(event)

	return nil
}
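If adding the SDK as a dependency is not desirable, the file can also be decoded with the Go standard library alone. The following is a minimal sketch, assuming the event has the same JSON shape as the example output shown earlier; the `minimalEvent` and `demoData` types and the `parseEvent` function are hypothetical names, and `demoData` only matches the demo payload.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// minimalEvent mirrors the attributes visible in the example output.
// The data payload is kept raw so it can be decoded in a second step.
type minimalEvent struct {
	SpecVersion     string          `json:"specversion"`
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	Type            string          `json:"type"`
	DataContentType string          `json:"datacontenttype"`
	Data            json.RawMessage `json:"data"`
}

// demoData is a hypothetical payload type matching {"details":"..."}.
type demoData struct {
	Details string `json:"details"`
}

// parseEvent decodes the envelope first, then the payload if one is present.
func parseEvent(raw []byte) (minimalEvent, demoData, error) {
	var ev minimalEvent
	var data demoData
	if err := json.Unmarshal(raw, &ev); err != nil {
		return ev, data, fmt.Errorf("decode event: %w", err)
	}
	if ev.ID == "" || ev.Source == "" {
		return ev, data, fmt.Errorf("event is missing id or source")
	}
	if len(ev.Data) > 0 {
		if err := json.Unmarshal(ev.Data, &data); err != nil {
			return ev, data, fmt.Errorf("decode data: %w", err)
		}
	}
	return ev, data, nil
}

func main() {
	// Inside a Job, these bytes would come from os.ReadFile("/etc/jobsink-event/event").
	raw := []byte(`{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}`)
	ev, data, err := parseEvent(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ev.Type, data.Details) // prints "my.demo.event JobSinkDemo"
}
```

The SDK remains the more robust choice, since it validates the event and handles attributes and encodings that this sketch ignores.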
Trigger a Job from different event sources
A JobSink can be triggered by any event source or trigger.
For example, you can trigger a Job when a Kafka record is sent to a Kafka topic using a KafkaSource:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: sinks.knative.dev/v1alpha1
      kind: JobSink
      name: job-sink-logger
or when a Knative Broker receives an event, using a Trigger:
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-job-sink-trigger
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.foo.bar
      myextension: my-extension-value
  subscriber:
    ref:
      apiVersion: sinks.knative.dev/v1alpha1
      kind: JobSink
      name: job-sink-logger
or even as a dead letter sink for a Knative Broker:
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
spec:
  # ...
  delivery:
    deadLetterSink:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger
    retry: 5
    backoffPolicy: exponential
    backoffDelay: "PT1S"
Customizing the event file directory
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-custom-mount-path
spec:
  job:
    spec:
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5
              # The event will be available in a file at `/etc/custom-path/event`
              volumeMounts:
                - name: "jobsink-event"
                  mountPath: "/etc/custom-path"
                  readOnly: true
Cleaning up finished jobs
To clean up finished jobs, set the spec.job.spec.ttlSecondsAfterFinished field. For example, with ttlSecondsAfterFinished: 600, Kubernetes removes a finished job 600 seconds (10 minutes) after it finishes.
JobSink examples
JobSink success example
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-success
spec:
  job:
    metadata:
      labels:
        my-label: my-value
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget   # indicates Pod disruption
JobSink failure example
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-failure
spec:
  job:
    metadata:
      labels:
        my-label: my-value
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]        # example command simulating a bug which triggers the FailJob action
              args:
                - -c
                - echo "Hello world!" && sleep 5 && exit 42
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget   # indicates Pod disruption