Quickstart: Jobs

Get started with the Dapr jobs building block

Alpha

The jobs building block is currently in alpha.

Let’s take a look at the Dapr jobs building block, which schedules and runs jobs at a specific time or interval. In this Quickstart, you’ll schedule, get, and delete a job using Dapr’s Job API.

You can try out this jobs quickstart by either:

Run using Multi-App Run

Select your preferred language-specific Dapr SDK before proceeding with the Quickstart. Currently, you can experiment with the jobs API with the Go SDK.

This quickstart includes two apps:

  • job-scheduler.go: schedules, retrieves, and deletes jobs.
  • job-service.go: handles the scheduled jobs.

Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

  1. git clone https://github.com/dapr/quickstarts.git

From the root of the Quickstarts directory, navigate into the jobs directory:

  1. cd jobs/go/sdk

Step 3: Schedule jobs

Run the application and schedule jobs with one command:

  1. dapr run -f .

Expected output

  1. == APP - job-service == dapr client initializing for: 127.0.0.1:6281
  2. == APP - job-service == Registered job handler for: R2-D2
  3. == APP - job-service == Registered job handler for: C-3PO
  4. == APP - job-service == Registered job handler for: BB-8
  5. == APP - job-service == Starting server on port: 6200
  6. == APP - job-service == Job scheduled: R2-D2
  7. == APP - job-service == Job scheduled: C-3PO
  8. == APP - job-service == 2024/07/17 18:09:59 job:{name:"C-3PO" due_time:"10s" data:{value:"{\"droid\":\"C-3PO\",\"Task\":\"Memory Wipe\"}"}}
  9. == APP - job-scheduler == Get job response: {"droid":"C-3PO","Task":"Memory Wipe"}
  10. == APP - job-service == Job scheduled: BB-8
  11. == APP - job-service == 2024/07/17 18:09:59 job:{name:"BB-8" due_time:"15s" data:{value:"{\"droid\":\"BB-8\",\"Task\":\"Internal Gyroscope Check\"}"}}
  12. == APP - job-scheduler == Get job response: {"droid":"BB-8","Task":"Internal Gyroscope Check"}
  13. == APP - job-scheduler == Deleted job: BB-8

After 5 seconds, the terminal output should present the R2-D2 job being processed:

  1. == APP - job-service == Starting droid: R2-D2
  2. == APP - job-service == Executing maintenance job: Oil Change

After 10 seconds, the terminal output should present the C3-PO job being processed:

  1. == APP - job-service == Starting droid: C-3PO
  2. == APP - job-service == Executing maintenance job: Memory Wipe

Once the process has completed, you can stop and clean up application processes with a single command.

  1. dapr stop -f .

What happened?

When you ran dapr init during Dapr install:

Running dapr run -f . in this Quickstart started both the job-scheduler and the job-service. In the terminal output, you can see the following jobs being scheduled, retrieved, and deleted.

  • The R2-D2 job is being scheduled.
  • The C-3PO job is being scheduled.
  • The C-3PO job is being retrieved.
  • The BB-8 job is being scheduled.
  • The BB-8 job is being retrieved.
  • The BB-8 job is being deleted.
  • The R2-D2 job is being executed after 5 seconds.
  • The R2-D2 job is being executed after 10 seconds.

dapr.yaml Multi-App Run template file

Running the Multi-App Run template file with dapr run -f . starts all applications in your project. In this Quickstart, the dapr.yaml file contains the following:

  1. version: 1
  2. apps:
  3. - appDirPath: ./job-service/
  4. appID: job-service
  5. appPort: 6200
  6. daprGRPCPort: 6281
  7. appProtocol: grpc
  8. command: ["go", "run", "."]
  9. - appDirPath: ./job-scheduler/
  10. appID: job-scheduler
  11. appPort: 6300
  12. command: ["go", "run", "."]

job-service app

The job-service application creates service invocation handlers to manage the lifecycle of the job (scheduleJob, getJob, and deleteJob).

  1. if err := server.AddServiceInvocationHandler("scheduleJob", scheduleJob); err != nil {
  2. log.Fatalf("error adding invocation handler: %v", err)
  3. }
  4. if err := server.AddServiceInvocationHandler("getJob", getJob); err != nil {
  5. log.Fatalf("error adding invocation handler: %v", err)
  6. }
  7. if err := server.AddServiceInvocationHandler("deleteJob", deleteJob); err != nil {
  8. log.Fatalf("error adding invocation handler: %v", err)
  9. }

Next, job event handlers are registered for all droids:

  1. for _, jobName := range jobNames {
  2. if err := server.AddJobEventHandler(jobName, handleJob); err != nil {
  3. log.Fatalf("failed to register job event handler: %v", err)
  4. }
  5. fmt.Println("Registered job handler for: ", jobName)
  6. }
  7. fmt.Println("Starting server on port: " + appPort)
  8. if err = server.Start(); err != nil {
  9. log.Fatalf("failed to start server: %v", err)
  10. }

