Offloading Websockets and Server-Sent Events AKA “Combine them with Django safely”

作者:Roberto De Ioris

日期:20140315

免责声明

这篇文章显示了将websockets(或者sse)与Django以一种“安全的方式”结合起来的一种相当高级的方法。它不会向你展示websockets和sse有多酷,或者如何用它们写出更好的应用,而是试图让你避免使用它们的最差实践。

在我看来,Python面向web的世界正面临着一种通信/市场问题:大量的人在非阻塞技术(例如gevent)上运行大量阻塞应用(例如django)只是因为有人告诉他们这很酷并且会解决他们所有的扩展问题。

这是完全错误,危险和邪恶的,你不能将阻塞应用和非阻塞引擎混在一起,即使是一个单一、非常小的阻塞部分都能潜在摧毁你整个栈。正如我已经说了几十次一样,如果你的应用是99.9999999%非阻塞的,那么它仍然是阻塞的。

并且不是,你的Django应用上的猴子补丁并非魔法。除非你正使用高度自定义的数据库适配器,对工作在非阻塞模式进行了调整,否则你就是错的。

以看起来就是个超级大混蛋的代价,我强烈建议你完全忽略人们让你将你的Django应用移到gevent, eventlet, tornado或者其他什么的,而不警告你你会遇到数百个问题的建议。

话虽如此,我爱gevent,它可能是uWSGI项目中支持得最好的(带perl的Coro::AnyEvent)循环引擎了。因此,在这篇文章中,我将使用gevent来为Django部分管理websocket/sse流量和纯多进程。

如果这最后一句对你来说像是废话,那么你可能不知道uWSGI的卸载是什么……

uWSGI卸载

这个概念并非是个新东西,或者是uWSGI特有的。诸如nodejs或者twisted这样的项目已经用它多年了。

注解

一个提供静态文件服务的web应用的例子并不非常有趣,也不是用来展示的最好的东西,但是在稍后展示一个使用X-Sendfile的真实世界的例子的时候,会游泳。

想象这个简单的WSGI应用:

  1. def application(env, start_response):
  2. start_response('200 OK',[('Content-Type','text/plain')])
  3. f = open('/etc/services')
  4. # do not do it, if the file is 4GB it will allocate 4GB of memory !!!
  5. yield f.read()

这将会简单返回/etc/services的内容。它是一个相当小的文件,因此在几毫秒时间内,你的进程将会准备好处理另一个请求。

那要是/etc/services是4千兆字节呢?你的进程(或者线程)将会阻塞几秒(甚至几分钟),并且不能够管理其他请求,直到完全传输这个文件。

如果你可以告诉其他线程为你发送这个文件,这样你就能够管理其他请求,岂不是很酷?

卸载就是这样的:它会为你提供一个或多个线程来为你完成一些简单并且慢点任务。哪种任务呢?所有那些可以以一种非阻塞方式管理的任务,因此单个线程就可以为你管理上千个传输。

你可以将其视为你的电脑中的DMA引擎,你的CPU将会编程DMA来将内存从控制器传输到RAM,然后将会被释放,以完成其他任务,同时DMA在后台工作。

要在uWSGI中启用卸载,你只需要添加 —offload-threads <n> 选项,其中<n>是每个进程要生成的线程数。 (一般而言,单个线程就够了,但是如果你想要使用/滥用你的多CPU核,那么随意增加)

一旦启用了卸载,那么每当uWSGI检测到一个操作能够安全被卸载的时候,它将自动使用它。

在python/WSGI中,任何对wsgi.file_wrapper的使用将会被自动卸载,以及当你使用uWSGI代理特性来传递请求到其他使用uwsgi或者HTTP协议到服务器时。

