协同程序

  协同程序与线程(thread)差不多,也就是一条执行序列,拥有自己独立的栈、局部变量和指令指针,同时又与其他协同程序共享全局变量和其他大部分东西。从概念上讲线程与协同程序的主要区别在于,一个具有多个线程的程序可以同时运行几个线程,而协同程序却需要彼此协作地运行。就是说,一个具有多个协同程序的程序在任意时刻只能运行一个协同程序,并且正在运行的协同程序只会在其显式地要求挂起(suspend)时,它的执行才会暂停。

  

  1. 协同程序基础
  2. 管道与过滤器
  3. 以协同程序实现迭代器
  4. 非抢先式的多线程

  

  ● 协同程序基础

  Lua将所有关于协同程序的函数放置在一个名为“coroutine”的table中。

  函数create用于创建新的协同程序,它只有一个参数,就是一个函数。该函数的代码就是协同程序所需执行的内容。create会返回一个thread类型的值,用以表示新的协同程序。通常create的参数是一个匿名函数。

  1. co = coroutine.create( function () print( "hi" ) end )
  2. print( co ) -- thread: 0x7fe2f1506218

  一个协同程序可以处于4种不同的状态:挂起(suspended)、运行(running)、死亡(dead)和正常(normal)。当创建一个协同程序时,它处于挂起状态。也就是说,协同程序不会在创建它时自动执行其内容。可以通过函数status来检查协同程序的状态:

  1. print( coroutine.status( co ) ) -- suspended

  函数coroutine.resume用于启动或再次启动一个协同程序的执行,并将其状态由挂起改为运行:

  1. coroutine.resume( co ) -- hi

  在本例中,协同程序的内容只是简单地打印了“hi”后便终止了,然后它就处于死亡状态,也就再也无法返回了:

  1. print( coroutine.status( co ) ) -- dead

  到目前为止,协同程序看上去还只是像一种复杂的函数调用方法。其实协同程序的真正强大之处在于函数yield的使用上,该函数可以让一个运行中的协同程序挂起,而之后可以再恢复它的运行。

  1. co = coroutine.create( function ()
  2. for i = 1, 10 do
  3. print( "co", i )
  4. coroutine.yield()
  5. end
  6. end )

  现在当唤醒这个协同程序时,它就会开始执行,直到第一个yield

  1. coroutine.resume( co ) -- co 1

  如果此时检查其状态,会发现协同程序处于挂起状态,因此可以再次恢复其运行:

  1. print( coroutine.status( co ) ) -- suspended

  从协同程序的角度看,所有在它挂起时发生的活动都发生在yield调用中。当恢复协同程序的执行时,对于yield的调用才最终返回。然后协同程序继续它的执行,直到下一个yield调用或执行结束:

  1. coroutine.resume( co ) -- co 2
  2. coroutine.resume( co ) -- co 3
  3. ...
  4. coroutine.resume( co ) -- co 10
  5. coroutine.resume( co ) -- 什么都不打印

  在最后一次调用resume时,协同程序的内容已经执行完毕,并已经返回。因此,这时协同程序处于死亡状态。如果试图再次恢复它的执行,resume将返回false及一条错误消息:

  1. print(coroutine.resume( co )) -- false cannot resume dead coroutine
  请注意,resume是在保护模式中运行的。因此,如果在一个协同程序的执行中发生任何错误,Lua是不会显示错误消息的,而是将执行权返回给resume调用。

  当一个协同程序A唤醒一个协同程序B时,协同程序A就处于一个特殊状态,既不是挂起状态(无法继续A的执行),也不是运行状态(是B在运行)。所以将这时的状态称为“正常”状态。

  Lua的协同程序还具有一项有用的机制,就是可以通过一对resume-yield来交换数据。在第一次调用resume时,并没有对应的yield在等待它,因此所有传递给resume的额外参数都将视为协同程序主函数的参数:

  1. co = coroutine.create( function ( a, b, c )
  2. print( "co", a, b, c )
  3. end )
  4. coroutine.resume( co, 1, 2, 3 ) -- co 1 2 3

  在resume调用返回的内容中,第一个值为true则表示没有错误,而后面所有的值都是对应yield传入的参数:

  1. co = coroutine.create( function ( a, b )
  2. coroutine.yield( a + b, a - b )
  3. end )
  4. print( coroutine.resume( co, 20, 10 ) ) -- true 30 10

  与此对应的是,yield返回的额外值就是对应resume传入的参数:

  1. co = coroutine.create( function ()
  2. print( "co", coroutine.yield() )
  3. end )
  4. print(coroutine.resume( co, "a" )) -- true
  5. print(coroutine.resume( co, 4, 5, 6 )) -- co 4 5 6 -- true
  6. print(coroutine.resume( co, 4, 5)) -- false cannot resume dead coroutine

  最后,当一个协同程序结束时,它的主函数所返回的值都将作为对应resume的返回值:

  1. co = coroutine.create( function ()
  2. return 6, 7
  3. end )
  4. print( coroutine.resume( co ) ) -- true 6 7

  

  ● 管道与过滤器

  1. ----------------------------------------- 管道与过滤器
  2. function receive( prod )
  3. local status, value = coroutine.resume( prod )
  4. return value
  5. end
  6. function send( x )
  7. coroutine.yield( x )
  8. end
  9. function producer( )
  10. return coroutine.create( function ( )
  11. while true do
  12. local x = io.read() -- 产生新值
  13. send(x)
  14. end
  15. end )
  16. end
  17. function filter( prod )
  18. return coroutine.create( function ( )
  19. for line = 1, math.huge do
  20. local x = recevie(prod) -- 获取新值
  21. x = string.format( "%5d %s", line, x )
  22. send(x) -- 将新值发送给消费者
  23. end
  24. end )
  25. end
  26. function consumer( prod )
  27. while true do
  28. local x = receive(prod) -- 获取新值
  29. io.write( x, "\n" ) -- 消费新值
  30. end
  31. end
  32. -- 运行代码
  33. consumer(filter(producer()))

  

  ● 以协同程序实现迭代器

  将循环迭代器视为“生产者-消费者”模式的一种特例,一个迭代器会产出一些内容,而循环体会消费这些内容。因此,这样看来协同程序似乎也适用于实现迭代器。的确,协同程序为实现这类任务提供了一项有用的工具。那就是先前提到的,协同程序可以一改传统的调用者与被调用者之间的关系。有了这个特性,在编写迭代器时,就无须顾及如何在每次成功的迭代调用之间保存状态信息了。

  为了说明这类应用,下面来写一个迭代器,使其可以遍历某个数组的所有排列组合形式。若直接编写这种迭代器可能不太容易,但若编写一个递归函数来产生所有的排列组合则不会很困难。想法很简单,只要将每个数组元素都依次放在最后一个位置,然后递归地生成其余元素的排列。代码如下:

  1. function permgen(a, n)
  2. n = n or #a -- 默认n为a的大小
  3. if n <= 1 then
  4. printResult(a)
  5. else
  6. for i = 1, n do
  7. -- 将第i个元素放到数组末尾
  8. a[n], a[i] = a[i], a[n]
  9. -- 生成其余元素的排列
  10. permgen(a, n - 1)
  11. -- 恢复第i个元素
  12. a[n], a[i] = a[i], a[n]
  13. end
  14. end
  15. end

  然后,还需要定义其中调用到的打印函数printResult,并以适当的参数来调用permgen

  1. function printResult(a)
  2. for i = 1, #a do
  3. io.write(a[i], " ")
  4. end
  5. io.write("\n")
  6. end
  7. permgen({1, 2, 3, 4})
  8. --> 2 3 4 1
  9. --> 3 2 4 1
  10. --> 3 4 2 1
  11. ...
  12. --> 2 1 3 4
  13. --> 1 2 3 4

  当生成函数完成后,将其转换为一个迭代器就非常容易了。首先,将printResult改为yield

  1. function permgen(a, n)
  2. n = n or #a
  3. if n <= 1 then
  4. coroutine.yield(a)
  5. else
  6. for i = 1, n do
  7. -- 将第i个元素放到数组末尾
  8. a[n], a[i] = a[i], a[n]
  9. -- 生成其余元素的排列
  10. permgen(a, n - 1)
  11. -- 恢复第i个元素
  12. a[n], a[i] = a[i], a[n]
  13. end
  14. end
  15. end

  然后,定义一个工厂函数,用于将生成函数放到一个协同程序中运行,并创建迭代器函数。迭代器只是简单地唤醒协同程序,让其产生下一种排列:

  1. function permutations(a)
  2. local co = coroutine.create(function() permgen(a) end)
  3. return function() -- 迭代器
  4. local code, res = coroutine.resume(co)
  5. return res
  6. end
  7. end

  有了上面的函数,在for语句中遍历一个数组的所有排列就非常简单了:

  1. for p in permutations{"a", "b", "c"} do
  2. printResult(p)
  3. end
  4. --> b c a
  5. --> c a b
  6. --> a c b
  7. --> b a c
  8. --> a b c

  permutations函数使用了一种在Lua中比较常见的模式,就是将一条唤醒协同程序的调用包装在一个函数中。由于这种模式比较常见,所以Lua专门提供了一个函数coroutine.wrap来完成这个功能。类似于createwrap创建了一个新的协同程序。但不同的是,wrap并不是返回协同程序本身,而是返回一个函数。每当调用这个函数,即可唤醒一次协同程序。但这个函数与resume的不同之处在于,它不会返回错误代码。当遇到错误时,它会引发错误。若使用wrap,可以这么写permutations

  1. function permutations(a)
  2. return coroutine.wrap(function() permgen(a) end)
  3. end

  通常,coroutine.wrapcoroutine.create更易于使用。它提供了一个对于协同程序编程实际所需的功能,即一个可以唤醒协同程序的函数。但也缺乏灵活性。无法检查wrap所创建的协同程序的状态,此外,也无法检测出运行时的错误。

  

  ● 非抢先式的多线程

  协同程序提供了一种协作式的多线程。每个程序都等于是一个线程。一对yield-resume可以将执行权在不同线程之间切换。然后,协同程序与常规的多线程的不同之处在于,协同程序是非抢先式的。就是说,当一个协同程序运行时,是无法从外部停止它的。只有当协同程序显式地要求挂起时(调用yield),它才会停止。对于有些应用而言,这没有问题,而对于另外一些应用则可能无法接受这种情况。当不存在抢先时,编程会简单许多。无须为同步的bug而抓狂,在程序中所有线程间的同步都是显式的,只需确保一个协同程序在它的临界区域之外调用yield即可。

  对于非抢先式的多线程来说,只要有一个线程调用了一个阻塞的(blocking)操作,整个程序在该操作完成前,都会停止下来。对于大多数应用程序来说,这种行为是无法接受的。这也导致了许多程序员放弃协同程序,转而使用传统的多线程。接下来会用一个有趣的方法来解决这个问题。

  先假设一个典型的多线程使用情况:希望通过HTTP下载几个远程的文件。当然,若要下载几个远程文件,就必须先知道如何下载一个远程文件。在本例中,将使用Diego Nehab开发的LuaSocket。为了下载一个文件,必须先打开一个到该站点的连接,然后发送下载文件的请求,并接收文件(数据块),最后关闭连接。在Lua中可以按以下步骤来完成这项任务。首先,加载LuaSocket库。

  1. require "socket"

  然后,定义主机和下载的文件。本例,将从World Wide Consortium(环球网协会)下载《HTML 3.2参考规范》

  1. host = "www.w3.org"
  2. file = "/TR/REC-html32.html"

  接下来,打开一个TCP连接,连接到该站点的80端口。

  1. c = assert(socket.connect(host, 80))

  这步操作将返回一个连接对象,可以用它来发送文件请求。

  1. c:send("GET" .. file .. "HTTP/1.0\r\n\r\n")

  下一步,按1K的字节块来接收文件,并将每块写到标准输出:

  1. while true do
  2. local s, status, partial = c:receive(2^10)
  3. io.write(s or partial)
  4. if status == "closed" then break end
  5. end

  在正常情况下receive函数会返回一个字符串。若发生错误,则会返回nil,并且附加错误代码(status)及出错前读取到的内容(partial)。当主机关闭连接时,就将其余接收到的内容打印出来,然后退出接收循环。

  下载完文件后,关闭连接。

  1. c:close()

  现在已经掌握了如何下载一个文件,那么再回到下载几个文件的问题上。最繁琐的做法是逐个地下载文件。因为,这种顺序的做法太慢了,它只能在下载完一个文件后才开始读取该文件。当接收一个远程文件时,程序将大部分的时间花费在等待数据接收上。更明确地说,是将时间用在了对receive阻塞调用上。因此,如果一个程序可以同时下载所有文件的话,那么它的运行速度就可以快很多了。当一个连接没有可用数据时,程序便可以从其他连接处读取数据。很明显协同程序提供了一种简便的方式来构建这种并发下载的结构。可以为每个下载任务创建一个新的线程,只要一个线程无可用数据,它就可以将控制权转让给一个简单的调度程序,而这个调度程序则会去调用其他的下载线程。

  在以协同程序来重写程序前,先将前面的下载代码重新写为一个函数。代码如下:

  1. function download(host, file)
  2. local c = assert(socket.connect(host, 80))
  3. local count = 0 -- 记录接收到的字节数
  4. c:send("GET " .. file .. "HTTP/1.0\r\n\r\n")
  5. while true do
  6. local s, status, partial = receive(c)
  7. count = count + #(s or partial)
  8. if status == "closed" then break end
  9. end
  10. c:close()
  11. print(file, count)
  12. end

  由于对远程文件的内容并不感兴趣,所以不需要将文件内容写到标准输出中,只需计算并打印出文件大小即可。在上述代码中,还使用了一个辅助函数receive来从连接接收数据。在顺序下载的方法中,receive的代码可以是这样的:

  1. function receive(connection)
  2. return connection:receive(2^10)
  3. end

  而在并发的实现中,这个函数在接收数据时绝对不能阻塞。因此,它需要在没有足够的可用数据时挂起执行。新代码如下:

  1. function receive(connection)
  2. connection.settimeout(0) -- 使recevie调用不会阻塞
  3. local s, status, partial = connection:receive(2^20)
  4. if status == "timeout" then
  5. coroutine.yield(connection)
  6. end
  7. return s or partial, status
  8. end

  对settimeout(0)的调用可使以后所有对此连接进行的操作不会阻塞。若一个操作返回的status为“timeout(超时)”,就表示该操作在返回时还未完成。此时,线程就会挂起执行。而以非假的参数来调用yield,可以告诉调度程序线程仍在执行任务中。注意,即使在超时的情况下,连接也是会返回已经读取到的内容,即记录在partial变量中的值。

  以下这段代码展示了调度程序及一些辅助代码。table threads为调度程序保存着所有正在运行中的线程。函数get确保每个下载任务都在一个独立的线程中执行。调度程序本身主要就是一个循环,它遍历所有的线程,逐个唤醒它们的执行。并且当线程完成任务时,将该线程从列表中删除。在所有线程都完成运行后,停止循环。

  1. threads = {} -- 用于记录所有正在运行的线程
  2. function get(host, file)
  3. -- 创建协同程序
  4. local co = coroutine.create(function()
  5. download(host, file)
  6. end)
  7. -- 将其插入记录表中
  8. table.insert(threads, co)
  9. end
  10. function dispatch()
  11. local i = 1
  12. while true do
  13. if threads[i] == nil then -- 还有线程吗?
  14. if threads[1] == nil then break end -- 列表是否为空?
  15. i = 1 -- 重新开始循环
  16. end
  17. local status, res = coroutine.resume(threads[i])
  18. if not res then -- 线程是否已经完成了任务?
  19. table.remove(threads, i)
  20. else
  21. i = i + 1
  22. end
  23. end
  24. end

  最后,主程序需要创建所有的线程,并调用调度程序。例如,若要下载W3C站点上的4个文件,主程序如下:

  1. host = "www.w3.org"
  2. get(host, "/TR/html401/html40.txt")
  3. get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
  4. get(host, "/TR/REC-html32.html")
  5. get(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
  6. dispatch() -- 主循环

  通过协同程序,计算机只需要6秒便可下载完成这4个文件。但若使用顺序下载的话,则需要多耗费两倍的时间(15秒左右)。

  除了速度有所提高外,上面这个实现还不够完美。只要有一个线程在读取数据,就不会有问题。但若所有线程都没有数据可读,调度程序就会执行一个“忙碌等待(Busy Wait)”,不断地从一个线程切换到另一个线程,仅仅是为了检测是否还有数据可读。这样便导致了这个协同程序的实现会比顺序下载多耗费将近30倍的CPU时间。

  为了避免这样的情况,可以使用LuaSocket中的select函数。这个函数可以用于等待一组socket的状态改变,在等待时程序陷入阻塞(block)状态。若要在当前实现中应用这个函数,只需要修改调度程序即可,新版本如下:

  1. function dispatch()
  2. local i = 1
  3. local connections = {}
  4. while true do
  5. if threads[i] == nil then -- 还有线程吗?
  6. if threads[1] == nil then break end
  7. i = 1 -- 重新开始循环
  8. connections = {}
  9. end
  10. local status, res = coroutine.resume(threads[i])
  11. if not res then -- 线程是否已经完成了任务?
  12. table.remove(threads, i)
  13. else -- 超时
  14. i = i + 1
  15. connections[#connections + 1] = res
  16. if #connections == #threads then -- 所有线程都阻塞了吗?
  17. socket.select(connections)
  18. end
  19. end
  20. end
  21. end

  新的调度程序将所有超时的连接收集到一个名为connectionstable中。记住,receive会将超时的连接通过yield传递,也就是resume会返回它们。如果所有的连接都超时了,调度程序就调用select来等待这些连接的状态发生变化。这个最终版本的实现与上一个使用协同程序的实现一样快,另外由于它不会有“忙碌等待”,所以只比顺序下载耗费CPU资源略多而已。

?