web2py Scheduler

The mainstream web2py solution for running tasks in the background (and therefore away from the webserver process) is the built-in scheduler.

The stable API consists of these functions:

  • disable()
  • resume()
  • terminate()
  • kill()
  • queue_task()
  • task_status()
  • stop_task()

The web2py scheduler works very much like the task queue described in the previous sub-section with some differences:

  • It provides a standard mechanism for creating, scheduling, and monitoring tasks.
  • There is not a single background process but a set of worker processes.
  • The job of worker nodes can be monitored because their state, as well as the state of the tasks, is stored in the database.
  • It works without web2py but that is not documented here.

The scheduler does not use cron, although one can use cron @reboot to start the worker nodes.

More information about deploying the scheduler under Linux and Windows is in Chapter 13.

In the scheduler, a task is simply a function defined in a model (or in a module and imported by a model). For example:

  def task_add(a, b):
      return a + b

Tasks will always be called in the same environment seen by controllers and therefore they see all the global variables defined in models, including database connections (db). Tasks differ from a controller action because they are not associated with an HTTP request and therefore there is no request.env. Also, tasks can access another environmental variable that is not present in normal requests: W2P_TASK. W2P_TASK.id holds the scheduler_task.id and W2P_TASK.uuid the scheduler_task.uuid field of the task that is running.

Remember to call db.commit() at the end of every task if it involves inserts/updates to the database. web2py commits by default at the end of a successful action but the scheduler tasks are not actions.

To enable the scheduler you must instantiate the Scheduler class in a model. The recommended way to enable the scheduler for your app is to create a model file named scheduler.py and define your task functions there. After the functions, put the following code into the model:

  from gluon.scheduler import Scheduler
  scheduler = Scheduler(db)

If your tasks are defined in a module (as opposed to a model) you may have to restart the workers.

The task is scheduled with

  scheduler.queue_task(task_add, pvars=dict(a=1, b=2))

Parameters

The first argument of the Scheduler class must be the database to be used by the scheduler to communicate with the workers. This can be the db of the app or another dedicated db, perhaps one shared by multiple apps. If you use SQLite it’s recommended to use a separate db from the one used by your app in order to keep the app responsive.

Once the tasks are defined and the Scheduler is instantiated, all that remains is to start the workers. You can do that in several ways:

  python web2py.py -K myapp

starts a worker for the app myapp. If you want to start multiple workers for the same app, you can do so by passing myapp,myapp. You can also pass the group_names (overriding the ones set in your model) with

  python web2py.py -K myapp:group1:group2,myotherapp:group1
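
To make the format of the -K argument concrete, the following sketch splits such a string into (app, group_names) pairs. It only illustrates the syntax described above: parse_k_option is a hypothetical helper written for this example, not web2py's own parser.

```python
def parse_k_option(value):
    """Split a -K style string into (app, group_names) pairs.

    Hypothetical helper for illustration; web2py has its own parser.
    """
    workers = []
    for spec in value.split(','):
        app, *groups = spec.strip().split(':')
        # An empty list here stands for "use the group_names from the model".
        workers.append((app, groups))
    return workers


print(parse_k_option('myapp:group1:group2,myotherapp:group1'))
print(parse_k_option('myapp,myapp'))  # two workers for the same app
```

Each comma-separated entry corresponds to one worker, which is why repeating an app name starts several workers for it.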

If you have a model called scheduler.py you can start/stop the workers from web2py’s default window (the one you use to set the ip address and the port).

Scheduler Deployment

One last nice addition: if you use the embedded webserver, you can start the webserver and the scheduler with just one line of code (this assumes you don’t want the web2py window popping up; otherwise you can use the “Schedulers” menu instead):

  python web2py.py -a yourpass -K myapp -X

You can pass the usual parameters (-i, -p; here -a prevents the window from showing up), name any app in the -K parameter, and append -X. The scheduler will run alongside the webserver!

Windows users looking to create a service should see Chapter 13.

Complete Scheduler signature

Scheduler’s complete signature is:

  Scheduler(db,
            tasks=None,
            migrate=True,
            worker_name=None,
            group_names=None,
            heartbeat=HEARTBEAT,
            max_empty_runs=0,
            discard_results=False,
            utc_time=False)

Let’s see them in order:

  • db is the database DAL instance where you want the scheduler tables to be placed.
  • tasks is a dictionary that maps task names to functions. If you do not pass this parameter, functions will be searched for in the app environment.
  • worker_name is None by default. As soon as the worker is started, a worker name is generated as hostname-uuid. If you want to specify it yourself, be sure that it’s unique.
  • group_names is by default set to ['main']. All tasks have a group_name parameter, set to main by default. Workers can only pick up tasks of their assigned group.

NB: This is useful if you have different worker instances (e.g. on different machines) and you want to assign tasks to a specific worker.

NB2: It’s possible to assign several groups to a worker, and they can even all be the same, as in ['mygroup','mygroup']. Tasks will be distributed taking into consideration that a worker with group_names ['mygroup','mygroup'] is able to process twice as many tasks as a worker with group_names ['mygroup'].
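
The weighting described in NB2 can be pictured as giving each worker one "slot" per entry in its group_names. The sketch below is a simplified model under that assumption, not the scheduler's actual assignment code; distribute and the worker names are hypothetical.

```python
from collections import Counter
from itertools import cycle


def distribute(task_ids, workers, group):
    """Assign tasks round-robin over one slot per group_names entry.

    Simplified model for illustration, not the scheduler's actual code.
    """
    slots = [name for name, groups in workers.items()
             for g in groups if g == group]
    return {task: worker for task, worker in zip(task_ids, cycle(slots))}


workers = {
    'worker_a': ['mygroup', 'mygroup'],  # listed twice: two slots
    'worker_b': ['mygroup'],             # listed once: one slot
}
assignment = distribute(range(9), workers, 'mygroup')
print(Counter(assignment.values()))
```

With nine tasks, worker_a ends up with twice as many as worker_b, which is the proportion the note describes.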

  • heartbeat is by default set to 3 seconds. This parameter controls how often a worker checks its status in the scheduler_worker table and looks for tasks ASSIGNED to it that are waiting to be processed.
  • max_empty_runs is 0 by default, which means that the worker will continue to process tasks as soon as they are ASSIGNED. If you set this to a value of, let’s say, 10, a worker will die automatically if it’s ACTIVE and no tasks are ASSIGNED to it for 10 loops. A loop is one search for tasks by a worker, which happens every 3 seconds (or whatever heartbeat is set to).
  • discard_results is False by default. If set to True, no scheduler_run records will be created.

NB: scheduler_run records will still be created for tasks that end in the FAILED, TIMEOUT or STOPPED status.

  • utc_time is False by default. If you need to coordinate with workers living in different timezones, have problems with standard/daylight-saving time changes, supply datetimes from different countries, etc., you can set this to True. The scheduler will honor UTC time and leave the local time aside. Caveat: you need to schedule tasks with UTC times (for start_time, stop_time, and so on).
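
As an illustration of max_empty_runs, here is a small simulation of a worker's loop. It is a simplified model, not the scheduler's actual code, and it assumes that the idle counter resets whenever a task is found; simulate_worker is a hypothetical name.

```python
def simulate_worker(assigned_per_loop, max_empty_runs=0):
    """Return the number of loops a worker survives.

    assigned_per_loop lists how many tasks were ASSIGNED at each loop.
    Simplified model for illustration, not the scheduler's actual code.
    """
    empty_runs = 0
    for loops, assigned in enumerate(assigned_per_loop, start=1):
        if assigned:
            empty_runs = 0          # work found: reset the idle counter
        else:
            empty_runs += 1
        if max_empty_runs and empty_runs >= max_empty_runs:
            return loops            # worker dies after too many idle loops
    return len(assigned_per_loop)   # still alive when the simulation ends


history = [1, 0, 0, 2, 0, 0, 0, 1]
print(simulate_worker(history, max_empty_runs=3))  # dies at the 7th loop
print(simulate_worker(history, max_empty_runs=0))  # never dies: all 8 loops
```

With the default heartbeat of 3 seconds, max_empty_runs=10 would therefore correspond to roughly 30 idle seconds before the worker exits.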

Now we have the infrastructure in place: defined the tasks, told the scheduler about them, started the worker(s). What remains is to actually schedule the tasks.

Tasks

Tasks can be scheduled programmatically or via appadmin. In fact, a task is scheduled simply by adding an entry in the table “scheduler_task”, which you can access via appadmin:

  http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task

The meaning of the fields in this table is obvious. The “args” and “vars” fields are the values to be passed to the task in JSON format. In the case of the “task_add” above, an example of “args” and “vars” could be:

  args = [3, 4]
  vars = {}

or

  args = []
  vars = {'a': 3, 'b': 4}
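
The relationship between the stored JSON and the eventual function call can be sketched as follows. The helper call_with_stored_fields is hypothetical and only mimics the idea of decoding the two fields and passing them as positional and keyword arguments; note that valid JSON needs double-quoted keys.

```python
import json


def task_add(a, b):
    return a + b


def call_with_stored_fields(func, args_json, vars_json):
    """Decode the JSON "args"/"vars" fields and call func with them.

    Illustrative helper only; the name is hypothetical.
    """
    return func(*json.loads(args_json), **json.loads(vars_json))


# Positional form: args = [3, 4], vars = {}
print(call_with_stored_fields(task_add, '[3, 4]', '{}'))
# Keyword form: args = [], vars = {"a": 3, "b": 4}
print(call_with_stored_fields(task_add, '[]', '{"a": 3, "b": 4}'))
```

Both calls produce the same sum, which is why the two ways of filling in the fields are interchangeable for task_add.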

The scheduler_task table is the one where tasks are organized.

To add tasks via the API, use

  scheduler.queue_task('mytask', ...)

as explained later in this chapter in queue_task.

Task Lifecycle

All tasks follow a lifecycle

[Figure: scheduler tasks]

By default, when you send a task to the scheduler, it is in the QUEUED status. If you need it to be executed later, use the start_time parameter (default = now). If for some reason you need to be sure that the task does not get executed after a certain point in time (maybe a request to a web service that shuts down at 1AM, a mail that must not be sent after working hours, etc.) you can set a stop_time (default = None) for it. If your task is NOT picked up by a worker before stop_time, it will be set as EXPIRED. Tasks with no stop_time set, or picked up BEFORE stop_time, are ASSIGNED to a worker. When a worker picks up a task, its status is set to RUNNING.

RUNNING tasks may end up:

  • TIMEOUT when the run takes longer than the number of seconds set by the timeout parameter (default = 60 seconds),
  • FAILED when an exception is detected,
  • COMPLETED when they successfully complete.
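
The transitions described above can be summarized in a small sketch. It is a simplified model that ignores repeats and retries, and the function names are hypothetical; it is not the scheduler's own code.

```python
from datetime import datetime, timedelta


def status_before_run(now, stop_time=None):
    """Status a QUEUED task moves to when it is considered for pickup."""
    if stop_time is not None and now > stop_time:
        return 'EXPIRED'
    return 'ASSIGNED'


def status_after_run(elapsed_seconds, raised, timeout=60):
    """Final status of a RUNNING task."""
    if elapsed_seconds > timeout:
        return 'TIMEOUT'
    if raised:
        return 'FAILED'
    return 'COMPLETED'


now = datetime(2024, 1, 1, 12, 0, 0)
print(status_before_run(now))                                      # ASSIGNED
print(status_before_run(now, stop_time=now - timedelta(minutes=1)))  # EXPIRED
print(status_after_run(90, raised=False))                          # TIMEOUT
print(status_after_run(5, raised=True))                            # FAILED
print(status_after_run(5, raised=False))                           # COMPLETED
```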

Values for start_time and stop_time should be datetime objects. To schedule “mytask” to run at 30 seconds from the current time, for example, you would do the following:

  from datetime import timedelta as timed
  scheduler.queue_task('mytask', start_time=request.now + timed(seconds=30))

Additionally, you can control how many times a task should be repeated (e.g. when you need to aggregate some data at specified intervals). To do so, set the repeats parameter (default = 1 time only, 0 = unlimited). You can influence how many seconds should pass between executions with the period parameter (default = 60 seconds).

Default behavior: the time period is not calculated between the END of the first round and the START of the next, but from the START time of the first round to the START time of the next cycle. This can cause accumulating ‘drift’ in the start time of a job. After v 2.8.2, a new parameter prevent_drift was added, defaulting to False. If set to True when queuing a task, the start_time parameter will take precedence over the period, preventing drift.
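
The arithmetic behind drift can be illustrated with a short simulation. This is a simplified model under the assumption that each run starts a few seconds later than planned; planned_starts is a hypothetical helper and does not reproduce the scheduler's exact computation.

```python
from datetime import datetime, timedelta


def planned_starts(start_time, period, pickup_delays, prevent_drift=False):
    """Return the planned start of each run after the first.

    pickup_delays[i] is how many seconds late run i actually started.
    Simplified model for illustration, not the scheduler's actual code.
    """
    planned = []
    next_planned = start_time
    for run, delay in enumerate(pickup_delays, start=1):
        actual_start = next_planned + timedelta(seconds=delay)
        if prevent_drift:
            # Anchor every run to the original start_time.
            next_planned = start_time + timedelta(seconds=period * run)
        else:
            # Anchor the next run to when this one actually started.
            next_planned = actual_start + timedelta(seconds=period)
        planned.append(next_planned)
    return planned


start = datetime(2024, 1, 1, 12, 0, 0)
delays = [2, 2, 2]  # each run is picked up 2 seconds late
for flag in (False, True):
    times = planned_starts(start, 60, delays, prevent_drift=flag)
    print(flag, [t.strftime('%H:%M:%S') for t in times])
```

Without prevent_drift the two-second delays add up (12:01:02, 12:02:04, 12:03:06); with it, every planned start stays on the minute.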

You can also set how many times the function can raise an exception (e.g. when requesting data from a slow web service) and be queued again instead of stopping in FAILED status, using the parameter retry_failed (default = 0, -1 = unlimited).
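
The effect of retry_failed can be sketched with a plain loop. This is a simplified model of the behavior described above, not the scheduler's implementation; run_with_retries and make_flaky are hypothetical names.

```python
def run_with_retries(func, retry_failed=0):
    """Call func until it succeeds or the retry budget is used up.

    Returns (status, attempts). retry_failed=-1 means unlimited retries.
    Simplified model for illustration, not the scheduler's actual code.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            func()
            return 'COMPLETED', attempts
        except Exception:
            # Each attempt so far has failed, so attempts == failures here.
            if retry_failed != -1 and attempts > retry_failed:
                return 'FAILED', attempts
            # Otherwise the task would be queued again.


def make_flaky(failures_before_success):
    """Build a hypothetical task that fails a fixed number of times."""
    state = {'calls': 0}

    def flaky():
        state['calls'] += 1
        if state['calls'] <= failures_before_success:
            raise IOError('slow web service')
        return 'ok'
    return flaky


print(run_with_retries(make_flaky(2), retry_failed=0))  # ('FAILED', 1)
print(run_with_retries(make_flaky(2), retry_failed=2))  # ('COMPLETED', 3)
```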

[Figure: task repeats]

Summary: you have

  • period and repeats to get an automatically rescheduled function
  • timeout to be sure that a function doesn’t exceed a certain amount of time
  • retry_failed to control how many times the task can “fail”
  • start_time and stop_time to schedule a function in a restricted timeframe

queue_task

The method:

  scheduler.queue_task(function,
                       pargs=[],
                       pvars={},
                       start_time=now,  # datetime
                       stop_time=None,  # datetime
                       timeout=60,  # seconds
                       prevent_drift=False,
                       period=60,  # seconds
                       immediate=False,
                       repeats=1)

allows you to queue tasks to be executed by workers. It returns a Row (described later in this section), and it takes the following parameters:

  • function (required): it can be a task name or a reference to an actual function.
  • pargs: the arguments to be passed to the task, stored as a Python list.
  • pvars: the named arguments to be passed to the task, stored as a Python dictionary.
  • all other scheduler_task columns can be passed as keyword arguments; the most important are shown.

For example:

  scheduler.queue_task('demo1', [1, 2])

does the exact same thing as

  scheduler.queue_task('demo1', pvars={'a': 1, 'b': 2})

as (assuming st refers to the db.scheduler_task table)

  st.validate_and_insert(function_name='demo1', args=json.dumps([1, 2]))

and as:

  st.validate_and_insert(function_name='demo1', vars=json.dumps({'a': 1, 'b': 2}))

Here is a more complex complete example:

  from gluon.scheduler import Scheduler

  def task_add(a, b):
      return a + b

  scheduler = Scheduler(db, tasks=dict(demo1=task_add))
  scheduler.queue_task('demo1', pvars=dict(a=1, b=2), repeats=0, period=180)

Since version 2.4.1, passing the additional parameter immediate=True forces the main worker to reassign tasks. Until 2.4.1, the worker checked for new tasks every 5 cycles (so, every 5*heartbeat seconds). If you had an app that needed to check frequently for new tasks, to get snappy behaviour you were forced to lower the heartbeat parameter, putting the db under pressure for no reason. With immediate=True you can force the check for new tasks: it will happen within at most heartbeat seconds.

A call to scheduler.queue_task returns the task id and uuid of the task you queued (can be the one you passed or the auto-generated one), and possible errors:

  <Row {'errors': {}, 'id': 1, 'uuid': '08e6433a-cf07-4cea-a4cb-01f16ae5f414'}>

If there are errors (usually syntax errors or input validation errors), you get the result of the validation, and id and uuid will be None:

  <Row {'errors': {'period': 'enter an integer greater than or equal to 0'}, 'id': None, 'uuid': None}>
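
A caller will typically want to branch on the errors entry before using id or uuid. The sketch below shows one way to do that; describe_queue_result is a hypothetical helper, and SimpleNamespace objects stand in for the two Row objects shown above so that the example runs without web2py.

```python
from types import SimpleNamespace


def describe_queue_result(result):
    """Summarize the object returned by scheduler.queue_task."""
    if result.errors:
        problems = ', '.join('%s: %s' % item
                             for item in sorted(result.errors.items()))
        return 'not queued (%s)' % problems
    return 'queued as task %s (uuid %s)' % (result.id, result.uuid)


# Stand-ins shaped like the two Row objects shown above.
ok = SimpleNamespace(errors={}, id=1,
                     uuid='08e6433a-cf07-4cea-a4cb-01f16ae5f414')
bad = SimpleNamespace(
    errors={'period': 'enter an integer greater than or equal to 0'},
    id=None, uuid=None)

print(describe_queue_result(ok))
print(describe_queue_result(bad))
```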

task_status

To query the scheduler about tasks, use task_status

  scheduler.task_status(ref, output=False)

The argument ref can be

  • integer -> lookup will be done by scheduler_task.id
  • string -> lookup will be done by scheduler_task.uuid
  • query -> lookup as you wish (as in db.scheduler_task.task_name == 'test1')

output=True fetches the scheduler_run record

It returns a single Row object, for the most recent queued task matching the criteria.

scheduler_run record is fetched by a left join, so it can have all fields == None

Example: retrieving scheduler task status, results and tracebacks

Here the scheduler instance is mysched

  task = mysched.queue_task(f, ...)
  task_status = mysched.task_status(task.id, output=True)
  traceback = task_status.scheduler_run.traceback
  result = task_status.scheduler_run.run_result  # or
  result = task_status.result
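
Because a task is executed asynchronously, its status usually has to be polled. The following is a minimal sketch of such a loop; wait_for_task is a hypothetical helper, and it assumes that task_status called without output=True returns the scheduler_task row, so that the status is available as row.status.

```python
import time


def wait_for_task(scheduler, task_id, poll_seconds=1.0, max_polls=60):
    """Poll task_status until the task reaches a final status.

    Returns the last status seen, or None if the task was not found.
    Assumes row.status holds the task status (see the note above).
    """
    final = ('COMPLETED', 'FAILED', 'TIMEOUT', 'EXPIRED', 'STOPPED')
    status = None
    for _ in range(max_polls):
        row = scheduler.task_status(task_id)
        if row is None:
            return None
        status = row.status
        if status in final:
            break
        time.sleep(poll_seconds)
    return status


# Usage, with mysched and task as in the example above:
#   status = wait_for_task(mysched, task.id)
```

A repeating task goes back to QUEUED between runs, so for such tasks the loop simply stops after max_polls and returns the last status seen.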

Results and output

The table “scheduler_run” stores the status of all running tasks. Each record references a task that has been picked up by a worker. One task can have multiple runs. For example, a task scheduled to repeat 10 times an hour will probably have 10 runs (unless one fails or they take longer than 1 hour). Beware that if the task has no return values, it is removed from the scheduler_run table as soon as it is finished.

Possible run statuses are:

  RUNNING, COMPLETED, FAILED, TIMEOUT

If the run is completed, no exceptions are thrown, and there is no task timeout, the run is marked as COMPLETED and the task is marked as QUEUED or COMPLETED depending on whether it is supposed to run again at a later time. The output of the task is serialized in JSON and stored in the run record.

When a RUNNING task throws an exception, the run is marked as FAILED and the task is marked as FAILED. The traceback is stored in the run record.

Similarly, when a run exceeds the timeout, it is stopped and marked as TIMEOUT, and the task is marked as TIMEOUT.

In any case, the stdout is captured and also logged into the run record.

Due to multiprocessing limitations, beware of using either huge return values or huge print statements in the queued functions. As the output is buffered, your task may fail just because the parent process hangs on reading values. Also, keep print statements to a minimum, and if needed use a proper logging library that doesn’t clutter stdout. As for huge return values, a better option can be to use a table where the function saves the result: you can return only the reference to the specific row of results without hindering the master process of the scheduler.
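
The "store the result, return a reference" pattern can be sketched as follows. To keep the example runnable on its own, an in-memory SQLite database from the standard library stands in for an application table; in a web2py task you would instead insert into a table defined through the DAL and call db.commit(). The table name task_result and the function big_task are hypothetical.

```python
import json
import sqlite3

# In-memory SQLite stands in for an application table (assumption).
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE task_result (id INTEGER PRIMARY KEY, payload TEXT)')


def big_task(n):
    """Compute a large result, store it, and return only its row id."""
    payload = json.dumps(list(range(n)))
    cursor = conn.execute('INSERT INTO task_result (payload) VALUES (?)',
                          (payload,))
    conn.commit()
    return cursor.lastrowid  # small return value: just the reference


row_id = big_task(100000)
stored = conn.execute('SELECT payload FROM task_result WHERE id = ?',
                      (row_id,)).fetchone()[0]
print(row_id, len(json.loads(stored)))
```

Only the small integer travels back through the scheduler; whoever needs the full result reads it from the table using that id.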

Using appadmin, one can check all RUNNING tasks, the output of COMPLETED tasks, the error of FAILED tasks, etc.

The scheduler also creates one more table called “scheduler_worker”, which stores the workers’ heartbeat and their status.

Managing processes

Fine-grained worker management is hard. This module tries not to leave behind any platform (Mac, Win, Linux).

When you start a worker, you may later want to:

  • kill it “no matter what it’s doing”
  • kill it only if it is not processing tasks
  • put it to sleep

Maybe you still have some tasks queued, and you want to save some resources. You know you want them processed every hour, so you’ll want to:

  • process all queued tasks and die automatically

All of these things are possible by managing Scheduler parameters or the scheduler_worker table. To be more precise, for started workers you can change the status value of any worker to influence its behavior. As with tasks, workers can be in one of the following statuses: ACTIVE, DISABLED, TERMINATE or KILL.

ACTIVE and DISABLED are “persistent”, while TERMINATE and KILL, as the status names suggest, are more “commands” than real statuses. Hitting ctrl+c is equivalent to setting a worker to KILL.

[Figure: workers statuses]

There are a few convenience functions since version 2.4.1 (self-explanatory):

  scheduler.disable()
  scheduler.resume()
  scheduler.terminate()
  scheduler.kill()

Each function takes an optional parameter, which can be a string or a list, to manage workers based on their group_names. It defaults to the group_names defined in the scheduler instantiation.

An example is better than a thousand words: scheduler.terminate('high_prio') will TERMINATE all the workers that are processing the high_prio tasks, while scheduler.terminate(['high_prio', 'low_prio']) will terminate all high_prio and low_prio workers.

Watch out: if you have a worker processing high_prio and low_prio, scheduler.terminate('high_prio') will terminate the worker altogether, even if you didn’t want to terminate low_prio too.
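
The matching rule behind this warning can be pictured with a small sketch: a worker is affected as soon as it serves at least one of the requested groups. This is a simplified model, not the scheduler's code; workers_to_terminate and the worker names are hypothetical.

```python
def workers_to_terminate(workers, group_names):
    """Return the names of workers serving at least one given group.

    Simplified model for illustration, not the scheduler's actual code.
    """
    if isinstance(group_names, str):
        group_names = [group_names]
    wanted = set(group_names)
    return sorted(name for name, groups in workers.items()
                  if wanted & set(groups))


workers = {
    'w1': ['high_prio'],
    'w2': ['low_prio'],
    'w3': ['high_prio', 'low_prio'],
}
print(workers_to_terminate(workers, 'high_prio'))                # ['w1', 'w3']
print(workers_to_terminate(workers, ['high_prio', 'low_prio']))  # all three
```

Worker w3 shows up in the first result even though it also serves low_prio, which is exactly the situation the warning describes.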

Everything that one can do via appadmin one can do programmatically by inserting and updating records in these tables.

However, one should not update records relative to RUNNING tasks as this may create unexpected behavior. The best practice is to queue tasks using the “queue_task” method.

For example:

  scheduler.queue_task('task_add',
                       pargs=[],
                       pvars={'a': 3, 'b': 4},
                       repeats=10,   # run 10 times
                       period=3600,  # every 1h
                       timeout=120,  # should take less than 120 seconds
                       )

Notice that “scheduler_task” table fields “times_run”, “last_run_time” and “assigned_worker_name” are not provided at schedule time but are filled automatically by the workers.

You can also retrieve the output of completed tasks:

  completed_runs = db(db.scheduler_run.status == 'COMPLETED').select()

The scheduler is considered experimental because it needs more extensive testing and because the table structure may change as more features are added.

Reporting progress percentages

A special “word” encountered in the print statements of your functions clears all the previous output. That word is !clear!. This, coupled with the sync_output parameter, allows you to report percentages.

Here is an example:

  import time

  def reporting_percentages():
      time.sleep(5)
      print('50%')
      time.sleep(5)
      print('!clear!100%')
      return 1

The function reporting_percentages sleeps for 5 seconds and outputs 50%. Then it sleeps another 5 seconds and outputs 100%. Note that the output in the scheduler_run table is synced every 2 seconds and that the second print statement, which contains !clear!100%, causes the 50% output to be cleared and replaced by 100% only.

  scheduler.queue_task(reporting_percentages, sync_output=2)
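
The effect of the !clear! marker on the stored output can be modelled in a few lines. This is a simplified sketch of the behavior described above, not the scheduler's own code; visible_output is a hypothetical helper that keeps only what was printed after the last marker.

```python
CLEAR_MARKER = '!clear!'


def visible_output(printed):
    """Return what remains of the captured output after the last marker.

    Simplified model for illustration, not the scheduler's actual code.
    """
    position = printed.rfind(CLEAR_MARKER)
    if position == -1:
        return printed
    return printed[position + len(CLEAR_MARKER):]


print(repr(visible_output('50%\n')))               # '50%\n'
print(repr(visible_output('50%\n!clear!100%\n')))  # '100%\n'
```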