tbox之前提供的stackfull协程库,虽然切换效率已经非常高了,但是由于每个协程都需要维护一个独立的堆栈, 内存空间利用率不是很高,在并发量非常大的时候,内存使用量会相当大。

之前考虑过采用stacksegment方式进行内存优化,实现动态增涨,但是这样对性能还是有一定的影响,暂时不去考虑了。

最近参考了下boost和protothreads的stackless协程实现,这种方式虽然易用性和灵活性上受到了很多限制,但是对切换效率和内存利用率的提升效果还是非常明显的。。

因此,我在tbox里面也加上了对stackless协程的支持,在切换原语上参考了protothreads的实现,接口封装上参考了boost的设计,使得更加可读易用

先晒段实际的接口使用代码:

  1. tb_lo_coroutine_enter(coroutine)
  2. {
  3. while (1)
  4. {
  5. tb_lo_coroutine_yield();
  6. }
  7. }

然后实测对比了下:

  1. * 切换性能在macosx上比tboxstackfull版本提升了5-6倍,1000w次切换只需要40ms
  2. * 每个协程的内存占用也减少到了只有固定几十个bytes

那么既然stackless的效率提升这么明显,stackfull模式还需要吗?可以比较下两者的优劣:

  • stackfull协程:易用性和灵活性非常高,但是内存使用过大
  • stackless协程:切换效率和内存利用率很高,更加轻量,但是使用上限制较多

由于stackless的实现比较轻量,占用资源也不是很多,因此tbox默认放置到了micro微内核模式下,作为基础模块,提供股嵌入式平台使用

而一般情况下,如果对资源使用和切换性能要求不是非常苛刻的话,使用stackfull的方式会更加方便,代码也更易于维护

具体如何选择,可根据实际使用场景,自己选择哦。。

切换

下面给的tbox的stackless协程切换实例,直观感受下:

  1. static tb_void_t switchtask(tb_lo_coroutine_ref_t coroutine, tb_cpointer_t priv)
  2. {
  3. // check
  4. tb_size_t* count = (tb_size_t*)priv;
  5.  
  6. // enter coroutine
  7. tb_lo_coroutine_enter(coroutine)
  8. {
  9. // loop
  10. while ((*count)--)
  11. {
  12. // trace
  13. tb_trace_i("[coroutine: %p]: %lu", tb_lo_coroutine_self(), *count);
  14.  
  15. // yield
  16. tb_lo_coroutine_yield();
  17. }
  18. }
  19. }
  20. static tb_void_t test()
  21. {
  22. // init scheduler
  23. tb_lo_scheduler_ref_t scheduler = tb_lo_scheduler_init();
  24. if (scheduler)
  25. {
  26. // start coroutines
  27. tb_size_t counts[] = {10, 10};
  28. tb_lo_coroutine_start(scheduler, switchtask, &counts[0], tb_null);
  29. tb_lo_coroutine_start(scheduler, switchtask, &counts[1], tb_null);
  30.  
  31. // run scheduler
  32. tb_lo_scheduler_loop(scheduler, tb_true);
  33.  
  34. // exit scheduler
  35. tb_lo_scheduler_exit(scheduler);
  36. }
  37. }

其实整体接口使用跟tbox的那套stackfull接口类似,并没有多少区别,但是相比stackfull还是有些限制的:

  1. 1. 目前只能支持在根函数进行协程切换和等待,嵌套协程不支持
  2. 2. 协程内部局部变量使用受限

对于限制1,我正在研究中,看看有没有好的实现方案,之前尝试过支持下,后来发现需要按栈结构分级保存每个入口的label地址,这样会占用更多内存,就放弃了。 对于限制2,由于stackless协程函数是需要重入的,因此目前只能在enter()块外部定以一些状态不变的变量,enter()块内部不要使用局部变量

接口设计上,这边采用boost的模式:

  1. // enter coroutine
  2. tb_lo_coroutine_enter(coroutine)
  3. {
  4. // yield
  5. tb_lo_coroutine_yield();
  6. }

这样比起protothreads的那种begin()和end(),更加可读和精简,接口也少了一个。。

参数传递

tb_lo_coroutine_start的最后两个参数,专门用来传递关联每个协程的私有数据priv和释放接口free,例如:

  1. typedef struct __tb_xxxx_priv_t
  2. {
  3. tb_size_t member;
  4. tb_size_t others;
  5.  
  6. }tb_xxxx_priv_t;
  7.  
  8. static tb_void_t tb_xxx_free(tb_cpointer_t priv)
  9. {
  10. if (priv) tb_free(priv);
  11. }
  12.  
  13. static tb_void_t test()
  14. {
  15. tb_xxxx_priv_t* priv = tb_malloc0_type(tb_xxxx_priv_t);
  16. if (priv)
  17. {
  18. priv->member = value;
  19. }
  20.  
  21. tb_lo_coroutine_start(scheduler, switchtask, priv, tb_xxx_free);
  22. }

上述例子,为协程分配一个私有的数据结构,用于数据状态的维护,解决不能操作局部变量的问题,但是这样写非常繁琐