一个很酷的例子 (甚至在uWSGI文档的Snippets页面也有展示) 是实现一个卸载助力的X-Sendfile特性:

  1. [uwsgi]
  2. ; load router_static plugin (compiled in by default in monolithic profiles)
  3. plugins = router_static
  4.  
  5. ; spawn 2 offload threads
  6. offload-threads = 2
  7.  
  8. ; files under /etc can be safely served (DANGEROUS !!!)
  9. static-safe = /etc
  10.  
  11. ; collect the X-Sendfile response header as X_SENDFILE var
  12. collect-header = X-Sendfile X_SENDFILE
  13.  
  14. ; if X_SENDFILE is not empty, pass its value to the "static" routing action (it will automatically use offloading if available)
  15. response-route-if-not = empty:${X_SENDFILE} static:${X_SENDFILE}
  16.  
  17. ; now the classic options
  18. plugins = python
  19. ; bind to HTTP port 8080
  20. http-socket = :8080
  21. ; load a simple wsgi-app
  22. wsgi-file = myapp.py

现在,在我们的应用中,我们可以X-Sendfile来在无阻塞情况下发送静态文件:

  1. def application(env, start_response):
  2. start_response('200 OK',[('X-Sendfile','/etc/services')])
  3. return []

在这篇文章中将会使用一个非常类似的概念:我们将会使用一个正常的Django来设置我们的会话,来认证用户,以及任意(快的)东东,然后我们会返回一个特别的头,它会指示uWSGI卸载连接到另一个uWSGI实例 (监听一个私有socket),这个实例将使用gevent,以一种非阻塞方式管理websocket/sse事务。

我们的SSE应用

SSE部分将非常简单,一个基于gevent的WSGI应用将会每秒发送当前时间:

  1. from sse import Sse
  2. import time
  3.  
  4. def application(e, start_response):
  5. print e
  6. # create the SSE session
  7. session = Sse()
  8. # prepare HTTP headers
  9. headers = []
  10. headers.append(('Content-Type','text/event-stream'))
  11. headers.append(('Cache-Control','no-cache'))
  12. start_response('200 OK', headers)
  13. # enter the loop
  14. while True:
  15. # monkey patching will prevent sleep() blocking
  16. time.sleep(1)
  17. # add the message
  18. session.add_message('message', str(time.time()))
  19. # send to the client
  20. yield str(session)

让我们在/tmp/foo UNIX socket上运行它 (将应用保存为sseapp.py)

  1. uwsgi --wsgi-file sseapp.py --socket /tmp/foo --gevent 1000 --gevent-monkey-patch

(time.sleep()需要猴子补丁,如果你想要/喜欢的话,随意使用gevent原语来休眠)

(无趣的)HTML/Javascript

  1. <html>
  2. <head>
  3. </head>
  4. <body>
  5. <h1>Server sent events</h1>
  6. <div id="event"></div>
  7. <script type="text/javascript">
  8.  
  9. var eventOutputContainer = document.getElementById("event");
  10. var evtSrc = new EventSource("/subscribe");
  11.  
  12. evtSrc.onmessage = function(e) {
  13. console.log(e.data);
  14. eventOutputContainer.innerHTML = e.data;
  15. };
  16.  
  17. </script>
  18. </body>
  19. </html>

它非常简单,它将连接到/subscribe,并且将开始等待事件。

Django视图

我们的django视图,将非常简单,它将简单生成一个特色的响应头 (我们讲称之为X-Offload-to-SSE),并且把登录用户的用户名作为它的值:

  1. def subscribe(request):
  2. response = HttpResponse()
  3. response['X-Offload-to-SSE'] = request.user
  4. return response

现在,我们已经为“高级”部分准备好了。

让我们卸载SSE事务

配置看起来会有点复杂,但是它与之前看到的X-Sendfile概念相同:

  1. [uwsgi]
  2. ; the boring part
  3. http-socket = :9090
  4. offload-threads = 2
  5. wsgi-file = sseproject/wsgi.py
  6.  
  7. ; collect X-Offload-to-SSE header and store in var X_OFFLOAD
  8. collect-header = X-Offload-to-SSE X_OFFLOAD
  9. ; if X_OFFLOAD is defined, do not send the headers generated by Django
  10. response-route-if-not = empty:${X_OFFLOAD} disableheaders:
  11. ; if X_OFFLOAD is defined, offload the request to the app running on /tmp/foo
  12. response-route-if-not = empty:${X_OFFLOAD} uwsgi:/tmp/foo,0,0