The job-service then call functions that handle scheduling, getting, deleting, and handling job events.

  1. // Handler that schedules a DroidJob
  2. func scheduleJob(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) {
  3. if in == nil {
  4. err = errors.New("no invocation parameter")
  5. return
  6. }
  7. droidJob := DroidJob{}
  8. err = json.Unmarshal(in.Data, &droidJob)
  9. if err != nil {
  10. fmt.Println("failed to unmarshal job: ", err)
  11. return nil, err
  12. }
  13. jobData := JobData{
  14. Droid: droidJob.Name,
  15. Task: droidJob.Job,
  16. }
  17. content, err := json.Marshal(jobData)
  18. if err != nil {
  19. fmt.Printf("Error marshalling job content")
  20. return nil, err
  21. }
  22. // schedule job
  23. job := daprc.Job{
  24. Name: droidJob.Name,
  25. DueTime: droidJob.DueTime,
  26. Data: &anypb.Any{
  27. Value: content,
  28. },
  29. }
  30. err = app.daprClient.ScheduleJobAlpha1(ctx, &job)
  31. if err != nil {
  32. fmt.Println("failed to schedule job. err: ", err)
  33. return nil, err
  34. }
  35. fmt.Println("Job scheduled: ", droidJob.Name)
  36. out = &common.Content{
  37. Data: in.Data,
  38. ContentType: in.ContentType,
  39. DataTypeURL: in.DataTypeURL,
  40. }
  41. return out, err
  42. }
  43. // Handler that gets a job by name
  44. func getJob(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) {
  45. if in == nil {
  46. err = errors.New("no invocation parameter")
  47. return nil, err
  48. }
  49. job, err := app.daprClient.GetJobAlpha1(ctx, string(in.Data))
  50. if err != nil {
  51. fmt.Println("failed to get job. err: ", err)
  52. }
  53. out = &common.Content{
  54. Data: job.Data.Value,
  55. ContentType: in.ContentType,
  56. DataTypeURL: in.DataTypeURL,
  57. }
  58. return out, err
  59. }
  60. // Handler that deletes a job by name
  61. func deleteJob(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) {
  62. if in == nil {
  63. err = errors.New("no invocation parameter")
  64. return nil, err
  65. }
  66. err = app.daprClient.DeleteJobAlpha1(ctx, string(in.Data))
  67. if err != nil {
  68. fmt.Println("failed to delete job. err: ", err)
  69. }
  70. out = &common.Content{
  71. Data: in.Data,
  72. ContentType: in.ContentType,
  73. DataTypeURL: in.DataTypeURL,
  74. }
  75. return out, err
  76. }
  77. // Handler that handles job events
  78. func handleJob(ctx context.Context, job *common.JobEvent) error {
  79. var jobData common.Job
  80. if err := json.Unmarshal(job.Data, &jobData); err != nil {
  81. return fmt.Errorf("failed to unmarshal job: %v", err)
  82. }
  83. var jobPayload JobData
  84. if err := json.Unmarshal(job.Data, &jobPayload); err != nil {
  85. return fmt.Errorf("failed to unmarshal payload: %v", err)
  86. }
  87. fmt.Println("Starting droid:", jobPayload.Droid)
  88. fmt.Println("Executing maintenance job:", jobPayload.Task)
  89. return nil
  90. }

job-scheduler app

In the job-scheduler application, the R2D2, C3PO, and BB8 jobs are first defined as []DroidJob:

  1. droidJobs := []DroidJob{
  2. {Name: "R2-D2", Job: "Oil Change", DueTime: "5s"},
  3. {Name: "C-3PO", Job: "Memory Wipe", DueTime: "15s"},
  4. {Name: "BB-8", Job: "Internal Gyroscope Check", DueTime: "30s"},
  5. }

The jobs are then scheduled, retrieved, and deleted using the jobs API. As you can see from the terminal output, first the R2D2 job is scheduled:

  1. // Schedule R2D2 job
  2. err = schedule(droidJobs[0])
  3. if err != nil {
  4. log.Fatalln("Error scheduling job: ", err)
  5. }

Then, the C3PO job is scheduled, and returns job data:

  1. // Schedule C-3PO job
  2. err = schedule(droidJobs[1])
  3. if err != nil {
  4. log.Fatalln("Error scheduling job: ", err)
  5. }
  6. // Get C-3PO job
  7. resp, err := get(droidJobs[1])
  8. if err != nil {
  9. log.Fatalln("Error retrieving job: ", err)
  10. }
  11. fmt.Println("Get job response: ", resp)

The BB8 job is then scheduled, retrieved, and deleted:

  1. // Schedule BB-8 job
  2. err = schedule(droidJobs[2])
  3. if err != nil {
  4. log.Fatalln("Error scheduling job: ", err)
  5. }
  6. // Get BB-8 job
  7. resp, err = get(droidJobs[2])
  8. if err != nil {
  9. log.Fatalln("Error retrieving job: ", err)
  10. }
  11. fmt.Println("Get job response: ", resp)
  12. // Delete BB-8 job
  13. err = delete(droidJobs[2])
  14. if err != nil {
  15. log.Fatalln("Error deleting job: ", err)
  16. }
  17. fmt.Println("Job deleted: ", droidJobs[2].Name)