tbox里面提供了一些辅助接口,用来简化这些流程:

  1. typedef struct __tb_xxxx_priv_t
  2. {
  3. tb_size_t member;
  4. tb_size_t others;
  5.  
  6. }tb_xxxx_priv_t;
  7.  
  8. static tb_void_t test()
  9. {
  10. // start coroutine
  11. tb_lo_coroutine_start(scheduler, switchtask, tb_lo_coroutine_pass1(tb_xxxx_priv_t, member, value));
  12. }

这个跟之前的代码功能上是等价的,这里利用tb_lo_coroutine_pass1宏接口,自动处理了之前的那些设置流程, 用来快速关联一个私有数据块给新协程。

挂起和恢复

这个跟stackfull的接口用法上也是一样的:

  1. tb_lo_coroutine_enter(coroutine)
  2. {
  3. // 挂起当前协程
  4. tb_lo_coroutine_suspend();
  5. }
  6.  
  7. // 恢复指定协程(这个可以不在协程函数内部使用,其他地方也可以调用)
  8. tb_lo_coroutine_resume(coroutine);

挂起和恢复跟yield的区别就是,yield后的协程,之后还会被切换回来,但是被挂起的协程,除非调用resume()恢复它,否则永远不会再被执行到。

等待

当然一般,我们不会直接使用suspend()和resume()接口,这两个比较原始,如果需要定时等待,可以使用:

  1. tb_lo_coroutine_enter(coroutine)
  2. {
  3. // 等待1s
  4. tb_lo_coroutine_sleep(1000);
  5. }

来挂起当前协程1s,之后会自动恢复执行,如果要进行io等待,可以使用:

  1. static tb_void_t tb_demo_lo_coroutine_client(tb_lo_coroutine_ref_t coroutine, tb_cpointer_t priv)
  2. {
  3. // check
  4. tb_demo_lo_client_ref_t client = (tb_demo_lo_client_ref_t)priv;
  5. tb_assert(client);
  6.  
  7. // enter coroutine
  8. tb_lo_coroutine_enter(coroutine)
  9. {
  10. // read data
  11. client->size = sizeof(client->data) - 1;
  12. while (client->read < client->size)
  13. {
  14. // read it
  15. client->real = tb_socket_recv(client->sock, (tb_byte_t*)client->data + client->read, client->size - client->read);
  16.  
  17. // has data?
  18. if (client->real > 0)
  19. {
  20. client->read += client->real;
  21. client->wait = 0;
  22. }
  23. // no data? wait it
  24. else if (!client->real && !client->wait)
  25. {
  26. // 等待socket数据
  27. tb_lo_coroutine_waitio(client->sock, TB_SOCKET_EVENT_RECV, TB_DEMO_TIMEOUT);
  28.  
  29. // 获取等到的io事件
  30. client->wait = tb_lo_coroutine_events();
  31. tb_assert_and_check_break(client->wait >= 0);
  32. }
  33. // failed or end?
  34. else break;
  35. }
  36.  
  37. // trace
  38. tb_trace_i("echo: %s", client->data);
  39.  
  40. // exit socket
  41. tb_socket_exit(client->sock);
  42. }
  43. }

这个跟stackfull模式除了局部变量的区别,其他使用上几乎一样,也是同步模式,但是实际上tbox已经在底层把它放入了poller轮询器中进行等待

在没有数据,调用tb_lo_coroutine_waitio进行socket等待事件后,tbox会自动启用stackless调度器内部的io调度器(默认是不启用的,延迟加载,减少无畏的资源浪费)

然后进行poll切换调度(内部根据不同平台使用epoll, kqueue, poll, 后续还会支持iocp)。

如果有事件到来,会将收到事件的所有协程恢复执行,当然也可以指定等待超时,超时返回或者强行kill中断掉。

tbox中内置了一个stackless版本的http_server,实现也是非常轻量,经测试效率还是非常高的, 整体表现比stackfull的实现更好。

更多stackless接口使用demo,可以参考tbox的源码

信号量和锁

这个就简单讲讲了,使用跟stackfull的类似,例如:

  1. // the lock
  2. static tb_lo_lock_t g_lock;
  3.  
  4. // enter coroutine
  5. tb_lo_coroutine_enter(coroutine)
  6. {
  7. // loop
  8. while (lock->count--)
  9. {
  10. // enter lock
  11. tb_lo_lock_enter(&g_lock);
  12.  
  13. // trace
  14. tb_trace_i("[coroutine: %p]: enter", tb_lo_coroutine_self());
  15.  
  16. // wait some time
  17. tb_lo_coroutine_sleep(1000);
  18.  
  19. // trace
  20. tb_trace_i("[coroutine: %p]: leave", tb_lo_coroutine_self());
  21.  
  22. // leave lock
  23. tb_lo_lock_leave(&g_lock);
  24. }
  25. }
  26.  
  27. // init lock
  28. tb_lo_lock_init(&g_lock);
  29.  
  30. // start coroutine
  31. // ..
  32.  
  33. // exit lock
  34. tb_lo_lock_exit(&g_lock);

这里只是举个例子,实际使用中尽量还是别这么直接用全局变量哦。。