12.12 Using Generators As an Alternative to Threads

Problem

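You want to use generators (coroutines) as an alternative to system threads for implementing concurrency. This is sometimes known as user-level threading or green threading.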

Solution

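To implement your own concurrency using generators, you first need a solid understanding of generator functions and the yield statement. In particular, yield causes a generator to suspend its execution. That makes it possible to write a scheduler that treats generators as a kind of "task" and alternates their execution using cooperative task switching. To illustrate the idea, consider the following two generator functions, each using a bare yield: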

    # Two simple generator functions
    def countdown(n):
        while n > 0:
            print('T-minus', n)
            yield
            n -= 1
        print('Blastoff!')

    def countup(n):
        x = 0
        while x < n:
            print('Counting up', x)
            yield
            x += 1

These functions simply use yield internally. The following code implements a simple task scheduler that runs them:

    from collections import deque

    class TaskScheduler:
        def __init__(self):
            self._task_queue = deque()

        def new_task(self, task):
            '''
            Admit a newly started task to the scheduler
            '''
            self._task_queue.append(task)

        def run(self):
            '''
            Run until there are no more tasks
            '''
            while self._task_queue:
                task = self._task_queue.popleft()
                try:
                    # Run until the next yield statement
                    next(task)
                    self._task_queue.append(task)
                except StopIteration:
                    # Generator is no longer executing
                    pass

    # Example use
    sched = TaskScheduler()
    sched.new_task(countdown(10))
    sched.new_task(countdown(5))
    sched.new_task(countup(15))
    sched.run()

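The TaskScheduler class runs a collection of generators in a round-robin manner, each one running until it reaches a yield statement. Running the example produces the following output: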

    T-minus 10
    T-minus 5
    Counting up 0
    T-minus 9
    T-minus 4
    Counting up 1
    T-minus 8
    T-minus 3
    Counting up 2
    T-minus 7
    T-minus 2
    ...

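At this point, we have essentially implemented the tiny core of an "operating system." Generator functions are the tasks, and the yield statement is how a task signals that it wants to suspend. The scheduler simply cycles over the task queue until no tasks are left to execute.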
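
To make the round-robin interleaving explicit, the sketch below records which task ran at each step instead of printing. The names ticker and run_round_robin are hypothetical helpers introduced only for this illustration; the loop follows the same policy as TaskScheduler.run() shown earlier.

```python
from collections import deque

def ticker(name, n, log):
    # Hypothetical task: record one step, then hand control back
    for i in range(n):
        log.append((name, i))
        yield

def run_round_robin(tasks):
    # Same policy as TaskScheduler.run(): pop from the left, advance the
    # task to its next yield, and re-queue it unless it has finished
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
            queue.append(task)
        except StopIteration:
            pass

log = []
run_round_robin([ticker('a', 3, log), ticker('b', 1, log), ticker('c', 2, log)])
print(log)
```

Each task advances by exactly one step per pass through the queue, and a finished task simply drops out without disturbing the order of the others.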

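In practice, you are unlikely to use generators for concurrency as trivial as the example above. A more realistic use is to replace threads when implementing actors or network servers.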

The following code illustrates the use of generators to implement a thread-free version of actors:

    from collections import deque

    class ActorScheduler:
        def __init__(self):
            self._actors = { }          # Mapping of names to actors
            self._msg_queue = deque()   # Message queue

        def new_actor(self, name, actor):
            '''
            Admit a newly started actor to the scheduler and give it a name
            '''
            self._msg_queue.append((actor, None))
            self._actors[name] = actor

        def send(self, name, msg):
            '''
            Send a message to a named actor
            '''
            actor = self._actors.get(name)
            if actor:
                self._msg_queue.append((actor, msg))

        def run(self):
            '''
            Run as long as there are pending messages.
            '''
            while self._msg_queue:
                actor, msg = self._msg_queue.popleft()
                try:
                    actor.send(msg)
                except StopIteration:
                    pass

    # Example use
    if __name__ == '__main__':
        def printer():
            while True:
                msg = yield
                print('Got:', msg)

        def counter(sched):
            while True:
                # Receive the current count
                n = yield
                if n == 0:
                    break
                # Send to the printer task
                sched.send('printer', n)
                # Send the next count to the counter task (recursive)
                sched.send('counter', n-1)

        sched = ActorScheduler()
        # Create the initial actors
        sched.new_actor('printer', printer())
        sched.new_actor('counter', counter(sched))

        # Send an initial message to the counter to initiate
        sched.send('counter', 10000)
        sched.run()

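Fully understanding this code takes some study, but the key is the queue of pending messages. Essentially, the scheduler runs as long as there are messages to deliver. A notable feature is that the counter generator sends messages to itself and so ends up in a recursive cycle. Because each message is delivered from the scheduler loop rather than by a nested function call, the cycle is not bound by Python's recursion limit.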
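
One detail worth isolating is why new_actor() queues an initial (actor, None) pair. A generator that has not yet started cannot accept a real message, so it must first be advanced to its first yield. The sketch below shows this with a hypothetical collector actor (a name invented here for illustration), driving the generator by hand rather than through ActorScheduler.

```python
def collector(received):
    # Hypothetical actor: store every message it is sent
    while True:
        msg = yield
        received.append(msg)

received = []
actor = collector(received)

# Sending a real message before the generator has started raises TypeError
early_error = None
try:
    actor.send('too early')
except TypeError as exc:
    early_error = exc

# Priming: send(None) runs the body up to the first yield
actor.send(None)

# From now on, each send() delivers one message to the waiting yield
actor.send('hello')
actor.send('world')
print(received)
```

The failed early send() leaves the generator unstarted, so it can still be primed and used normally afterward.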

Here is a more advanced example showing the use of generators to implement a concurrent network application:

    from collections import deque
    from select import select

    # This class represents a generic yield event in the scheduler
    class YieldEvent:
        def handle_yield(self, sched, task):
            pass
        def handle_resume(self, sched, task):
            pass

    # Task Scheduler
    class Scheduler:
        def __init__(self):
            self._numtasks = 0        # Total num of tasks
            self._ready = deque()     # Tasks ready to run
            self._read_waiting = {}   # Tasks waiting to read
            self._write_waiting = {}  # Tasks waiting to write

        # Poll for I/O events and restart waiting tasks
        def _iopoll(self):
            rset, wset, eset = select(self._read_waiting,
                                      self._write_waiting, [])
            for r in rset:
                evt, task = self._read_waiting.pop(r)
                evt.handle_resume(self, task)
            for w in wset:
                evt, task = self._write_waiting.pop(w)
                evt.handle_resume(self, task)

        def new(self, task):
            '''
            Add a newly started task to the scheduler
            '''
            self._ready.append((task, None))
            self._numtasks += 1

        def add_ready(self, task, msg=None):
            '''
            Append an already started task to the ready queue.
            msg is what to send into the task when it resumes.
            '''
            self._ready.append((task, msg))

        # Add a task to the reading set
        def _read_wait(self, fileno, evt, task):
            self._read_waiting[fileno] = (evt, task)

        # Add a task to the write set
        def _write_wait(self, fileno, evt, task):
            self._write_waiting[fileno] = (evt, task)

        def run(self):
            '''
            Run the task scheduler until there are no tasks
            '''
            while self._numtasks:
                if not self._ready:
                    self._iopoll()
                task, msg = self._ready.popleft()
                try:
                    # Run the coroutine to the next yield
                    r = task.send(msg)
                    if isinstance(r, YieldEvent):
                        r.handle_yield(self, task)
                    else:
                        raise RuntimeError('unrecognized yield event')
                except StopIteration:
                    self._numtasks -= 1

    # Example implementation of coroutine-based socket I/O
    class ReadSocket(YieldEvent):
        def __init__(self, sock, nbytes):
            self.sock = sock
            self.nbytes = nbytes
        def handle_yield(self, sched, task):
            sched._read_wait(self.sock.fileno(), self, task)
        def handle_resume(self, sched, task):
            data = self.sock.recv(self.nbytes)
            sched.add_ready(task, data)

    class WriteSocket(YieldEvent):
        def __init__(self, sock, data):
            self.sock = sock
            self.data = data
        def handle_yield(self, sched, task):
            sched._write_wait(self.sock.fileno(), self, task)
        def handle_resume(self, sched, task):
            nsent = self.sock.send(self.data)
            sched.add_ready(task, nsent)

    class AcceptSocket(YieldEvent):
        def __init__(self, sock):
            self.sock = sock
        def handle_yield(self, sched, task):
            sched._read_wait(self.sock.fileno(), self, task)
        def handle_resume(self, sched, task):
            r = self.sock.accept()
            sched.add_ready(task, r)

    # Wrapper around a socket object for use with yield
    class Socket(object):
        def __init__(self, sock):
            self._sock = sock
        def recv(self, maxbytes):
            return ReadSocket(self._sock, maxbytes)
        def send(self, data):
            return WriteSocket(self._sock, data)
        def accept(self):
            return AcceptSocket(self._sock)
        def __getattr__(self, name):
            return getattr(self._sock, name)

    if __name__ == '__main__':
        from socket import socket, AF_INET, SOCK_STREAM
        import time

        # Example of a function involving generators. This should
        # be called using line = yield from readline(sock)
        def readline(sock):
            chars = []
            while True:
                c = yield sock.recv(1)
                if not c:
                    break
                chars.append(c)
                if c == b'\n':
                    break
            return b''.join(chars)

        # Echo server using generators
        class EchoServer:
            def __init__(self, addr, sched):
                self.sched = sched
                sched.new(self.server_loop(addr))

            def server_loop(self, addr):
                s = Socket(socket(AF_INET, SOCK_STREAM))
                s.bind(addr)
                s.listen(5)
                while True:
                    c, a = yield s.accept()
                    print('Got connection from ', a)
                    self.sched.new(self.client_handler(Socket(c)))

            def client_handler(self, client):
                while True:
                    line = yield from readline(client)
                    if not line:
                        break
                    line = b'GOT:' + line
                    while line:
                        nsent = yield client.send(line)
                        line = line[nsent:]
                client.close()
                print('Client closed')

        sched = Scheduler()
        EchoServer(('', 16000), sched)
        sched.run()

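This code will require some careful study. However, it is essentially implementing a small operating system. There is a queue of tasks that are ready to run, and there are waiting areas for tasks sleeping on I/O. Much of the scheduler's work consists of moving tasks between the ready queue and the I/O waiting areas.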
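
The movement from a waiting area to the ready queue can be seen in isolation with a small sketch. It assumes a local socket pair from socket.socketpair() as a stand-in for a real network connection, and it uses plain variables named waiting and ready in place of the scheduler's _read_waiting and _ready attributes. The string 'reader-task' is a placeholder for a suspended generator.

```python
import socket
from select import select

# A connected pair of sockets stands in for a client and a server
left, right = socket.socketpair()

waiting = {right.fileno(): 'reader-task'}   # plays the role of _read_waiting
ready = []                                  # plays the role of _ready

# Nothing has been sent yet, so a zero-timeout poll finds nothing readable
rset, _, _ = select(list(waiting), [], [], 0)
idle_poll = list(rset)

# Once data arrives, select() reports the descriptor and the task moves over
left.send(b'ping')
rset, _, _ = select(list(waiting), [], [], 1.0)
for fd in rset:
    ready.append(waiting.pop(fd))

data = right.recv(4)
left.close()
right.close()
print(idle_poll, ready, data)
```

Keying the waiting area by file descriptor is what lets the result of select() be mapped straight back to the task that should resume.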

Discussion

When building generator-based concurrency frameworks, it is most common to work with the more general form of yield:

    def some_generator():
        ...
        result = yield data
        ...

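Functions that use yield in this manner are more generally referred to as "coroutines." Within a scheduler, the yield statement gets handled in a loop such as the following: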

    f = some_generator()

    # Initial result. Is None to start since nothing has been computed
    result = None
    while True:
        try:
            data = f.send(result)
            result = ...  # do some calculation with data
        except StopIteration:
            break

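The logic here is a bit convoluted. The value passed to send() defines what the yield expression evaluates to when the generator wakes back up. So, if a yield is going to receive a result in response to data that was previously yielded, that result arrives with the next send() operation. If a generator function has just started, sending in a value of None simply makes it advance to the first yield statement.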
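
The handshake between send() and yield is easier to follow with concrete values. The following is a minimal sketch using a hypothetical doubler coroutine (not part of the recipe): each value sent in is doubled and handed back by the next yield.

```python
def doubler():
    # Hypothetical coroutine: reply to each value sent in with twice that value
    reply = None
    while True:
        value = yield reply     # hand back the previous reply, wait for input
        reply = value * 2

g = doubler()

# The first send must be None; it runs the body up to the first yield
first = g.send(None)

# Each later send() resumes the yield with a value and returns the next reply
second = g.send(10)
third = g.send(7)
print(first, second, third)
```

The first call returns None because nothing has been computed yet, which matches the role of the initial result = None in the loop shown earlier.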

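In addition to sending in values, it is also possible to call the close() method on a generator. This causes a GeneratorExit exception to be raised at the yield statement, which stops execution. If desired, a generator can catch this exception and perform cleanup actions. It is also possible to use the throw() method of a generator to raise an arbitrary exception at the yield statement. A task scheduler might use this to communicate errors into running generators.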
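
As an illustration of both methods, here is a small sketch with a hypothetical worker generator that records what happens to it. It handles a ValueError injected with throw() and keeps running, then performs cleanup when close() raises GeneratorExit.

```python
def worker(log):
    # Hypothetical task that logs items, injected errors, and its own cleanup
    try:
        while True:
            try:
                item = yield
                log.append(('item', item))
            except ValueError as exc:
                # Raised at the yield by throw(); handle it and keep running
                log.append(('error', str(exc)))
    except GeneratorExit:
        # Raised at the yield by close(); clean up, then let it propagate
        log.append(('cleanup', None))
        raise

log = []
w = worker(log)
next(w)                              # advance to the first yield
w.send(1)
w.throw(ValueError('bad input'))     # the worker catches this and continues
w.send(2)
w.close()                            # triggers the cleanup branch
print(log)
```

A generator that catches GeneratorExit should re-raise it or return; yielding another value at that point makes close() raise RuntimeError.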

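The yield from statement used in the last example is used to implement coroutines that serve as subroutines or procedures called from other generators. Essentially, control transparently transfers to the new function. Unlike normal generators, a function that is called using yield from can return a value that becomes the result of the yield from statement. More information about yield from can be found in PEP 380.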
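
The return-value behavior can be checked without any sockets. In the sketch below, read_pair and caller are hypothetical names; read_pair plays the same role as readline() in the echo server, yielding requests to whoever drives the outer generator and finally returning a result to its caller.

```python
def read_pair():
    # Subgenerator: yields two requests, then returns a combined result
    first = yield 'need first'
    second = yield 'need second'
    return (first, second)

def caller(results):
    # The return value of read_pair() becomes the value of yield from
    pair = yield from read_pair()
    results.append(pair)

results = []
c = caller(results)

# The driver talks to caller, but the requests come from read_pair
requests = [next(c)]
requests.append(c.send('x'))
try:
    c.send('y')          # read_pair returns, caller finishes
except StopIteration:
    pass
print(requests, results)
```

The driving code never touches read_pair directly, which is what "transparently" means here: sends and yields pass straight through the outer generator.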

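Finally, if you program with generators, it is important to stress that there are some major limitations. In particular, you get none of the benefits that threads provide. For instance, if you execute any code that is CPU bound or that blocks for I/O, it will suspend the entire task scheduler until the operation completes. To work around this, your only real option is to delegate the operation to a separate thread or process that can run independently. Another limitation is that most Python libraries have not been written to work well with generator-based threading. If you take this approach, you may find that you need to write replacements for many standard library functions. For background on coroutines and the techniques used in this recipe, see PEP 342 and "A Curious Course on Coroutines and Concurrency."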
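
One way to delegate a blocking operation is to hand it to a thread pool and park the generator until the result is available. The following is only a sketch under stated assumptions: it uses concurrent.futures from the standard library, and the names slow_square, square_task, ticker, and run are hypothetical helpers written for this illustration, not part of the recipe's Scheduler.

```python
from collections import deque
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
import time

def slow_square(x):
    time.sleep(0.05)            # stands in for a blocking call
    return x * x

def square_task(pool, x, out):
    # Yield a Future; the scheduler sends its result back in when it is done
    result = yield pool.submit(slow_square, x)
    out.append(result)

def ticker(n, out):
    # An ordinary cooperative task that never blocks
    for _ in range(n):
        out.append('tick')
        yield

def run(tasks):
    ready = deque((t, None) for t in tasks)
    waiting = {}                # Future -> task parked on it
    while ready or waiting:
        if not ready:
            # Nothing runnable: block until at least one Future finishes
            wait(waiting, return_when=FIRST_COMPLETED)
        for fut in [f for f in waiting if f.done()]:
            # result() re-raises worker exceptions; a real scheduler
            # would forward them into the task with throw()
            ready.append((waiting.pop(fut), fut.result()))
        task, msg = ready.popleft()
        try:
            r = task.send(msg)
        except StopIteration:
            continue
        if isinstance(r, Future):
            waiting[r] = task
        else:
            ready.append((task, None))

out = []
with ThreadPoolExecutor(max_workers=2) as pool:
    run([square_task(pool, 3, out), ticker(3, out), square_task(pool, 4, out)])
print(out)
```

The ticker keeps making progress while the two squares are computed in worker threads, so the relative order of entries in out can vary from run to run.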

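PEP 3156 also has a modern take on asynchronous I/O involving coroutines. In practice, it is extremely unlikely that you will write a low-level coroutine scheduler yourself. However, the ideas surrounding coroutines are the basis for many popular libraries, including gevent, greenlet, Stackless Python, and similar projects.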
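
For comparison, PEP 3156 became the asyncio module in the standard library. The sketch below recasts the countdown and countup tasks from the start of this recipe for asyncio. It assumes Python 3.7 or later for asyncio.run(), uses the async/await syntax rather than bare generators, and records steps in a list instead of printing; these are choices made for this illustration, not something the recipe prescribes.

```python
import asyncio

async def countdown(n, log):
    while n > 0:
        log.append(('T-minus', n))
        await asyncio.sleep(0)      # give other tasks a chance to run
        n -= 1

async def countup(n, log):
    for x in range(n):
        log.append(('Counting up', x))
        await asyncio.sleep(0)

async def main():
    log = []
    # gather() runs both coroutines concurrently on the event loop
    await asyncio.gather(countdown(3, log), countup(2, log))
    return log

log = asyncio.run(main())
print(log)
```

Here await asyncio.sleep(0) plays the part of the bare yield: it suspends the current task so that the event loop, acting as the scheduler, can resume another one.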

Original:

http://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p12_using_generators_as_alternative_to_threads.html