Offloading Websockets and Server-Sent Events AKA “Combine them with Django safely”

Author: Roberto De Ioris

Date: 20140315

Disclaimer

This article shows a pretty advanced way of combining websockets (or SSE) apps with Django in a “safe way”. It will not show you how cool websockets and SSE are, or how to write better apps with them; it is an attempt to help you avoid bad practices with them.

In my opinion the Python web-oriented world is facing a communication/marketing problem: there is a huge number of people running heavily blocking apps (like Django) on non-blocking technologies (like gevent) only because someone told them it is cool and will solve all of their scaling issues.

This is completely WRONG, DANGEROUS and EVIL: you cannot mix blocking apps with non-blocking engines, because even a single, ultra-tiny blocking part can potentially destroy your whole stack. As I have already said dozens of times, if your app is 99.9999999% non-blocking, it is still blocking.

And no, monkey-patching your Django app is not magic. Unless you are using heavily customized database adapters, tuned to work in a non-blocking way, you are doing it wrong.

At the cost of looking like an über-asshole, I strongly suggest you completely ignore people who suggest moving your Django app to gevent, eventlet, tornado or whatever without warning you about the hundreds of problems you may encounter.

Having said that, I love gevent: it is probably the best-supported loop engine in the uWSGI project (along with Perl’s Coro::AnyEvent). So in this article I will use gevent for managing websocket/SSE traffic and plain multiprocessing for the Django part.

If this last sentence looks like nonsense to you, you probably do not know what uWSGI offloading is…

uWSGI offloading

The concept is not new, nor is it uWSGI-specific. Projects like Node.js or Twisted have used it for ages.

Note

An example of a webapp serving a static file is not very interesting, nor the best thing to show, but it will be useful later, when presenting a real-world scenario with X-Sendfile.

Imagine this simple WSGI app:

    def application(env, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        f = open('/etc/services', 'rb')
        # do not do this: if the file is 4GB, read() will allocate 4GB of memory!!!
        yield f.read()

This will simply return the content of /etc/services. It is a pretty tiny file, so in a few milliseconds your process will be ready to process another request.

What if /etc/services is 4 gigabytes? Your process (or thread) will be blocked for several seconds (even minutes), and will not be able to manage another request until the file is completely transferred.

Wouldn’t it be cool if you could tell another thread to send the file for you, so you will be able to manage another request?

Offloading is exactly this: it gives you one or more threads that do simple and slow tasks for you. Which kinds of tasks? All of those that can be managed in a non-blocking way, so that a single thread can manage thousands of transfers for you.

You can see it as the DMA engine in your computer: your CPU programs the DMA to transfer memory from a controller to the RAM, and is then free to accomplish another task while the DMA works in the background.
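To make the analogy concrete, here is a minimal sketch, in plain Python and not uWSGI's actual C implementation, of how one thread can drive many transfers at once: every socket is put in non-blocking mode and is written to only when the selector reports it writable, so a slow client never stalls the others. The name offload_loop and the 64k chunk size are illustrative assumptions, and error handling (for example a client disconnecting mid-transfer) is omitted.

```python
import selectors


def offload_loop(transfers, chunk_size=65536):
    """Send every (payload, sock) pair to completion from a single thread."""
    sel = selectors.DefaultSelector()
    for payload, sock in transfers:
        sock.setblocking(False)
        # the per-socket state is simply the bytes still to be sent
        sel.register(sock, selectors.EVENT_WRITE, memoryview(payload))
    while sel.get_map():
        for key, _events in sel.select():
            remaining = key.data
            try:
                sent = key.fileobj.send(remaining[:chunk_size])
            except BlockingIOError:
                continue  # kernel buffer full: retry on a later round
            remaining = remaining[sent:]
            if remaining:
                sel.modify(key.fileobj, selectors.EVENT_WRITE, remaining)
            else:
                # transfer complete: stop watching it and close the connection
                sel.unregister(key.fileobj)
                key.fileobj.close()
    sel.close()
```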

To enable offloading in uWSGI you only need to add the --offload-threads <n> option, where <n> is the number of threads to spawn per process. (Generally a single thread will be more than enough, but if you want to use/abuse your multiple CPU cores, feel free to increase it.)

Once offloading is enabled, uWSGI will automatically use it whenever it detects that an operation can be offloaded safely.

In the Python/WSGI case any use of wsgi.file_wrapper will be offloaded automatically, as will requests passed via the uWSGI proxy features to other servers speaking the uwsgi or HTTP protocol.
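As a sketch of the wsgi.file_wrapper route, the earlier app can be rewritten so it never loads the whole file into memory. This assumes a server that provides wsgi.file_wrapper (uWSGI does); the factory name make_file_app and the 32k block size are hypothetical, and on servers without a file_wrapper it falls back to streaming fixed-size chunks.

```python
def _chunks(f, block_size):
    # portable fallback: read and yield one block at a time, then close the file
    with f:
        while True:
            chunk = f.read(block_size)
            if not chunk:
                break
            yield chunk


def make_file_app(path, block_size=32768):
    def application(environ, start_response):
        f = open(path, 'rb')
        start_response('200 OK', [('Content-Type', 'text/plain')])
        wrapper = environ.get('wsgi.file_wrapper')
        if wrapper is not None:
            # the server owns the file from here on (and can offload the transfer)
            return wrapper(f, block_size)
        return _chunks(f, block_size)
    return application


application = make_file_app('/etc/services')
```

Under uWSGI with --offload-threads, the file_wrapper branch is the one that gets offloaded; the generator branch keeps memory flat on any server but still ties up the worker for the whole transfer.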

A cool example (also shown on the Snippets page of the uWSGI docs) is implementing an offload-powered X-Sendfile feature:

    [uwsgi]
    ; load router_static plugin (compiled in by default in monolithic profiles)
    plugins = router_static

    ; spawn 2 offload threads
    offload-threads = 2

    ; files under /etc can be safely served (DANGEROUS !!!)
    static-safe = /etc

    ; collect the X-Sendfile response header as X_SENDFILE var
    collect-header = X-Sendfile X_SENDFILE

    ; if X_SENDFILE is not empty, pass its value to the "static" routing action (it will automatically use offloading if available)
    response-route-if-not = empty:${X_SENDFILE} static:${X_SENDFILE}

    ; now the classic options
    plugins = python
    ; bind to HTTP port 8080
    http-socket = :8080
    ; load a simple wsgi-app
    wsgi-file = myapp.py

Now in our app we can use X-Sendfile to send static files without blocking:

    def application(env, start_response):
        start_response('200 OK', [('X-Sendfile', '/etc/services')])
        return []
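Because the X-Sendfile value is a filesystem path handed to the static routing action, it may be worth validating it in the app as well. A minimal sketch, assuming a hypothetical make_sendfile_app() factory that maps PATH_INFO to files under one directory and answers 404 for anything that escapes it (for example via "..") or does not exist:

```python
import os


def make_sendfile_app(safe_root):
    # hypothetical helper: only paths that resolve inside safe_root are sent
    safe_root = os.path.realpath(safe_root)

    def application(environ, start_response):
        requested = environ.get('PATH_INFO', '/').lstrip('/')
        # resolve symlinks and '..' before checking where the path really points
        path = os.path.realpath(os.path.join(safe_root, requested))
        if os.path.commonpath([safe_root, path]) != safe_root or not os.path.isfile(path):
            start_response('404 Not Found', [('Content-Type', 'text/plain')])
            return [b'not found']
        # empty body: the routing rules replace it with the file content
        start_response('200 OK', [('X-Sendfile', path)])
        return []

    return application
```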

A very similar concept will be used in this article: we will use a normal Django app to set up our session, authorize the user and do whatever else (fast) you want, then we will return a special header that instructs uWSGI to offload the connection to another uWSGI instance (listening on a private socket) that will manage the websocket/SSE transaction using gevent in a non-blocking way.

Our SSE app

The SSE part will be very simple, a gevent-based WSGI app will send the current time every second:

    from sse import Sse
    import time

    def application(e, start_response):
        print(e)
        # create the SSE session
        session = Sse()
        # prepare HTTP headers
        headers = []
        headers.append(('Content-Type', 'text/event-stream'))
        headers.append(('Cache-Control', 'no-cache'))
        start_response('200 OK', headers)
        # enter the loop
        while True:
            # monkey patching will prevent sleep() from blocking
            time.sleep(1)
            # add the message
            session.add_message('message', str(time.time()))
            # send to the client (WSGI response bodies must be bytes)
            yield str(session).encode('utf-8')

Let’s run it on the /tmp/foo UNIX socket (save the app as sseapp.py):

    uwsgi --wsgi-file sseapp.py --socket /tmp/foo --gevent 1000 --gevent-monkey-patch

(Monkey patching is required for time.sleep(); feel free to use gevent primitives for sleeping if you prefer.)
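If you would rather not depend on the sse package, the text/event-stream framing is simple enough to write by hand. A sketch under that assumption (format_sse and make_clock_app are hypothetical names): the injectable sleep parameter lets you pass gevent.sleep instead of relying on monkey patching. Omitting the "event:" field gives the default "message" event type, which is what an onmessage handler listens for.

```python
import time


def format_sse(data, event=None):
    """Encode one Server-Sent Events message as text/event-stream bytes."""
    lines = []
    if event is not None:
        lines.append('event: %s' % event)
    # a multi-line payload needs one "data:" field per line
    for line in str(data).splitlines() or ['']:
        lines.append('data: %s' % line)
    # a blank line terminates the message
    return ('\n'.join(lines) + '\n\n').encode('utf-8')


def make_clock_app(sleep=time.sleep):
    def application(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/event-stream'),
                                  ('Cache-Control', 'no-cache')])
        while True:
            sleep(1)
            yield format_sse(str(time.time()))
    return application


application = make_clock_app()
```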

The (boring) HTML/JavaScript

    <html>
    <head>
    </head>
    <body>
    <h1>Server sent events</h1>
    <div id="event"></div>
    <script type="text/javascript">

    var eventOutputContainer = document.getElementById("event");
    var evtSrc = new EventSource("/subscribe");

    evtSrc.onmessage = function(e) {
        console.log(e.data);
        eventOutputContainer.innerHTML = e.data;
    };

    </script>
    </body>
    </html>

It is very simple: it connects to /subscribe and starts waiting for events.

The Django view

Our Django view will be very simple: it will generate a special response header (we will call it X-Offload-to-SSE) with the username of the logged-in user as its value:

    from django.http import HttpResponse

    def subscribe(request):
        response = HttpResponse()
        response['X-Offload-to-SSE'] = request.user.username
        return response

Now we are ready for the “advanced” part.

Let’s offload the SSE transaction

The configuration could look a bit complex, but it is the same concept as the X-Sendfile example seen before:

    [uwsgi]
    ; the boring part
    http-socket = :9090
    offload-threads = 2
    wsgi-file = sseproject/wsgi.py

    ; collect X-Offload-to-SSE header and store in var X_OFFLOAD
    collect-header = X-Offload-to-SSE X_OFFLOAD
    ; if X_OFFLOAD is defined, do not send the headers generated by Django
    response-route-if-not = empty:${X_OFFLOAD} disableheaders:
    ; if X_OFFLOAD is defined, offload the request to the app running on /tmp/foo
    response-route-if-not = empty:${X_OFFLOAD} uwsgi:/tmp/foo,0,0

The only “new” part is the use of the disableheaders routing action. It is required, otherwise the headers generated by Django will be sent along with the ones generated by the gevent-based app.
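To see what these routing rules do to a single request, here is a development-only WSGI middleware that roughly emulates them in-process. It is a hypothetical helper, not part of uWSGI, and it assumes the Django app calls start_response() before returning (Django's WSGI handler does). When the collected header is non-empty, Django's status, headers and body are discarded (as with disableheaders) and the request, plus the X_OFFLOAD var, is handed to the second app (as with uwsgi:/tmp/foo,0,0).

```python
def offload_emulator(django_app, sse_app, header='X-Offload-to-SSE', var='X_OFFLOAD'):
    def application(environ, start_response):
        captured = {}

        def capture(status, headers, exc_info=None):
            captured['status'] = status
            captured['headers'] = headers
            return lambda data: None  # WSGI requires a write() callable

        body = django_app(environ, capture)
        # like collect-header: look up the response header (case-insensitively)
        value = dict((k.lower(), v) for k, v in captured['headers']).get(header.lower(), '')
        if not value:
            # no offload requested: pass Django's response through untouched
            start_response(captured['status'], captured['headers'])
            return body
        # like disableheaders: drop everything Django produced...
        if hasattr(body, 'close'):
            body.close()
        # ...and forward the request, plus the collected var, to the second app
        forwarded = dict(environ)
        forwarded[var] = value
        return sse_app(forwarded, start_response)

    return application
```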

You could avoid disableheaders (remember that it was added only in 2.0.3) by removing the call to start_response() in the gevent app (at the risk of being cursed by some WSGI god) and changing the Django view to set the right headers:

    from django.http import HttpResponse

    def subscribe(request):
        response = HttpResponse()
        response['Content-Type'] = 'text/event-stream'
        response['X-Offload-to-SSE'] = request.user.username
        return response

Alternatively, you may want to be more “streamlined” and simply detect the presence of the ‘text/event-stream’ Content-Type:

    [uwsgi]
    ; the boring part
    http-socket = :9090
    offload-threads = 2
    wsgi-file = sseproject/wsgi.py

    ; collect Content-Type header and store in var CONTENT_TYPE
    collect-header = Content-Type CONTENT_TYPE
    ; if CONTENT_TYPE is 'text/event-stream', forward the request
    response-route-if = equal:${CONTENT_TYPE};text/event-stream uwsgi:/tmp/foo,0,0

Now, how to access the username of the Django-logged user in the gevent app?

You may have noticed that the gevent app prints the content of the WSGI environ on each request. That environment is the same as the Django app's, plus the collected headers. So accessing environ['X_OFFLOAD'] will return the logged-in username. (Obviously, in the second example, where the content type is used, the variable with the username is no longer collected, so you need to fix that.)

You can pass all of the information you need using the same approach, collecting as many vars as you need.

You can even add variables at runtime:

    [uwsgi]
    ; the boring part
    http-socket = :9090
    offload-threads = 2
    wsgi-file = sseproject/wsgi.py

    ; collect Content-Type header and store in var CONTENT_TYPE
    collect-header = Content-Type CONTENT_TYPE

    response-route-if = equal:${CONTENT_TYPE};text/event-stream addvar:FOO=BAR
    response-route-if = equal:${CONTENT_TYPE};text/event-stream addvar:TEST1=TEST2

    ; if CONTENT_TYPE is 'text/event-stream', forward the request
    response-route-if = equal:${CONTENT_TYPE};text/event-stream uwsgi:/tmp/foo,0,0

Or (using goto for better readability):

    [uwsgi]
    ; the boring part
    http-socket = :9090
    offload-threads = 2
    wsgi-file = sseproject/wsgi.py

    ; collect Content-Type header and store in var CONTENT_TYPE
    collect-header = Content-Type CONTENT_TYPE

    response-route-if = equal:${CONTENT_TYPE};text/event-stream goto:offload
    response-route-run = last:

    response-route-label = offload
    response-route-run = addvar:FOO=BAR
    response-route-run = addvar:TEST1=TEST2
    response-route-run = uwsgi:/tmp/foo,0,0

Simplifying things using the uwsgi API (>= uWSGI 2.0.3)

While dealing with headers is pretty HTTP-friendly, uWSGI 2.0.3 added the possibility to define per-request variables directly in your code.

This allows a more “elegant” approach (even if highly non-portable):

    import uwsgi
    from django.http import HttpResponse

    def subscribe(request):
        uwsgi.add_var("LOGGED_IN_USER", request.user.username)
        uwsgi.add_var("USER_IS_UGLY", "probably")
        uwsgi.add_var("OFFLOAD_TO_SSE", "y")
        uwsgi.add_var("OFFLOAD_SERVER", "/tmp/foo")
        return HttpResponse()
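Since import uwsgi only works when the code runs under uWSGI, one hedged way to keep the view module importable elsewhere (tests, runserver) is to guard the import. The helper below (request_sse_offload, a hypothetical name) sets the same vars as the view above and reports whether it could; the api parameter exists only so a stand-in object can be injected in tests.

```python
try:
    import uwsgi  # only importable inside a uWSGI process
except ImportError:
    uwsgi = None


def request_sse_offload(username, server='/tmp/foo', api=None):
    """Set the per-request vars read by the routing rules; return True on success."""
    api = api if api is not None else uwsgi
    if api is None or not hasattr(api, 'add_var'):
        # not running under uWSGI (or older than 2.0.3): nothing to offload to
        return False
    api.add_var('LOGGED_IN_USER', str(username))
    api.add_var('OFFLOAD_TO_SSE', 'y')
    api.add_var('OFFLOAD_SERVER', server)
    return True
```

In the view you could then return an error response (or fall back to another mechanism) when the helper returns False.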

Now the config can change to something more gentle:

    [uwsgi]
    ; the boring part
    http-socket = :9090
    offload-threads = 2
    wsgi-file = sseproject/wsgi.py

    ; if OFFLOAD_TO_SSE is 'y', do not send the headers generated by Django
    response-route-if = equal:${OFFLOAD_TO_SSE};y disableheaders:
    ; if OFFLOAD_TO_SSE is 'y', offload the request to the app running on 'OFFLOAD_SERVER'
    response-route-if = equal:${OFFLOAD_TO_SSE};y uwsgi:${OFFLOAD_SERVER},0,0

Have you noticed how we allowed the Django app to choose the backend server through a request variable?

Now we can go even further. We will not use the routing framework (except for disabling header generation):

    import uwsgi
    from django.http import HttpResponse

    def subscribe(request):
        uwsgi.add_var("LOGGED_IN_USER", request.user.username)
        uwsgi.add_var("USER_IS_UGLY", "probably")
        uwsgi.route("uwsgi", "/tmp/foo,0,0")
        return HttpResponse()

and a simple config:

    [uwsgi]
    ; the boring part
    http-socket = :9090
    offload-threads = 2
    wsgi-file = sseproject/wsgi.py

    response-route = ^/subscribe disableheaders:

What about websockets?

We have seen how to offload SSE (which is unidirectional). We can offload websockets (which are bidirectional) too.

The concept is the same: you only need to ensure (as before) that no headers are sent by Django (otherwise the websocket handshake will fail), and then you can change your gevent app:

    import time
    import uwsgi

    def application(e, start_response):
        print(e)
        uwsgi.websocket_handshake()
        # enter the loop
        while True:
            # monkey patching will prevent sleep() from blocking
            time.sleep(1)
            # send to the client
            uwsgi.websocket_send(str(time.time()))

Using redis or uWSGI caching framework

Request vars are handy (and fun), but they are limited (see below). If you need to pass a big amount of data between Django and the SSE/websocket app, Redis is a great way (and works perfectly with gevent). Basically, you store the information from Django in Redis and then pass only the hash key (via request vars) to the SSE/websocket app.

The same can be accomplished with the uWSGI caching framework, but keep in mind that Redis has a lot of data primitives, while the uWSGI cache only supports key->value items.
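A minimal sketch of that pattern, assuming a redis-py style client (anything exposing set(name, value, ex=...) and get(name)); the function names, the JSON encoding and the "offload:" key prefix are hypothetical choices. Django stores the payload and passes only the returned key as a request var, and the gevent app fetches the payload back with that key:

```python
import json
import secrets


def stash_for_offload(client, payload, ttl=60):
    """Store a JSON-serialisable payload; return the short key to pass along."""
    key = 'offload:' + secrets.token_hex(16)
    # the TTL makes abandoned entries disappear on their own
    client.set(key, json.dumps(payload), ex=ttl)
    return key


def fetch_offloaded(client, key):
    """Return the stored payload, or None if it expired or never existed."""
    raw = client.get(key)
    if raw is None:
        return None
    if isinstance(raw, bytes):
        raw = raw.decode('utf-8')  # redis-py returns bytes by default
    return json.loads(raw)
```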

Common pitfalls

  • The amount of variables you can add per request is limited by the uwsgi packet buffer (default 4k). You can increase it up to 64k with the --buffer-size option.
  • This is the whole point of this article: do not use the Django ORM in your gevent apps unless you know what you are doing!!! (Read: you have a Django database adapter that supports gevent and does not suck compared to the standard ones…)
  • Forget about finding a way to disable header generation in Django. This is a “limit/feature” of its WSGI adapter: use the uWSGI facilities (if available) or do not generate headers in your gevent app. Alternatively, you can modify wsgi.py in this way:
  1. """
  2. WSGI config for sseproject project.
  3.  
  4. It exposes the WSGI callable as a module-level variable named ``application``.
  5.  
  6. For more information on this file, see
  7. https://docs.djangoproject.com/en/1.6/howto/deployment/wsgi/
  8. """
  9.  
  10. import os
  11. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sseproject.settings")
  12.  
  13. from django.core.wsgi import get_wsgi_application
  14. django_application = get_wsgi_application()
  15.  
  16. def fake_start_response(status, headers, exc_info=None):
  17. pass
  18.  
  19. def application(environ, start_response):
  20. if environ['PATH_INFO'] == '/subscribe':
  21. return django_application(environ, fake_start_response)
  22. return django_application(environ, start_response)
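To get a rough idea of how close you are to the buffer limit mentioned in the first point, you can serialise the vars the way the uwsgi protocol does: a 4-byte header (modifier1, a little-endian 16-bit body size, modifier2) followed, for each var, by a 16-bit key length, the key, a 16-bit value length and the value. The 16-bit size field is also why 64k is the ceiling. This is a sketch with hypothetical function names; the real packet also carries every CGI-style variable of the request (REQUEST_METHOD, PATH_INFO, the HTTP_* headers and so on), so measure with those included.

```python
import struct


def encode_uwsgi_vars(env, modifier1=0, modifier2=0):
    """Serialise a dict of str vars as a uwsgi packet (header + var pairs)."""
    body = b''
    for key, value in env.items():
        k = key.encode('utf-8')
        v = value.encode('utf-8')
        # each var: u16 key length, key, u16 value length, value (little-endian)
        body += struct.pack('<H', len(k)) + k + struct.pack('<H', len(v)) + v
    # header: u8 modifier1, u16 body size, u8 modifier2
    return struct.pack('<BHB', modifier1, len(body), modifier2) + body


def fits_in_buffer(env, buffer_size=4096):
    """True if the serialised vars fit in a buffer of the given size."""
    return len(encode_uwsgi_vars(env)) <= buffer_size
```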