唯一“新的”部分是使用 disableheaders 路由动作。这是必须的,否则Django生成的头将会伴着由基于gevent的应用生成的头发送。

你可以避免它 (记住,只在2.0.3添加了 disableheaders ),在gevent应用中移除到start_response()到调用 (冒着被一些WSGI神诅咒的风险),然后修改Django视图来设置正确的头部:

  1. def subscribe(request):
  2. response = HttpResponse()
  3. response['Content-Type'] = 'text/event-stream'
  4. response['X-Offload-to-SSE'] = request.user
  5. return response

最终,你或许想要更加“精简”,并简单检测’text/event-stream’ content_type存在:

  1. [uwsgi]
  2. ; the boring part
  3. http-socket = :9090
  4. offload-threads = 2
  5. wsgi-file = sseproject/wsgi.py
  6.  
  7. ; collect Content-Type header and store in var CONTENT_TYPE
  8. collect-header = Content-Type CONTENT_TYPE
  9. ; if CONTENT_TYPE is 'text/event-stream', forward the request
  10. response-route-if = equal:${CONTENT_TYPE};text/event-stream uwsgi:/tmp/foo,0,0

现在,如何在gevent应用中访问Django登录用户的用户名呢?

你应该注意到,gevent应用在每个请求中打印了WSGI环境变量的内容。那个环境变量与Django应用+已收集头部相同。因此,访问environ[‘X_OFFLOAD’]将会返回已登录用户名。 (显然,在第二个例子中,使用了内容类型,不再收集带用户名的变量,因此你应该修复它)

你可以使用相同的方法传递所有你需要的信息,你可以收集所有你需要的变量,等等等等。

你甚至可以在运行时添加变量:

  1. [uwsgi]
  2. ; the boring part
  3. http-socket = :9090
  4. offload-threads = 2
  5. wsgi-file = sseproject/wsgi.py
  6.  
  7. ; collect Content-Type header and store in var CONTENT_TYPE
  8. collect-header = Content-Type CONTENT_TYPE
  9.  
  10. response-route-if = equal:${CONTENT_TYPE};text/event-stream addvar:FOO=BAR
  11. response-route-if = equal:${CONTENT_TYPE};text/event-stream addvar:TEST1=TEST2
  12.  
  13. ; if CONTENT_TYPE is 'text/event-stream', forward the request
  14. response-route-if = equal:${CONTENT_TYPE};text/event-stream uwsgi:/tmp/foo,0,0

或者 (使用goto以获得更好的可读性):

  1. [uwsgi]
  2. ; the boring part
  3. http-socket = :9090
  4. offload-threads = 2
  5. wsgi-file = sseproject/wsgi.py
  6.  
  7. ; collect Content-Type header and store in var CONTENT_TYPE
  8. collect-header = Content-Type CONTENT_TYPE
  9.  
  10. response-route-if = equal:${CONTENT_TYPE};text/event-stream goto:offload
  11. response-route-run = last:
  12.  
  13. response-route-label = offload
  14. response-route-run = addvar:FOO=BAR
  15. response-route-run = addvar:TEST1=TEST2
  16. response-route-run = uwsgi:/tmp/foo,0,0

使用uwsgi api (>= uWSGI 2.0.3) 进行简化

虽然处理头部是相当HTTP友好型的,但uWSGI 2.0.3增加了在代码中直接定义每个请求变量的可能性。

这允许一个更“优雅”的方式 (即使高度不可移植):

  1. import uwsgi
  2.  
  3. def subscribe(request):
  4. uwsgi.add_var("LOGGED_IN_USER", request.user)
  5. uwsgi.add_var("USER_IS_UGLY", "probably")
  6. uwsgi.add_var("OFFLOAD_TO_SSE", "y")
  7. uwsgi.add_var("OFFLOAD_SERVER", "/tmp/foo")
  8. return HttpResponse()

