Offloading Websockets and Server-Sent Events AKA “Combine them with Django safely”
作者:Roberto De Ioris
日期:20140315
免责声明
这篇文章显示了将websockets(或者sse)与Django以一种“安全的方式”结合起来的一种相当高级的方法。它不会向你展示websockets和sse有多酷,或者如何用它们写出更好的应用,而是试图让你避免使用它们的最差实践。
在我看来,Python面向web的世界正面临着一种通信/市场问题:大量的人在非阻塞技术(例如gevent)上运行大量阻塞应用(例如django)只是因为有人告诉他们这很酷并且会解决他们所有的扩展问题。
这是完全错误,危险和邪恶的,你不能将阻塞应用和非阻塞引擎混在一起,即使是一个单一、非常小的阻塞部分都能潜在摧毁你整个栈。正如我已经说了几十次一样,如果你的应用是99.9999999%非阻塞的,那么它仍然是阻塞的。
并且不是,你的Django应用上的猴子补丁并非魔法。除非你正使用高度自定义的数据库适配器,对工作在非阻塞模式进行了调整,否则你就是错的。
以看起来就是个超级大混蛋的代价,我强烈建议你完全忽略人们让你将你的Django应用移到gevent, eventlet, tornado或者其他什么的,而不警告你你会遇到数百个问题的建议。
话虽如此,我爱gevent,它可能是uWSGI项目中支持得最好的(带perl的Coro::AnyEvent)循环引擎了。因此,在这篇文章中,我将使用gevent来为Django部分管理websocket/sse流量和纯多进程。
如果这最后一句对你来说像是废话,那么你可能不知道uWSGI的卸载是什么……
uWSGI卸载
这个概念并非是个新东西,或者是uWSGI特有的。诸如nodejs或者twisted这样的项目已经用它多年了。
注解
一个提供静态文件服务的web应用的例子并不非常有趣,也不是用来展示的最好的东西,但是在稍后展示一个使用X-Sendfile的真实世界的例子的时候,会游泳。
想象这个简单的WSGI应用:
- def application(env, start_response):
- start_response('200 OK',[('Content-Type','text/plain')])
- f = open('/etc/services')
- # do not do it, if the file is 4GB it will allocate 4GB of memory !!!
- yield f.read()
这将会简单返回/etc/services的内容。它是一个相当小的文件,因此在几毫秒时间内,你的进程将会准备好处理另一个请求。
那要是/etc/services是4千兆字节呢?你的进程(或者线程)将会阻塞几秒(甚至几分钟),并且不能够管理其他请求,直到完全传输这个文件。
如果你可以告诉其他线程为你发送这个文件,这样你就能够管理其他请求,岂不是很酷?
卸载就是这样的:它会为你提供一个或多个线程来为你完成一些简单并且慢点任务。哪种任务呢?所有那些可以以一种非阻塞方式管理的任务,因此单个线程就可以为你管理上千个传输。
你可以将其视为你的电脑中的DMA引擎,你的CPU将会编程DMA来将内存从控制器传输到RAM,然后将会被释放,以完成其他任务,同时DMA在后台工作。
要在uWSGI中启用卸载,你只需要添加 —offload-threads <n>
选项,其中<n>是每个进程要生成的线程数。 (一般而言,单个线程就够了,但是如果你想要使用/滥用你的多CPU核,那么随意增加)
一旦启用了卸载,那么每当uWSGI检测到一个操作能够安全被卸载的时候,它将自动使用它。
在python/WSGI中,任何对wsgi.file_wrapper的使用将会被自动卸载,以及当你使用uWSGI代理特性来传递请求到其他使用uwsgi或者HTTP协议到服务器时。
一个很酷的例子 (甚至在uWSGI文档的Snippets页面也有展示) 是实现一个卸载助力的X-Sendfile特性:
- [uwsgi]
- ; load router_static plugin (compiled in by default in monolithic profiles)
- plugins = router_static
- ; spawn 2 offload threads
- offload-threads = 2
- ; files under /etc can be safely served (DANGEROUS !!!)
- static-safe = /etc
- ; collect the X-Sendfile response header as X_SENDFILE var
- collect-header = X-Sendfile X_SENDFILE
- ; if X_SENDFILE is not empty, pass its value to the "static" routing action (it will automatically use offloading if available)
- response-route-if-not = empty:${X_SENDFILE} static:${X_SENDFILE}
- ; now the classic options
- plugins = python
- ; bind to HTTP port 8080
- http-socket = :8080
- ; load a simple wsgi-app
- wsgi-file = myapp.py
现在,在我们的应用中,我们可以X-Sendfile来在无阻塞情况下发送静态文件:
- def application(env, start_response):
- start_response('200 OK',[('X-Sendfile','/etc/services')])
- return []
在这篇文章中将会使用一个非常类似的概念:我们将会使用一个正常的Django来设置我们的会话,来认证用户,以及任意(快的)东东,然后我们会返回一个特别的头,它会指示uWSGI卸载连接到另一个uWSGI实例 (监听一个私有socket),这个实例将使用gevent,以一种非阻塞方式管理websocket/sse事务。
我们的SSE应用
SSE部分将非常简单,一个基于gevent的WSGI应用将会每秒发送当前时间:
- from sse import Sse
- import time
- def application(e, start_response):
- print e
- # create the SSE session
- session = Sse()
- # prepare HTTP headers
- headers = []
- headers.append(('Content-Type','text/event-stream'))
- headers.append(('Cache-Control','no-cache'))
- start_response('200 OK', headers)
- # enter the loop
- while True:
- # monkey patching will prevent sleep() blocking
- time.sleep(1)
- # add the message
- session.add_message('message', str(time.time()))
- # send to the client
- yield str(session)
让我们在/tmp/foo UNIX socket上运行它 (将应用保存为sseapp.py)
- uwsgi --wsgi-file sseapp.py --socket /tmp/foo --gevent 1000 --gevent-monkey-patch
(time.sleep()需要猴子补丁,如果你想要/喜欢的话,随意使用gevent原语来休眠)
(无趣的)HTML/Javascript
- <html>
- <head>
- </head>
- <body>
- <h1>Server sent events</h1>
- <div id="event"></div>
- <script type="text/javascript">
- var eventOutputContainer = document.getElementById("event");
- var evtSrc = new EventSource("/subscribe");
- evtSrc.onmessage = function(e) {
- console.log(e.data);
- eventOutputContainer.innerHTML = e.data;
- };
- </script>
- </body>
- </html>
它非常简单,它将连接到/subscribe,并且将开始等待事件。
Django视图
我们的django视图,将非常简单,它将简单生成一个特色的响应头 (我们讲称之为X-Offload-to-SSE),并且把登录用户的用户名作为它的值:
- def subscribe(request):
- response = HttpResponse()
- response['X-Offload-to-SSE'] = request.user
- return response
现在,我们已经为“高级”部分准备好了。
让我们卸载SSE事务
配置看起来会有点复杂,但是它与之前看到的X-Sendfile概念相同:
- [uwsgi]
- ; the boring part
- http-socket = :9090
- offload-threads = 2
- wsgi-file = sseproject/wsgi.py
- ; collect X-Offload-to-SSE header and store in var X_OFFLOAD
- collect-header = X-Offload-to-SSE X_OFFLOAD
- ; if X_OFFLOAD is defined, do not send the headers generated by Django
- response-route-if-not = empty:${X_OFFLOAD} disableheaders:
- ; if X_OFFLOAD is defined, offload the request to the app running on /tmp/foo
- response-route-if-not = empty:${X_OFFLOAD} uwsgi:/tmp/foo,0,0
唯一“新的”部分是使用 disableheaders
路由动作。这是必须的,否则Django生成的头将会伴着由基于gevent的应用生成的头发送。
你可以避免它 (记住,只在2.0.3添加了 disableheaders
),在gevent应用中移除到start_response()到调用 (冒着被一些WSGI神诅咒的风险),然后修改Django视图来设置正确的头部:
- def subscribe(request):
- response = HttpResponse()
- response['Content-Type'] = 'text/event-stream'
- response['X-Offload-to-SSE'] = request.user
- return response
最终,你或许想要更加“精简”,并简单检测’text/event-stream’ content_type存在:
- [uwsgi]
- ; the boring part
- http-socket = :9090
- offload-threads = 2
- wsgi-file = sseproject/wsgi.py
- ; collect Content-Type header and store in var CONTENT_TYPE
- collect-header = Content-Type CONTENT_TYPE
- ; if CONTENT_TYPE is 'text/event-stream', forward the request
- response-route-if = equal:${CONTENT_TYPE};text/event-stream uwsgi:/tmp/foo,0,0
现在,如何在gevent应用中访问Django登录用户的用户名呢?
你应该注意到,gevent应用在每个请求中打印了WSGI环境变量的内容。那个环境变量与Django应用+已收集头部相同。因此,访问environ[‘X_OFFLOAD’]将会返回已登录用户名。 (显然,在第二个例子中,使用了内容类型,不再收集带用户名的变量,因此你应该修复它)
你可以使用相同的方法传递所有你需要的信息,你可以收集所有你需要的变量,等等等等。
你甚至可以在运行时添加变量:
- [uwsgi]
- ; the boring part
- http-socket = :9090
- offload-threads = 2
- wsgi-file = sseproject/wsgi.py
- ; collect Content-Type header and store in var CONTENT_TYPE
- collect-header = Content-Type CONTENT_TYPE
- response-route-if = equal:${CONTENT_TYPE};text/event-stream addvar:FOO=BAR
- response-route-if = equal:${CONTENT_TYPE};text/event-stream addvar:TEST1=TEST2
- ; if CONTENT_TYPE is 'text/event-stream', forward the request
- response-route-if = equal:${CONTENT_TYPE};text/event-stream uwsgi:/tmp/foo,0,0
或者 (使用goto以获得更好的可读性):
- [uwsgi]
- ; the boring part
- http-socket = :9090
- offload-threads = 2
- wsgi-file = sseproject/wsgi.py
- ; collect Content-Type header and store in var CONTENT_TYPE
- collect-header = Content-Type CONTENT_TYPE
- response-route-if = equal:${CONTENT_TYPE};text/event-stream goto:offload
- response-route-run = last:
- response-route-label = offload
- response-route-run = addvar:FOO=BAR
- response-route-run = addvar:TEST1=TEST2
- response-route-run = uwsgi:/tmp/foo,0,0
使用uwsgi api (>= uWSGI 2.0.3) 进行简化
虽然处理头部是相当HTTP友好型的,但uWSGI 2.0.3增加了在代码中直接定义每个请求变量的可能性。
这允许一个更“优雅”的方式 (即使高度不可移植):
- import uwsgi
- def subscribe(request):
- uwsgi.add_var("LOGGED_IN_USER", request.user)
- uwsgi.add_var("USER_IS_UGLY", "probably")
- uwsgi.add_var("OFFLOAD_TO_SSE", "y")
- uwsgi.add_var("OFFLOAD_SERVER", "/tmp/foo")
- return HttpResponse()
现在,配置可以修改成更优雅:
- ; the boring part
- http-socket = :9090
- offload-threads = 2
- wsgi-file = sseproject/wsgi.py
- ; if OFFLOAD_TO_SSE is 'y', do not send the headers generated by Django
- response-route-if = equal:${OFFLOAD_TO_SSE};y disableheaders:
- ; if OFFLOAD_TO_SSE is 'y', offload the request to the app running on 'OFFLOAD_SERVER'
- response-route-if = equal:${OFFLOAD_TO_SSE};y uwsgi:${OFFLOAD_SERVER},0,0
你注意到我们如何允许Django应用设置后端服务器来使用请求变量了吗?
现在,我们可以更进一步。我们不会使用路由框架 (除了禁用头部生成):
- import uwsgi
- def subscribe(request):
- uwsgi.add_var("LOGGED_IN_USER", request.user)
- uwsgi.add_var("USER_IS_UGLY", "probably")
- uwsgi.route("uwsgi", "/tmp/foo,0,0")
- return HttpResponse()
以及一个简单的:
- ; the boring part
- http-socket = :9090
- offload-threads = 2
- wsgi-file = sseproject/wsgi.py
- response-route = ^/subscribe disableheaders:
Websockets怎样?
我们已经看到了如何卸载SSE (那是单向的)。我们也可以卸载websockets (那是双向的)。
概念是相同的,你只需要保证 (和之前一样) Django没有发送任何头部,(否则,websocket握手将会失败),然后,你可以修改你的gevent应用:
- import time
- import uwsgi
- def application(e, start_response):
- print e
- uwsgi.websocket_handshake()
- # enter the loop
- while True:
- # monkey patching will prevent sleep() to block
- time.sleep(1)
- # send to the client
- uwsgi.websocket_send(str(time.time()))
使用redis或者uWSGI缓存框架
请求变量是方便的 (并且有趣),但是它们有限 (见下)。如果你需要在Django和sse/websocket应用之间传递大量的数据,那么Redis是个不错的方式 (并且和gevent完美契合)。基本上,你存储来自Django的信息到redis中,然后只传递哈希键 (通过请求变量) 到sse/websocket应用。
可以用uWSGI缓存框架完成相同的工作,但是考虑到redis拥有大量的数据原语,而uWSGI只支持key->value项。
常见陷阱
- 你可以添加到每个请求上的变量总数是由uwsgi包缓存(默认是4k)限制的。你可以用–buffer-size选项将其增至64k。
- 这是这篇文章的重点:不要在你的gevent应用中使用Django ORM,除非你知道你在干什么!!!(读一读:你有一个Django数据库适配器,它支持gevent,并且与标准的相比,它并不糟糕……)
- 忘记找到一种方式来禁用Django中的头部生成吧。这是它的WSGI适配器的一个“限制/特性”,使用uWSGI设施 (如果可用的话),或者不要在你的gevent应用中生成头部。最终,你可以以这种方式修改wsgi.py:
- """
- WSGI config for sseproject project.
- It exposes the WSGI callable as a module-level variable named ``application``.
- For more information on this file, see
- https://docs.djangoproject.com/en/1.6/howto/deployment/wsgi/
- """
- import os
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sseproject.settings")
- from django.core.wsgi import get_wsgi_application
- django_application = get_wsgi_application()
- def fake_start_response(status, headers, exc_info=None):
- pass
- def application(environ, start_response):
- if environ['PATH_INFO'] == '/subscribe':
- return django_application(environ, fake_start_response)
- return django_application(environ, start_response)