The job-scheduler.go also defines the schedule, get, and delete functions, calling from job-service.go.

  1. // Schedules a job by invoking grpc service from job-service passing a DroidJob as an argument
  2. func schedule(droidJob DroidJob) error {
  3. jobData, err := json.Marshal(droidJob)
  4. if err != nil {
  5. fmt.Println("Error marshalling job content")
  6. return err
  7. }
  8. content := &daprc.DataContent{
  9. ContentType: "application/json",
  10. Data: []byte(jobData),
  11. }
  12. // Schedule Job
  13. _, err = app.daprClient.InvokeMethodWithContent(context.Background(), "job-service", "scheduleJob", "POST", content)
  14. if err != nil {
  15. fmt.Println("Error invoking method: ", err)
  16. return err
  17. }
  18. return nil
  19. }
  20. // Gets a job by invoking grpc service from job-service passing a job name as an argument
  21. func get(droidJob DroidJob) (string, error) {
  22. content := &daprc.DataContent{
  23. ContentType: "text/plain",
  24. Data: []byte(droidJob.Name),
  25. }
  26. //get job
  27. resp, err := app.daprClient.InvokeMethodWithContent(context.Background(), "job-service", "getJob", "GET", content)
  28. if err != nil {
  29. fmt.Println("Error invoking method: ", err)
  30. return "", err
  31. }
  32. return string(resp), nil
  33. }
  34. // Deletes a job by invoking grpc service from job-service passing a job name as an argument
  35. func delete(droidJob DroidJob) error {
  36. content := &daprc.DataContent{
  37. ContentType: "text/plain",
  38. Data: []byte(droidJob.Name),
  39. }
  40. _, err := app.daprClient.InvokeMethodWithContent(context.Background(), "job-service", "deleteJob", "DELETE", content)
  41. if err != nil {
  42. fmt.Println("Error invoking method: ", err)
  43. return err
  44. }
  45. return nil
  46. }

Run one job application at a time

This quickstart includes two apps:

  • job-scheduler.go: schedules, retrieves, and deletes jobs.
  • job-service.go: handles the scheduled jobs.

Step 1: Pre-requisites

For this example, you will need:

Step 2: Set up the environment

Clone the sample provided in the Quickstarts repo.

  1. git clone https://github.com/dapr/quickstarts.git

From the root of the Quickstarts directory, navigate into the jobs directory:

  1. cd jobs/go/sdk

Step 3: Schedule jobs

In the terminal, run the job-service app:

  1. dapr run --app-id job-service --app-port 6200 --dapr-grpc-port 6281 --app-protocol grpc -- go run .

Expected output

  1. == APP == dapr client initializing for: 127.0.0.1:6281
  2. == APP == Registered job handler for: R2-D2
  3. == APP == Registered job handler for: C-3PO
  4. == APP == Registered job handler for: BB-8
  5. == APP == Starting server on port: 6200

In a new terminal window, run the job-scheduler app:

  1. dapr run --app-id job-scheduler --app-port 6300 -- go run .

Expected output

  1. == APP == dapr client initializing for:
  2. == APP == Get job response: {"droid":"C-3PO","Task":"Memory Wipe"}
  3. == APP == Get job response: {"droid":"BB-8","Task":"Internal Gyroscope Check"}
  4. == APP == Job deleted: BB-8

Return to the job-service app terminal window. The output should be:

  1. == APP == Job scheduled: R2-D2
  2. == APP == Job scheduled: C-3PO
  3. == APP == 2024/07/17 18:25:36 job:{name:"C-3PO" due_time:"10s" data:{value:"{\"droid\":\"C-3PO\",\"Task\":\"Memory Wipe\"}"}}
  4. == APP == Job scheduled: BB-8
  5. == APP == 2024/07/17 18:25:36 job:{name:"BB-8" due_time:"15s" data:{value:"{\"droid\":\"BB-8\",\"Task\":\"Internal Gyroscope Check\"}"}}
  6. == APP == Starting droid: R2-D2
  7. == APP == Executing maintenance job: Oil Change
  8. == APP == Starting droid: C-3PO
  9. == APP == Executing maintenance job: Memory Wipe

Unpack what happened in the job-service and job-scheduler applications when you ran dapr run.

Watch the demo

See the jobs API in action using a Go HTTP example, recorded during the [Dapr Community Call #107(https://www.youtube.com/live/WHGOc7Ec\_YQ?si=JlOlcJKkhRuhf5R1&t=849)\].

Tell us what you think!

We’re continuously working to improve our Quickstart examples and value your feedback. Did you find this Quickstart helpful? Do you have suggestions for improvement?

Join the discussion in our discord channel.

Next steps

Explore Dapr tutorials >>

Last modified October 11, 2024: Fixed typo (#4389) (fe17926)