Job Management
Job
is the fundamental object of high performance workload; this document provides the definition of Job
in Volcano.
The definition of Job
follow Kuberentes’s style, e.g. Status, Spec; the follow sections will only describethe major functions of Job
.
Multiple Pod Template
As most jobs of high performance workload include different type of tasks, e.g. TensorFlow (ps/worker), Spark (driver/executor);Job
introduces taskSpecs
to support multiple pod template, defined as follow. The Policies
will describe in Error Handling section.
// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
...
// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
}
// TaskSpec specifies the task specification of Job
type TaskSpec struct {
// Name specifies the name of task
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
// Replicas specifies the replicas of this TaskSpec in Job
Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`
// Specifies the pod that will be created for this TaskSpec
// when executing a Job
Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`
// Specifies the lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
JobController
will create Pods based on the templates and replicas in spec.tasks
;the controlled OwnerReference
of Pod will be set to the Job
. The following isan example YAML with multiple pod template.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tf-job
spec:
tasks:
- name: "ps"
replicas: 2
template:
spec:
containers:
- name: ps
image: ps-img
- name: "worker"
replicas: 5
template:
spec:
containers:
- name: worker
image: worker-img
Job Input/Output
Most of high performance workload will handle data which is considering as input/output of a Job.The following types are introduced for Job’s input/output.
type VolumeSpec struct {
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`
// defined the PVC name
// + optional
VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"`
// VolumeClaim defines the PVC used by the VolumeSpec.
// + optional
VolumeClaim *PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,3,opt,name=claim"`
}
type JobSpec struct{
...
// The volumes mount on Job
// +optional
Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,1,opt,name=volumes"`
}
The Volumes
of Job can be nil
which means user will manage data themselves. If VolumeSpec.volumeClaim
is nil
and VolumeSpec.volumeClaimName
is nil
or not exist in PersistentVolumeClaim,emptyDir
volume will be used for each Task/Pod.
Conditions and Phases
The following phases are introduced to give a simple, high-level summary of where the Job is in its lifecycle; and the conditions array,the reason and message field contain more detail about the job’s status.
type JobPhase string
const (
// Pending is the phase that job is pending in the queue, waiting for scheduling decision
Pending JobPhase = "Pending"
// Aborting is the phase that job is aborted, waiting for releasing pods
Aborting JobPhase = "Aborting"
// Aborted is the phase that job is aborted by user or error handling
Aborted JobPhase = "Aborted"
// Running is the phase that minimal available tasks of Job are running
Running JobPhase = "Running"
// Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
Restarting JobPhase = "Restarting"
// Completed is the phase that all tasks of Job are completed successfully
Completed JobPhase = "Completed"
// Terminating is the phase that the Job is terminated, waiting for releasing pods
Terminating JobPhase = "Terminating"
// Teriminated is the phase that the job is finished unexpected, e.g. events
Teriminated JobPhase = "Terminated"
)
// JobState contains details for the current state of the job.
type JobState struct {
// The phase of Job.
// +optional
Phase JobPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"`
// Unique, one-word, CamelCase reason for the phase's last transition.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,2,opt,name=reason"`
// Human-readable message indicating details about last transition.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
}
// JobStatus represents the current state of a Job
type JobStatus struct {
// Current state of Job.
State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`
......
}
The following table shows available transactions between different phases. The phase can not transfer to the targetphase if the cell is empty.
From \ To | Pending | Aborted | Running | Completed | Terminated |
---|---|---|---|---|---|
Pending | |||||
Aborted | |||||
Running | |||||
Completed | |||||
Terminated | * |
Restarting
, Aborting
and Terminating
are temporary states to avoid race condition, e.g. there’ll be severalPodeEvictedEvent
s because of TerminateJobAction
which should not be handled again.
Error Handling
After Job was created in system, there’ll be several events related to the Job, e.g. Pod succeeded, Pod failed;and some events are critical to the Job, e.g. Pod of MPIJob failed. So LifecyclePolicy
is introduced to handle differentevents based on user’s configuration.
// Event is the type of Event related to the Job
type Event string
const (
// AllEvents means all event
AllEvents Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
PodEvictedEvent Event = "PodEvicted"
// These below are several events can lead to job 'Unknown'
// 1. Task Unschedulable, this is triggered when part of
// pods can't be scheduled while some are already running in gang-scheduling case.
JobUnknownEvent Event = "Unknown"
// OutOfSyncEvent is triggered if Pod/Job were updated
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"
)
// Action is the type of event handling
type Action string
const (
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pod of Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"
// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
// SyncJobAction is the action to sync Job/Pod status.
SyncJobAction Action = "SyncJob"
)
// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
Event Event `json:"event,omitempty" protobuf:"bytes,1,opt,name=event"`
Action Action `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}
Both JobSpec
and TaskSpec
include lifecycle policy: the policies in JobSpec
are the default policy if no policiesin TaskSpec
; the policies in TaskSpec
will overwrite defaults.
// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
...
// Specifies the default lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`
// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,6,opt,name=tasks"`
}
// TaskSpec specifies the task specification of Job
type TaskSpec struct {
...
// Specifies the lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
The following examples demonstrate the usage of LifecyclePolicy
for job and task.
For the training job of machine learning framework, the whole job should be restarted if any task was failed or evicted.To simplify the configuration, a job level LifecyclePolicy
is set as follows. As no LifecyclePolicy
is set for anytask, all tasks will use the policies in spec.policies
.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tf-job
spec:
# If any event here, restart the whole job.
policies:
- event: *
action: RestartJob
tasks:
- name: "ps"
replicas: 1
template:
spec:
containers:
- name: ps
image: ps-img
- name: "worker"
replicas: 5
template:
spec:
containers:
- name: worker
image: worker-img
...
Some BigData framework (e.g. Spark) may have different requirements. Take Spark as example, the whole job will be restartedif ‘driver’ tasks failed and only restart the task if ‘executor’ tasks failed. OnFailure
restartPolicy is set for executorand RestartJob
is set for driver spec.tasks.policies
as follow.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: spark-job
spec:
tasks:
- name: "driver"
replicas: 1
policies:
- event: *
action: RestartJob
template:
spec:
containers:
- name: driver
image: driver-img
- name: "executor"
replicas: 5
template:
spec:
containers:
- name: executor
image: executor-img
restartPolicy: OnFailure
Features Interaction
Admission Controller
The following validations must be included to make sure expected behaviours:
spec.minAvailable
<= sum(spec.taskSpecs.replicas
)- no duplicated name in
spec.taskSpecs
array - no duplicated event handler in
LifecyclePolicy
array, both job policies and task policies
CoScheduling
CoScheduling (or Gang-scheduling) is required by most of high performance workload, e.g. TF training job, MPI job.The spec.minAvailable
is used to identify how many pods will be scheduled together. The default value of spec.minAvailable
is summary of spec.tasks.replicas
. The admission controller web hook will check spec.minAvailable
againstthe summary of spec.tasks.replicas
; the job creation will be rejected if spec.minAvailable
> sum(spec.tasks.replicas
).If spec.minAvailable
< sum(spec.tasks.replicas
), the pod of spec.tasks
will be created randomly;refer to Task Priority with Job section on how to create tasks in order.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tf-job
spec:
# minAvailable to run job
minAvailable: 6
tasks:
- name: "ps"
replicas: 1
template:
spec:
containers:
- name: "ps"
image: "ps-img"
- name: "worker"
replicas: 5
template:
spec:
containers:
- name: "worker"
image: "worker-img"
Task Priority within Job
In addition to multiple pod template, the priority of each task maybe different. PriorityClass
of PodTemplate
is reusedto define the priority of task within a job. This’s an example to run spark job: 1 driver with 5 executors, the driver’spriority is master-pri
which is higher than normal pods; as spec.minAvailable
is 3, the scheduler will make sure one driverwith 2 executors will be scheduled if not enough resources.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: spark-job
spec:
minAvailable: 3
tasks:
- name: "driver"
replicas: 1
template:
spec:
priorityClass: "master-pri"
containers:
- name: driver
image: driver-img
- name: "executor"
replicas: 5
template:
spec:
containers:
- name: executor
image: executor-img
NOTE: although scheduler will make sure high priority pods with job will be scheduled firstly, there’s still a racecondition between different kubelets that low priority pod maybe launched early; the job/task dependency will be introducedlater to handle such kind of race condition.
Resource sharing between Job
By default, the spec.minAvailable
is set to the summary of spec.tasks.replicas
; if it’s set to a smaller value,the pod beyond spec.minAvailable
will share resource between jobs.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: spark-job
spec:
minAvailable: 3
tasks:
- name: "driver"
replicas: 1
template:
spec:
priorityClass: "master-pri"
containers:
- name: driver
image: driver-img
- name: "executor"
replicas: 5
template:
spec:
containers:
- name: executor
image: executor-img
Plugins for Job
As many jobs of AI frame, e.g. TensorFlow, MPI, Mxnet, need set env, pods communicate, ssh sign in without password.We provide Job api plugins to give users a better focus on core business.Now we have three plugins, every plugin has parameters, if not provided, we use default.
- env: set VK_TASK_INDEX to each container, is a index for giving the identity to container.
- svc: create Serivce and *.host to enable pods communicate.
- ssh: sign in ssh without password, e.g. use command mpirun or mpiexec.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: mpi-job
spec:
minAvailable: 2
schedulerName: scheduler
policies:
- event: PodEvicted
action: RestartJob
plugins:
ssh: []
env: []
svc: []
tasks:
- replicas: 1
name: mpimaster
template:
spec:
containers:
image: mpi-image
name: mpimaster
- replicas: 2
name: mpiworker
template:
spec:
containers:
image: mpi-image
name: mpiworker