web2py Scheduler
The mainstream web2py solution for running tasks in the background (and therefore away from the webserver process) is the built-in scheduler.
The stable API consists of these functions:
- disable()
- resume()
- terminate()
- kill()
- queue_task()
- task_status()
- stop_task()
The web2py scheduler works very much like the task queue described in the previous sub-section with some differences:
- It provides a standard mechanism for creating, scheduling, and monitoring tasks.
- There is not a single background process but a set of worker processes.
- The job of worker nodes can be monitored because their state, as well as the state of the tasks, is stored in the database.
- It works without web2py but that is not documented here.
The scheduler does not use cron, although one can use cron @reboot to start the worker nodes.
More information about deploying the scheduler under Linux and Windows is in Chapter 13.
In the scheduler, a task is simply a function defined in a model (or in a module and imported by a model). For example:
def task_add(a, b):
    return a + b
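Because a task is just an ordinary Python function, it can be called directly, outside the scheduler, which makes unit testing straightforward. A minimal check (no web2py required):

```python
# Tasks are plain functions, so they can be tested without the scheduler.
def task_add(a, b):
    return a + b

# Direct calls behave exactly as they will when a worker runs them.
assert task_add(1, 2) == 3
assert task_add(a=3, b=4) == 7
```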
Tasks are always called in the same environment seen by controllers, and therefore they see all the global variables defined in models, including database connections (db). Tasks differ from controller actions because they are not associated with an HTTP request, so there is no request.env. Tasks can also access an environment variable that is not present in normal requests: W2P_TASK. W2P_TASK.id holds the scheduler_task.id and W2P_TASK.uuid the scheduler_task.uuid field of the task that is running.
Remember to call db.commit() at the end of every task if it involves inserts/updates to the database. web2py commits by default at the end of a successful action, but scheduler tasks are not actions.
To enable the scheduler you must instantiate the Scheduler class in a model. The recommended way to enable the scheduler for your app is to create a model file named scheduler.py and define your functions there. After the functions, add the following code to the model:
from gluon.scheduler import Scheduler
scheduler = Scheduler(db)
If your tasks are defined in a module (as opposed to a model) you may have to restart the workers.
The task is scheduled with
scheduler.queue_task(task_add, pvars=dict(a=1, b=2))
Parameters
The first argument of the Scheduler class must be the database to be used by the scheduler to communicate with the workers. This can be the db of the app or another dedicated db, perhaps one shared by multiple apps. If you use SQLite, it is recommended to use a db separate from the one used by your app, in order to keep the app responsive. Once the tasks are defined and the Scheduler is instantiated, all that remains is to start the workers. You can do that in several ways:
python web2py.py -K myapp
starts a worker for the app myapp. If you want to start multiple workers for the same app, you can do so by passing myapp,myapp. You can also pass the group_names (overriding the ones set in your model) with
python web2py.py -K myapp:group1:group2,myotherapp:group1
If you have a model called scheduler.py, you can start/stop the workers from web2py's default window (the one you use to set the IP address and the port).
Scheduler Deployment
One last nice addition: if you use the embedded webserver, you can start the webserver and the scheduler with just one command (this assumes you don't want the web2py window popping up; otherwise you can use the "Schedulers" menu instead):
python web2py.py -a yourpass -K myapp -X
You can pass the usual parameters (-i, -p; here -a prevents the window from showing up), pass whatever app in the -K parameter, and append -X. The scheduler will run alongside the webserver!
Windows users looking to create a service should see Chapter 13.
Complete Scheduler signature
Scheduler’s complete signature is:
Scheduler(db,
tasks=None,
migrate=True,
worker_name=None,
group_names=None,
heartbeat=HEARTBEAT,
max_empty_runs=0,
discard_results=False,
utc_time=False)
Let’s see them in order:
- db is the database DAL instance where you want the scheduler tables to be placed.
- tasks is a dictionary that maps task names into functions. If you do not pass this parameter, functions will be searched for in the app environment.
- worker_name is None by default. As soon as the worker is started, a worker name is generated as hostname-uuid. If you want to specify it yourself, be sure that it is unique.
- group_names is by default set to [main]. All tasks have a group_name parameter, set to main by default. Workers can only pick up tasks of their assigned group.
NB: This is useful if you have different worker instances (e.g. on different machines) and you want to assign tasks to a specific worker.
NB2: It's possible to assign more groups to a worker, and they can even be repeated, as in ['mygroup', 'mygroup']. Tasks are distributed taking this into account: a worker with group_names ['mygroup', 'mygroup'] is able to process twice the tasks of a worker with group_names ['mygroup'].
- heartbeat is by default set to 3 seconds. This parameter controls how often a worker checks its status in the scheduler_worker table and looks for tasks ASSIGNED to it.
- max_empty_runs is 0 by default, which means that the worker keeps processing tasks as soon as they are ASSIGNED. If you set it to a value of, let's say, 10, a worker dies automatically if it is ACTIVE and no tasks have been ASSIGNED to it for 10 loops. A loop is one search for tasks, every 3 seconds (or the set heartbeat).
- discard_results is False by default. If set to True, no scheduler_run records will be created.
NB: scheduler_run records will be created as before for FAILED, TIMEOUT, and STOPPED task statuses.
- utc_time is False by default. If you need to coordinate with workers living in different timezones, have problems with solar/DST times, supply datetimes from different countries, etc., you can set this to True. The scheduler will then honor UTC time and leave local time aside. Caveat: you need to schedule tasks with UTC times (for start_time, stop_time, and so on).
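To see why mixing the two clocks is dangerous, compare naive local time with UTC. This is a stdlib-only sketch; the scheduler itself simply stores the datetimes you pass to it:

```python
from datetime import datetime, timedelta

# Local wall-clock time (what you would normally pass with utc_time=False)
local_start = datetime.now() + timedelta(minutes=5)

# UTC time (what you must pass when the scheduler was created with utc_time=True)
utc_start = datetime.utcnow() + timedelta(minutes=5)

# Both are naive datetimes; only the reference clock differs, so mixing
# them silently shifts schedules by the local UTC offset.
offset = local_start - utc_start
```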
Now we have the infrastructure in place: we defined the tasks, told the scheduler about them, and started the worker(s). What remains is to actually schedule the tasks.
Tasks
Tasks can be scheduled programmatically or via appadmin. In fact, a task is scheduled simply by adding an entry in the table “scheduler_task”, which you can access via appadmin:
http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task
The meaning of the fields in this table is obvious. The "args" and "vars" fields are the values to be passed to the task, in JSON format. In the case of the task_add above, an example of "args" and "vars" could be:
args = [3, 4]
vars = {}
or
args = []
vars = {'a': 3, 'b': 4}
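The stored values are plain JSON, so you can build and check them with the standard json module. This is a sketch of the serialization that queue_task (described below) performs for you:

```python
import json

# Positional arguments are stored as a JSON list ...
args = json.dumps([3, 4])
# ... and named arguments as a JSON object.
vars = json.dumps({'a': 3, 'b': 4})

# A worker decodes them back before calling the task function.
assert json.loads(args) == [3, 4]
assert json.loads(vars) == {'a': 3, 'b': 4}
```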
The scheduler_task table is the one where tasks are organized. To add tasks via the API, use scheduler.queue_task('mytask', ...) as explained later in this chapter under queue_task.
Task Lifecycle
All tasks follow a lifecycle.
By default, when you send a task to the scheduler, it is in the QUEUED status. If you need it to be executed later, use the start_time parameter (default = now). If for some reason you need to be sure that the task does not get executed after a certain point in time (maybe a request to a web service that shuts down at 1 AM, an email that must not be sent after working hours, etc.) you can set a stop_time (default = None) for it. If your task is NOT picked up by a worker before stop_time, it is set to EXPIRED. Tasks with no stop_time set, or picked up BEFORE stop_time, are ASSIGNED to a worker. When a worker picks up a task, its status is set to RUNNING.
RUNNING tasks may end up:
- TIMEOUT, when the task runs for more than timeout seconds (default = 60 seconds),
- FAILED, when an exception is detected,
- COMPLETED, when they successfully complete.
Values for start_time and stop_time should be datetime objects. To schedule "mytask" to run 30 seconds from the current time, for example, you would do the following:
from datetime import timedelta as timed
scheduler.queue_task('mytask', start_time=request.now + timed(seconds=30))
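The same datetime arithmetic works for stop_time. For instance, to queue a task now but let it expire at 1 AM (matching the web-service example above; 'mywebservice_task' is a hypothetical task name, and the queue_task call is shown only as a comment):

```python
from datetime import datetime, timedelta

now = datetime.now()
# 1 AM of the following day: the web service in the example shuts down then.
stop = (now + timedelta(days=1)).replace(hour=1, minute=0, second=0, microsecond=0)

# If no worker picks the task up before `stop`, it becomes EXPIRED.
# scheduler.queue_task('mywebservice_task', start_time=now, stop_time=stop)
assert stop > now
```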
Additionally, you can control how many times a task should be repeated (e.g. if you need to aggregate some data at specified intervals). To do so, set the repeats parameter (default = 1 time only, 0 = unlimited). You can influence how many seconds should pass between executions with the period parameter (default = 60 seconds).
Default behavior: the time period is not calculated between the END of one round and the START of the next, but from the START time of one round to the START time of the next cycle. This can cause accumulating "drift" in the start time of a job. After v2.8.2, a new parameter prevent_drift was added, defaulting to False. If set to True when queuing a task, the start_time parameter takes precedence over the period, preventing drift.
You can also set how many times the function can raise an exception (e.g. when requesting data from a slow web service) and be queued again instead of stopping in FAILED status, using the parameter retry_failed (default = 0, -1 = unlimited).
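The drift described above is easy to model with plain datetimes (a stdlib sketch, not scheduler code): if each round's start is computed from the previous round's actual start, any pickup delay is inherited by every later round.

```python
from datetime import datetime, timedelta

period = timedelta(seconds=60)
scheduled = datetime(2024, 1, 1, 12, 0, 0)
delay = timedelta(seconds=2)  # worker picks the task up 2s late each round

# Without prevent_drift: each next start is period after the ACTUAL start,
# so the 2s delay accumulates round after round.
actual = scheduled
for _ in range(10):
    actual = actual + delay + period

drift = actual - (scheduled + 10 * period)
assert drift == timedelta(seconds=20)  # 2s per round, accumulated over 10 rounds

# With prevent_drift=True the scheduler sticks to start_time + n * period instead.
```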
Summary: you have
- period and repeats to get an automatically rescheduled function
- timeout to be sure that a function doesn't exceed a certain amount of time
- retry_failed to control how many times the task can "fail"
- start_time and stop_time to schedule a function in a restricted timeframe
queue_task
The method:
scheduler.queue_task(function,
pargs=[],
pvars={},
start_time=now, # datetime
stop_time=None, # datetime
timeout=60, # seconds
prevent_drift=False,
period=60, # seconds
immediate=False,
repeats=1)
allows you to queue tasks to be executed by workers. It returns a row (described below), and it takes the following parameters:
- function (required): it can be a task name or a reference to an actual function.
- pargs: the positional arguments to be passed to the task, stored as a Python list.
- pvars: the named arguments to be passed to the task, stored as a Python dictionary.
- all other scheduler_task columns can be passed as keyword arguments; the most important are shown above.
For example:
scheduler.queue_task('demo1', [1, 2])
does the exact same thing as
scheduler.queue_task('demo1', pvars={'a': 1, 'b': 2})
and as (here st stands for the db.scheduler_task table and json is the standard json module):
st.validate_and_insert(function_name='demo1', args=json.dumps([1, 2]))
and as:
st.validate_and_insert(function_name='demo1', vars=json.dumps({'a': 1, 'b': 2}))
Here is a more complete example:
def task_add(a, b):
    return a + b
scheduler = Scheduler(db, tasks=dict(demo1=task_add))
scheduler.queue_task('demo1', pvars=dict(a=1, b=2), repeats=0, period=180)
Since version 2.4.1, if you pass an additional parameter immediate=True, it forces the main worker to reassign tasks. Until 2.4.1, the worker checked for new tasks every 5 cycles (that is, every 5*heartbeat seconds). If you had an app that needed to check frequently for new tasks, to get snappy behaviour you were forced to lower the heartbeat parameter, putting the db under pressure for no reason. With immediate=True you can force the check for new tasks: it will happen at most heartbeat seconds later.
A call to scheduler.queue_task returns the id and uuid of the task you queued (the uuid can be the one you passed or an auto-generated one), plus possible errors:
<Row {'errors': {}, 'id': 1, 'uuid': '08e6433a-cf07-4cea-a4cb-01f16ae5f414'}>
If there are errors (usually syntax errors or input validation errors), you get the result of the validation, and id and uuid will be None:
<Row {'errors': {'period': 'enter an integer greater than or equal to 0'}, 'id': None, 'uuid': None}>
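A common pattern is to check the errors field of the returned row before trusting id and uuid. Sketched here with a plain dict standing in for the Row object (values are the hypothetical ones from the examples above):

```python
# A stand-in for the Row returned by scheduler.queue_task (hypothetical values).
result = {'errors': {}, 'id': 1, 'uuid': '08e6433a-cf07-4cea-a4cb-01f16ae5f414'}

def check_queued(row):
    # If validation failed, id and uuid are None and errors explains why.
    if row['errors']:
        raise ValueError('task not queued: %s' % row['errors'])
    return row['id']

assert check_queued(result) == 1
```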
task_status
To query the scheduler about tasks, use task_status:
scheduler.task_status(ref, output=False)
The argument ref can be:
- an integer —> lookup is done by scheduler_task.id
- a string —> lookup is done by scheduler_task.uuid
- a query —> lookup as you wish (as in db.scheduler_task.task_name == 'test1')
output=True also fetches the scheduler_run record. task_status returns a single Row object, for the most recent queued task matching the criteria. The scheduler_run record is fetched by a left join, so all its fields can be None.
Example: retrieving a scheduler task's status, result, and traceback. Here the scheduler instance is mysched:
task = mysched.queue_task(f, ...)
task_status = mysched.task_status(task.id, output=True)
traceback = task_status.scheduler_run.traceback
result = task_status.scheduler_run.run_result # or
result = task_status.result
Results and output
The table “scheduler_run” stores the status of all running tasks. Each record references a task that has been picked up by a worker. One task can have multiple runs. For example, a task scheduled to repeat 10 times an hour will probably have 10 runs (unless one fails or they take longer than 1 hour). Beware that if the task has no return values, it is removed from the scheduler_run table as soon as it is finished.
Possible run statuses are:
RUNNING, COMPLETED, FAILED, TIMEOUT
If the run completes with no exceptions and no task timeout, the run is marked as COMPLETED and the task is marked as QUEUED or COMPLETED, depending on whether it is supposed to run again at a later time. The output of the task is serialized in JSON and stored in the run record.
When a RUNNING task throws an exception, the run is marked as FAILED and the task is marked as FAILED. The traceback is stored in the run record. Similarly, when a run exceeds the timeout, it is stopped and marked as TIMEOUT, and the task is marked as TIMEOUT.
In any case, the stdout is captured and also logged into the run record.
Due to multiprocessing limitations, beware of using either huge return values or huge print statements in the queued functions. As the output is buffered, your task may fail just because the parent process hangs on reading values. Keep print statements to a minimum, and if needed use a proper logging library that doesn't clutter stdout. As for huge return values, a better option is to use a table where the function saves its result: you can return only a reference to the specific row of results without hindering the scheduler's master process.
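The "return a reference, not the payload" advice can be sketched with the stdlib sqlite3 module (used here purely for illustration; in a web2py task you would use the DAL and db.commit(), and the table and column names are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE task_result (id INTEGER PRIMARY KEY, payload TEXT)')

def heavy_task(conn):
    payload = 'x' * 10_000_000  # a large result we do NOT want to return
    cur = conn.execute('INSERT INTO task_result (payload) VALUES (?)', (payload,))
    conn.commit()  # analogous to db.commit() at the end of a web2py task
    return cur.lastrowid  # only a small integer crosses the process boundary

ref = heavy_task(conn)
assert isinstance(ref, int)
```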
Using appadmin, one can check all RUNNING
tasks, the output of COMPLETED
tasks, the error of FAILED
tasks, etc.
The scheduler also creates one more table called “scheduler_worker”, which stores the workers’ heartbeat and their status.
Managing processes
Fine-grained worker management is hard. This module tries not to leave any platform behind (Mac, Windows, Linux).
When you start a worker, you may later want to:
- kill it “no matter what it’s doing”
- kill it only if it is not processing tasks
- put it to sleep
Maybe you still have some tasks queued, and you want to save some resources. You know you want them processed every hour, so you'll want to:
- process all queued tasks and die automatically
All of these things are possible by managing Scheduler parameters or the scheduler_worker table. To be more precise, for started workers you can change the status value of any worker to influence its behavior. As with tasks, workers can be in one of the following statuses: ACTIVE, DISABLED, TERMINATE, or KILLED.
ACTIVE and DISABLED are "persistent", while TERMINATE and KILL, as the names suggest, are more "commands" than real statuses. Hitting ctrl+c is equivalent to setting a worker to KILL.
Since version 2.4.1 there are a few convenience functions (self-explanatory):
scheduler.disable()
scheduler.resume()
scheduler.terminate()
scheduler.kill()
Each function takes an optional parameter, which can be a string or a list, to manage workers based on their group_names. It defaults to the group_names defined in the scheduler instantiation.
An example is better than a thousand words: scheduler.terminate('high_prio') will TERMINATE all the workers that are processing high_prio tasks, while scheduler.terminate(['high_prio', 'low_prio']) will terminate all high_prio and low_prio workers.
Watch out: if you have a worker processing both high_prio and low_prio, scheduler.terminate('high_prio') will terminate the worker altogether, even if you didn't want to terminate low_prio too.
Everything that one can do via appadmin one can do programmatically by inserting and updating records in these tables.
However, one should not update records relative to RUNNING tasks, as this may create unexpected behavior. The best practice is to queue tasks using the queue_task method.
For example:
scheduler.queue_task(function_name='task_add',
pargs=[],
pvars={'a': 3, 'b': 4},
repeats=10, # run 10 times
period=3600, # every 1h
timeout=120, # should take less than 120 seconds
)
Notice that “scheduler_task” table fields “times_run”, “last_run_time” and “assigned_worker_name” are not provided at schedule time but are filled automatically by the workers.
You can also retrieve the output of completed tasks:
completed_runs = db(db.scheduler_run.run_status == 'COMPLETED').select()
The scheduler is considered experimental because it needs more extensive testing and because the table structure may change as more features are added.
Reporting progress percentages
A special "word" encountered in the print statements of your functions clears all the previous output. That word is !clear!. This, coupled with the sync_output parameter, allows you to report percentages.
Here is an example:
import time

def reporting_percentages():
    time.sleep(5)
    print('50%')
    time.sleep(5)
    print('!clear!100%')
    return 1
The function reporting_percentages sleeps for 5 seconds and outputs 50%. Then it sleeps another 5 seconds and outputs 100%. Note that the output in the scheduler_run table is synced every 2 seconds and that the second print statement, which contains !clear!100%, gets the 50% output cleared and replaced by 100% only.
scheduler.queue_task(reporting_percentages, sync_output=2)
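The effect of !clear! on the stored output can be modeled as keeping only what follows the last marker. This is a stdlib sketch of the behavior described above, not the scheduler's actual implementation:

```python
def visible_output(raw):
    # Everything before the last '!clear!' is discarded from the stored output.
    return raw.rsplit('!clear!', 1)[-1]

# The example task prints '50%', then '!clear!100%'.
buffered = '50%\n!clear!100%\n'
assert visible_output(buffered) == '100%\n'
```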