uWSGI API - Python decorators

The uWSGI API is very low-level, as it must be language-independent.

That said, being too low-level is not a Good Thing for many languages, such as Python.

Decorators are, in our humble opinion, one of the more kick-ass features of Python, so in the uWSGI source tree you will find a module exporting a bunch of decorators that cover a good part of the uWSGI API.

Notes

Signal-based decorators execute the signal handler in the first available worker. If you have enabled the spooler you can execute the signal handlers in it, leaving workers free to manage normal requests. Simply pass target='spooler' to the decorator.

    @timer(3, target='spooler')
    def hello(signum):
        print("hello")

Example: a Django session cleaner and video encoder

Let’s define a task.py module and put it in the Django project directory.

    from uwsgidecorators import *
    from django.contrib.sessions.models import Session
    import os

    @cron(40, 2, -1, -1, -1)
    def clear_django_session(num):
        print("it's 2:40 in the morning: clearing django sessions")
        Session.objects.all().delete()

    @spool
    def encode_video(arguments):
        os.system("ffmpeg -i \"%s\" image%%d.jpg" % arguments['filename'])

The session cleaner will be executed every day at 2:40. To enqueue a video encoding, we simply need to spool it from somewhere else.

    from task import encode_video

    def index(request):
        # launching video encoding
        encode_video.spool(filename=request.POST['video_filename'])
        return render_to_response('enqueued.html')

Now run uWSGI with the spooler enabled:

    [uwsgi]
    ; a couple of placeholders
    django_projects_dir = /var/www/apps
    my_project = foobar
    ; chdir to app project dir and set pythonpath
    chdir = %(django_projects_dir)/%(my_project)
    pythonpath = %(django_projects_dir)
    ; load django
    module = django.core.handlers.wsgi:WSGIHandler()
    env = DJANGO_SETTINGS_MODULE=%(my_project).settings
    ; enable master
    master = true
    ; 4 processes should be enough
    processes = 4
    ; enable the spooler (the mytasks dir must exist!)
    spooler = %(chdir)/mytasks
    ; load the task.py module
    import = task
    ; bind on a tcp socket
    socket = 127.0.0.1:3031

The only especially relevant option is import. It works in the same way as module but skips the WSGI callable search. You can use it to preload modules before the WSGI apps are loaded. You can specify an unlimited number of import directives.

Example: web2py + spooler + timer

First of all, define your spooler and timer functions (we will call the module mytasks.py):

    from uwsgidecorators import *

    @spool
    def a_long_task(args):
        print(args)

    @spool
    def a_longer_task(args):
        print("longer.....")

    @timer(3)
    def three_seconds(signum):
        print("3 seconds elapsed")

    @timer(10, target='spooler')
    def ten_seconds_in_the_spooler(signum):
        print("10 seconds elapsed in the spooler")

Now run web2py.

    uwsgi --socket :3031 --spooler myspool --master --processes 4 --import mytasks --module web2py.wsgihandler

As soon as the application is loaded, you will see the 2 timers running in your logs.

Now we want to enqueue tasks from our web2py controllers.

Edit one of them and add:

    import mytasks  # be sure mytasks is importable!

    def index():  # this is a web2py action
        mytasks.a_long_task.spool(foo='bar')
        return "Task enqueued"

uwsgidecorators API reference

  • uwsgidecorators.postfork(func)
  • uWSGI is a preforking (or “fork-abusing”) server, so you might need to execute a fixup task after each fork(). The postfork decorator is just the ticket. You can declare multiple postfork tasks. Each decorated function will be executed in sequence after each fork().

    @postfork
    def reconnect_to_db():
        myfoodb.connect()

    @postfork
    def hello_world():
        print("Hello World")

  • uwsgidecorators.spool(func, pass_arguments=False)
  • The uWSGI spooler can be very useful. Compared to Celery or other queues it is very “raw”. The spool decorator will help!

    import time

    @spool
    def a_long_long_task(arguments):
        print(arguments)
        for i in range(0, 10000000):
            time.sleep(0.1)

    @spool
    def a_longer_task(args):
        print(args)
        for i in range(0, 10000000):
            time.sleep(0.5)

    # enqueue the tasks
    a_long_long_task.spool(foo='bar', hello='world')
    a_longer_task.spool({'pippo':'pluto'})

Warning

On Python 3, only bytes type arguments are allowed. See below for another way of passing arguments.
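One way to live with the bytes-only rule is to convert values just before spooling. The sketch below is a hypothetical helper (bytes_values is not part of uwsgidecorators) and it assumes that ordinary keyword names are fine and only the values need converting; check that assumption against your uWSGI version.

```python
def bytes_values(**kwargs):
    """Return kwargs with every value converted to bytes.

    Hypothetical helper, not part of uwsgidecorators. Bytes pass through
    untouched; anything else goes through str() and is encoded as UTF-8.
    """
    encoded = {}
    for key, value in kwargs.items():
        if isinstance(value, bytes):
            encoded[key] = value
        else:
            encoded[key] = str(value).encode("utf-8")
    return encoded


args = bytes_values(foo="bar", hello=b"world", retries=3)
print(args)
```

Inside uWSGI the result could then be passed along as a_long_long_task.spool(**args). Remember that the task receives bytes back, so it has to decode them itself.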

When pass_arguments is set to True, arguments can be of any type supported by the pickle module. The following arguments have a special meaning:

  • spooler: specify the absolute path of the spooler that has to manage this task
  • at: Unix time at which the task must be executed (read: the task will not be run until the at time has passed)
  • priority: this will be the subdirectory in the spooler directory in which the task will be placed; you can use that trick to give a good-enough prioritization to tasks (for a better approach, use multiple spoolers)

Warning

On Python 3, the special arguments spooler, at, priority and body must be bytes.
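Since at is a Unix time and has to be bytes on Python 3, a tiny helper saves some typing. This is only a sketch: at_in is a hypothetical name, and its now parameter exists purely to make the function testable.

```python
import time


def at_in(seconds, now=None):
    """Return the Unix time `seconds` from now, as ASCII bytes.

    Hypothetical helper, not part of uwsgidecorators.
    """
    if now is None:
        now = time.time()
    return str(int(now + seconds)).encode("ascii")


print(at_in(60, now=1497023091))  # b'1497023151'
```

With a task declared with pass_arguments=True, delaying it by an hour would then look like some_task.spool(id=42, at=at_in(3600)).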

    @spool(pass_arguments=True)
    def some_task(*args, **kwargs):
        print(args)
        print(kwargs)
        for i in range(0, 10000000):
            time.sleep(0.5)

    # enqueue the task
    # (at is a str here, which is bytes on Python 2; on Python 3 pass bytes instead)
    some_task.spool(id=42, foo=['bar', 'baz'], func=min, at=str(1497023151))

The functions will automatically return uwsgi.SPOOL_OK, so they will be executed only once, regardless of their return value.
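Because pass_arguments relies on pickle, you can check outside uWSGI whether a given set of arguments will make the trip. The following is a rough pre-flight check, not something uwsgidecorators provides; can_be_spooled is a hypothetical name.

```python
import pickle


def can_be_spooled(*args, **kwargs):
    """Return True if args and kwargs survive a pickle round trip.

    Hypothetical pre-flight check, not part of uwsgidecorators.
    """
    try:
        pickle.loads(pickle.dumps((args, kwargs)))
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Built-in functions such as min are pickled by reference, so this works.
print(can_be_spooled(42, foo=["bar", "baz"], func=min))
# A lambda has no importable name, so pickle refuses it.
print(can_be_spooled(callback=lambda x: x))
```

The second call is the interesting one: anything pickle cannot serialize (lambdas, open files, sockets) cannot be handed to the spooler this way either.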

  • uwsgidecorators.spoolforever(func, pass_arguments=False)
  • Use spoolforever when you want to continuously execute a spool task. A @spoolforever task will always return uwsgi.SPOOL_RETRY.

    @spoolforever
    def a_longer_task(args):
        print(args)
        for i in range(0, 10000000):
            time.sleep(0.5)

    # enqueue the task
    a_longer_task.spool({'pippo':'pluto'})

Warning

On Python 3, only bytes type arguments are allowed. See below for another way of passing arguments.

When pass_arguments is set to True, arguments can be of any type supported by the pickle module. The following arguments have a special meaning:

  • spooler: specify the absolute path of the spooler that has to manage this task
  • at: Unix time at which the task must be executed (read: the task will not be run until the at time has passed)
  • priority: this will be the subdirectory in the spooler directory in which the task will be placed; you can use that trick to give a good-enough prioritization to tasks (for a better approach, use multiple spoolers)

Warning

On Python 3, the special arguments spooler, at, priority and body must be bytes.

    @spoolforever(pass_arguments=True)
    def a_longer_task(*args):
        print(args)
        for i in range(0, 10000000):
            time.sleep(0.5)

    # enqueue the task
    a_longer_task.spool('pluto', 42)

  • uwsgidecorators.spoolraw(func, pass_arguments=False)
  • Advanced users may want to control the return value of a task.

    @spoolraw
    def a_controlled_task(args):
        if args['foo'] == 'bar':
            return uwsgi.SPOOL_OK
        return uwsgi.SPOOL_RETRY

    a_controlled_task.spool(foo='bar')
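To see what controlling the return value buys you, the sketch below simulates a spooler that keeps re-running a task for as long as it asks for a retry. Everything in it is a stand-in: the two constants are placeholders for uwsgi.SPOOL_OK and uwsgi.SPOOL_RETRY, and run_like_a_spooler is a hypothetical loop, not how the real spooler is implemented.

```python
# Stand-in values for illustration; inside uWSGI use uwsgi.SPOOL_OK and
# uwsgi.SPOOL_RETRY instead of these constants.
SPOOL_OK = "ok"
SPOOL_RETRY = "retry"


def run_like_a_spooler(task, args, max_runs=10):
    """Re-run task until it stops asking for a retry; return the run count."""
    for run in range(1, max_runs + 1):
        if task(args) != SPOOL_RETRY:
            return run
    return max_runs


attempts = {"count": 0}


def flaky_task(args):
    """Ask for a retry twice, then report success."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        return SPOOL_RETRY
    return SPOOL_OK


print(run_like_a_spooler(flaky_task, {"foo": "bar"}))  # 3
```

A @spool task behaves like a function that always returns the OK value, a @spoolforever task like one that always returns the retry value, and @spoolraw leaves the decision to you, as flaky_task does here.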

Warning

On Python 3, only bytes type arguments are allowed. See below for another way of passing arguments.

When pass_arguments is set to True, arguments can be of any type supported by the pickle module. The following arguments have a special meaning:

  • spooler: specify the absolute path of the spooler that has to manage this task
  • at: Unix time at which the task must be executed (read: the task will not be run until the at time has passed)
  • priority: this will be the subdirectory in the spooler directory in which the task will be placed; you can use that trick to give a good-enough prioritization to tasks (for a better approach, use multiple spoolers)

Warning

On Python 3, the special arguments spooler, at, priority and body must be bytes.

    @spoolraw(pass_arguments=True)
    def a_controlled_task(**kwargs):
        if kwargs['foo'] == 'bar':
            return uwsgi.SPOOL_OK
        return uwsgi.SPOOL_RETRY

    a_controlled_task.spool(foo='bar', age=42)

  • uwsgidecorators.rpc("name", func)
  • The uWSGI RPC stack is the fastest way to remotely call functions in applications hosted in uWSGI instances. You can easily define exported functions with the @rpc decorator.

    @rpc('helloworld')
    def ciao_mondo_function():
        return "Hello World"

  • uwsgidecorators.signal(num)(func)
  • You can register signals for the signal framework in one shot.

    @signal(17)
    def my_signal(num):
        print("i am signal %d" % num)

  • uwsgidecorators.timer(interval, func)
  • Execute a function at regular intervals.

    @timer(3)
    def three_seconds(num):
        print("3 seconds elapsed")

  • uwsgidecorators.rbtimer(interval, func)
  • Works like @timer but using red-black timers.

  • uwsgidecorators.cron(min, hour, day, mon, wday, func)
  • Easily register functions for the cron interface.

    @cron(59, 3, -1, -1, -1)
    def execute_me_at_three_and_fiftynine(num):
        print("it's 3:59 in the morning")

Since 1.2, a new syntax is supported to simulate crontab-like intervals (every Nth minute, etc.). The crontab entry */5 * * * * can be specified in uWSGI like this:

    @cron(-5, -1, -1, -1, -1)
    def execute_me_every_five_min(num):
        print("5 minutes, what a long time!")
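If you are porting entries from an existing crontab, the mapping shown in the two examples above (-1 for "any", -N for "every Nth") can be automated. The function below is a hypothetical helper, not part of uwsgidecorators, and it only understands plain numbers, * and */N; ranges and lists are rejected with a ValueError.

```python
def crontab_to_cron_args(expr):
    """Translate a five-field crontab expression into @cron-style integers.

    Hypothetical helper, not part of uwsgidecorators. Only plain numbers,
    "*" and "*/N" are handled.
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields, got %d" % len(fields))
    args = []
    for field in fields:
        if field == "*":
            args.append(-1)               # -1 means "any value"
        elif field.startswith("*/"):
            args.append(-int(field[2:]))  # */N becomes -N
        else:
            args.append(int(field))       # fixed value
    return tuple(args)


print(crontab_to_cron_args("*/5 * * * *"))  # (-5, -1, -1, -1, -1)
print(crontab_to_cron_args("59 3 * * *"))   # (59, 3, -1, -1, -1)
```

The tuple can be unpacked straight into the decorator, for example @cron(*crontab_to_cron_args("*/5 * * * *")).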

  • uwsgidecorators.filemon(path, func)
  • Execute a function every time a file/directory is modified.

    @filemon("/tmp")
    def tmp_has_been_modified(num):
        print("/tmp directory has been modified. Great magic is afoot")

  • uwsgidecorators.erlang(process_name, func)
  • Map a function as an Erlang process.

    @erlang('foobar')
    def hello():
        return "Hello"

  • uwsgidecorators.thread(func)
  • Mark function to be executed in a separate thread.

Important

Threading must be enabled in uWSGI with the enable-threads or threads <n> option.

    import time

    @thread
    def a_running_thread():
        while True:
            time.sleep(2)
            print("i am a no-args thread")

    @thread
    def a_running_thread_with_args(who):
        while True:
            time.sleep(2)
            print("Hello %s (from arged-thread)" % who)

    a_running_thread()
    a_running_thread_with_args("uWSGI")
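If you want to try the pattern outside uWSGI, a thread-style decorator is easy to approximate with the standard threading module. The version below is a sketch under that assumption, not the uwsgidecorators code; in particular, returning the Thread object from the wrapper is a choice made here so the example can wait for the thread.

```python
import threading


def thread(func):
    """Run func in a new daemon thread each time the wrapper is called.

    Sketch only: the real decorator may not return the Thread object.
    """
    def wrapper(*args):
        worker = threading.Thread(target=func, args=args)
        worker.daemon = True
        worker.start()
        return worker
    return wrapper


greetings = []


@thread
def greet(who):
    greetings.append("Hello %s (from arged-thread)" % who)


# join() is used only so the example finishes deterministically.
greet("uWSGI").join()
print(greetings)  # ['Hello uWSGI (from arged-thread)']
```

Note that calling the decorated function returns right away; the body runs in the background, which is why the endless while True loops in the examples above do not block the caller.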

You may also combine @thread with @postfork to spawn the postfork handler in a new thread in the freshly spawned worker.

    @postfork
    @thread
    def a_post_fork_thread():
        while True:
            time.sleep(3)
            print("Hello from a thread in worker %d" % uwsgi.worker_id())

  • uwsgidecorators.lock(func)
  • This decorator will execute a function in a fully locked environment, making it impossible for other workers or threads (or the master, if you’re foolish or brave enough) to run it simultaneously. Obviously this may be combined with @postfork.

    @lock
    def dangerous_op():
        print("Concurrency is for fools!")
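The acquire, run, release shape behind a lock-style decorator can be sketched as follows. This is not the uwsgidecorators implementation: locked is a hypothetical name, and the sketch assumes that uwsgi.lock() and uwsgi.unlock() are available inside a uWSGI process. Outside uWSGI it falls back to a plain threading.Lock, which only protects threads of one process and is there just so the example runs.

```python
import functools
import threading

try:
    import uwsgi  # only importable inside a uWSGI process
    acquire, release = uwsgi.lock, uwsgi.unlock
except ImportError:
    _fallback = threading.Lock()  # in-process stand-in for experimenting
    acquire, release = _fallback.acquire, _fallback.release


def locked(func):
    """Hypothetical lock-style decorator: hold the lock while func runs."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        acquire()
        try:
            return func(*args, **kwargs)
        finally:
            release()  # always release, even if func raises
    return wrapper


@locked
def dangerous_op():
    return "Concurrency is for fools!"


print(dangerous_op())
```

The try/finally is the part worth copying: a locked function that raises without releasing would stall every other caller.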

  • uwsgidecorators.mulefunc([mulespec, ]func)
  • Offload the execution of the function to a mule. When the offloaded function is called, it will return immediately and execution is delegated to a mule.

    @mulefunc
    def i_am_an_offloaded_function(argument1, argument2):
        print(argument1, argument2)

You may also specify a mule ID or mule farm to run the function on. Please remember to register your function with a uwsgi import configuration option.

    @mulefunc(3)
    def on_three():
        print("I'm running on mule 3.")

    @mulefunc('old_mcdonalds_farm')
    def on_mcd():
        print("I'm running on a mule on Old McDonalds' farm.")

  • uwsgidecorators.harakiri(time, func)
  • Starting from uWSGI 1.3-dev, a customizable secondary harakiri subsystem has been added. You can use this decorator to kill a worker if the given call is taking too long.

    @harakiri(10)
    def slow_function(foo, bar):
        for i in range(0, 10000):
            for y in range(0, 10000):
                pass

    # or the alternative lower level api
    uwsgi.set_user_harakiri(30)  # you have 30 seconds. fight!
    slow_function('foo', 'bar')
    uwsgi.set_user_harakiri(0)  # clear the timer, all is well
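With the lower level API it is easy to forget to clear the timer when the slow call raises. One way around that is a small context manager. This is a sketch: user_harakiri is a hypothetical helper, and the timer function is passed in as a parameter so the example can run outside uWSGI (inside uWSGI you would pass uwsgi.set_user_harakiri).

```python
from contextlib import contextmanager


@contextmanager
def user_harakiri(seconds, set_timer):
    """Arm the harakiri timer for a block of code, then always clear it.

    Hypothetical helper. Inside uWSGI, pass uwsgi.set_user_harakiri as
    set_timer; any callable taking a number of seconds works for testing.
    """
    set_timer(seconds)
    try:
        yield
    finally:
        set_timer(0)  # clear the timer even if the block raised


calls = []
with user_harakiri(30, calls.append):
    pass  # slow_function('foo', 'bar') would go here
print(calls)  # [30, 0]
```

Inside a worker this would read: with user_harakiri(30, uwsgi.set_user_harakiri): slow_function('foo', 'bar').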