现在,配置可以修改成更优雅:

  1. ; the boring part
  2. http-socket = :9090
  3. offload-threads = 2
  4. wsgi-file = sseproject/wsgi.py
  5.  
  6. ; if OFFLOAD_TO_SSE is 'y', do not send the headers generated by Django
  7. response-route-if = equal:${OFFLOAD_TO_SSE};y disableheaders:
  8. ; if OFFLOAD_TO_SSE is 'y', offload the request to the app running on 'OFFLOAD_SERVER'
  9. response-route-if = equal:${OFFLOAD_TO_SSE};y uwsgi:${OFFLOAD_SERVER},0,0

你注意到我们如何允许Django应用设置后端服务器来使用请求变量了吗?

现在,我们可以更进一步。我们不会使用路由框架 (除了禁用头部生成):

  1. import uwsgi
  2.  
  3. def subscribe(request):
  4. uwsgi.add_var("LOGGED_IN_USER", request.user)
  5. uwsgi.add_var("USER_IS_UGLY", "probably")
  6. uwsgi.route("uwsgi", "/tmp/foo,0,0")
  7. return HttpResponse()

以及一个简单的:

  1. ; the boring part
  2. http-socket = :9090
  3. offload-threads = 2
  4. wsgi-file = sseproject/wsgi.py
  5.  
  6. response-route = ^/subscribe disableheaders:

Websockets怎样?

我们已经看到了如何卸载SSE (那是单向的)。我们也可以卸载websockets (那是双向的)。

概念是相同的,你只需要保证 (和之前一样) Django没有发送任何头部,(否则,websocket握手将会失败),然后,你可以修改你的gevent应用:

  1. import time
  2. import uwsgi
  3.  
  4. def application(e, start_response):
  5. print e
  6. uwsgi.websocket_handshake()
  7. # enter the loop
  8. while True:
  9. # monkey patching will prevent sleep() to block
  10. time.sleep(1)
  11. # send to the client
  12. uwsgi.websocket_send(str(time.time()))

使用redis或者uWSGI缓存框架

请求变量是方便的 (并且有趣),但是它们有限 (见下)。如果你需要在Django和sse/websocket应用之间传递大量的数据,那么Redis是个不错的方式 (并且和gevent完美契合)。基本上,你存储来自Django的信息到redis中,然后只传递哈希键 (通过请求变量) 到sse/websocket应用。

可以用uWSGI缓存框架完成相同的工作,但是考虑到redis拥有大量的数据原语,而uWSGI只支持key->value项。

常见陷阱

  • 你可以添加到每个请求上的变量总数是由uwsgi包缓存(默认是4k)限制的。你可以用–buffer-size选项将其增至64k。
  • 这是这篇文章的重点:不要在你的gevent应用中使用Django ORM,除非你知道你在干什么!!!(读一读:你有一个Django数据库适配器,它支持gevent,并且与标准的相比,它并不糟糕……)
  • 忘记找到一种方式来禁用Django中的头部生成吧。这是它的WSGI适配器的一个“限制/特性”,使用uWSGI设施 (如果可用的话),或者不要在你的gevent应用中生成头部。最终,你可以以这种方式修改wsgi.py:
  1. """
  2. WSGI config for sseproject project.
  3.  
  4. It exposes the WSGI callable as a module-level variable named ``application``.
  5.  
  6. For more information on this file, see
  7. https://docs.djangoproject.com/en/1.6/howto/deployment/wsgi/
  8. """
  9.  
  10. import os
  11. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sseproject.settings")
  12.  
  13. from django.core.wsgi import get_wsgi_application
  14. django_application = get_wsgi_application()
  15.  
  16. def fake_start_response(status, headers, exc_info=None):
  17. pass
  18.  
  19. def application(environ, start_response):
  20. if environ['PATH_INFO'] == '/subscribe':
  21. return django_application(environ, fake_start_response)
  22. return django_application(environ, start_response)