相对于asio事件等待池aiop的reactor模式,基于proactor模式的事件回调池aicp封装的更加的上层。

    1. 在类unix系统上

      底层是基于aiop的实现,在一个线程里面进行事件等待,然后分发所有事件,在worker中处理完后调用回调返回到上层。 并对不同系统的aiop支持力度,进行针对性优化。

      • 如果aiop支持边缘触发(例如:epoll、kqueue),尽量启用边缘触发,以减少系统api的频繁调用。
      • 使用sendfile对发送文件进行优化
      • 针对linux系统,启用native file io,实现file的dma读写,节省cpu的时间。(目前还没实现)
    2. 在windows系统上

      直接基于iocp进行封装,并针对windows的一些扩展socket api进行针对性优化。

      • 使用DisconnectEx对socket的进行重用,实现socket池来管理,优化整体性能
      • 使用TransmitFile对发送文件进行优化
      • 在高版本系统上,优先使用GetQueuedCompletionStatusEx来批量获取事件,减少系统调用

    不管在哪个平台上,只需要两个线程(worker线程+timer线程),就能实现高性能并发io读写。

    其中timer线程主要处理投递事件的超时维护,如果io事件长时间不响应,则会超时取消,超时事件是可以根据不同事件,自己设置。 如果确实想要快速取消,也有接口安全的强制kill掉,并在对应的回调里面监听到killed事件。

    针对timer,aicp中了两种定时器:

    1. tb_ltimer_t:低精度定时器,精度为1s,主要用于超时维护,效率很高,采用简化的timing-wheel算法
    2. tb_timer_t:通用高精度定时器,精度为1ms,使用最小堆维护,主要用于维护所有定时任务,以及各种io限速操作

    并且整个aicp参考nginx对于gettimeofday的优化,也对其进行了缓存,只在master loop中进行更新,其他地方直接使用缓存的值。

    虽然aicp只需要一个worker线程进行loop就足够了,但是同时它也支持多个线程同时开启loop,来提高分发处理效率。

    具体的aicp架构,参见:asio架构

    下面看看aicp是如何使用的:

    所有aicp事件投递,都是由aico对象完成,每个aico对象一个事件对象,目前支持三种事件对象:

    1. socket
    2. file
    3. task(用于投递定时任务)

    具体aico的事件投递接口,参见:asio的事件投递接口说明

    1. /* //////////////////////////////////////////////////////////////////////////////////////
    2. * includes
    3. */
    4. #include "tbox/tbox.h"
    5.  
    6. /* //////////////////////////////////////////////////////////////////////////////////////
    7. * types
    8. */
    9. typedef struct __tb_demo_context_t
    10. {
    11. // the file
    12. tb_file_ref_t file;
    13.  
    14. // the aico
    15. tb_aico_ref_t aico;
    16.  
    17. // the size
    18. tb_hize_t size;
    19.  
    20. }tb_demo_context_t;
    21.  
    22. /* //////////////////////////////////////////////////////////////////////////////////////
    23. * implementation
    24. */
    25. // aico对象关闭回调函数
    26. static tb_bool_t tb_demo_aico_clos(tb_aice_t const* aice)
    27. {
    28. // check
    29. tb_assert_and_check_return_val(aice && aice->aico && aice->code == TB_AICE_CODE_CLOS, tb_false);
    30.  
    31. // trace
    32. tb_trace_d("aico[%p]: clos: %s", aice->aico, tb_state_cstr(aice->state));
    33.  
    34. // exit aico
    35. tb_aico_exit(aice->aico);
    36.  
    37. // ok
    38. return tb_true;
    39. }
    40. static tb_void_t tb_demo_context_exit(tb_demo_context_t* context)
    41. {
    42. if (context)
    43. {
    44. // clos aico
    45. if (context->aico) tb_aico_clos(context->aico, tb_demo_aico_clos, tb_null);
    46. context->aico = tb_null;
    47.  
    48. // exit
    49. tb_free(context);
    50. }
    51. }
    52. // 发送文件事件回调函数
    53. static tb_bool_t tb_demo_sock_sendf_func(tb_aice_t const* aice)
    54. {
    55. // check
    56. tb_assert_and_check_return_val(aice && aice->code == TB_AICE_CODE_SENDF, tb_false);
    57.  
    58. // the context
    59. tb_demo_context_t* context = (tb_demo_context_t*)aice->priv;
    60. tb_assert_and_check_return_val(context, tb_false);
    61.  
    62. // ok?
    63. if (aice->state == TB_STATE_OK)
    64. {
    65. // trace
    66. tb_trace_d("sendf[%p]: real: %lu, size: %llu", aice->aico, aice->u.sendf.real, aice->u.sendf.size);
    67.  
    68. // save size
    69. context->size += aice->u.sendf.real;
    70.  
    71. // 如果还没有发送完,则继续发送剩余文件数据
    72. if (aice->u.sendf.real < aice->u.sendf.size)
    73. {
    74. // post sendf from file
    75. if (!tb_aico_sendf(aice->aico, context->file, context->size, aice->u.sendf.size - aice->u.sendf.real, tb_demo_sock_sendf_func, context)) return tb_false;
    76. }
    77. else
    78. {
    79. tb_trace_i("sendf[%p]: finished", aice->aico);
    80. tb_demo_context_exit(context);
    81. }
    82. }
    83. // closed or failed?
    84. else
    85. {
    86. tb_trace_i("sendf[%p]: state: %s", aice->aico, tb_state_cstr(aice->state));
    87. tb_demo_context_exit(context);
    88. }
    89.  
    90. // ok
    91. return tb_true;
    92. }
    93. // accept事件回调函数
    94. static tb_bool_t tb_demo_sock_acpt_func(tb_aice_t const* aice)
    95. {
    96. // check
    97. tb_assert_and_check_return_val(aice && aice->code == TB_AICE_CODE_ACPT, tb_false);
    98.  
    99. // the file path
    100. tb_char_t const* path = (tb_char_t const*)aice->priv;
    101. tb_assert_and_check_return_val(path, tb_false);
    102.  
    103. // the aicp
    104. tb_aicp_ref_t aicp = tb_aico_aicp(aice->aico);
    105. tb_assert_and_check_return_val(aicp, tb_false);
    106.  
    107. // acpt ok?
    108. if (aice->state == TB_STATE_OK)
    109. {
    110. // trace
    111. tb_trace_i("acpt[%p]: %p", aice->aico, aice->u.acpt.aico);
    112.  
    113. // done
    114. tb_bool_t ok = tb_false;
    115. tb_demo_context_t* context = tb_null;
    116. do
    117. {
    118. // make context
    119. context = tb_malloc0_type(tb_demo_context_t);
    120. tb_assert_and_check_break(context);
    121.  
    122. // init file
    123. context->file = tb_file_init(path, TB_FILE_MODE_RO | TB_FILE_MODE_ASIO);
    124. tb_assert_and_check_break(context->file);
    125.  
    126. // 获取客户端连接的aico对象,用于发送数据
    127. context->aico = aice->u.acpt.aico;
    128. tb_assert_and_check_break(context->aico);
    129.  
    130. // 投递一个发送文件事件
    131. if (!tb_aico_sendf(context->aico, context->file, 0ULL, tb_file_size(context->file), tb_demo_sock_sendf_func, context)) break;
    132.  
    133. // ok
    134. ok = tb_true;
    135.  
    136. } while (0);
    137.  
    138. // failed?
    139. if (!ok)
    140. {
    141. // exit context
    142. if (context) tb_demo_context_exit(context);
    143. }
    144. }
    145. // failed?
    146. else
    147. {
    148. // exit loop
    149. tb_trace_i("acpt[%p]: state: %s", aice->aico, tb_state_cstr(aice->state));
    150.  
    151. // accept失败,关闭用于监听的aico对象
    152. if (aice->aico) tb_aico_clos(aice->aico, tb_demo_aico_clos, tb_null);
    153. }
    154.  
    155. // ok
    156. return tb_true;
    157. }
    158.  
    159. /* //////////////////////////////////////////////////////////////////////////////////////
    160. * main
    161. */
    162. tb_int_t main(tb_int_t argc, tb_char_t** argv)
    163. {
    164. // check
    165. tb_assert_and_check_return_val(argv[1], 0);
    166.  
    167. // done
    168. tb_aico_ref_t aico = tb_null;
    169. do
    170. {
    171. // 初始化tbox库
    172. if (!tb_init(tb_null, tb_null, 0)) break;
    173.  
    174. /* 初始化一个用于监听的aico对象
    175. *
    176. * 注:这里为了简化代码,直接使用了全局的tb_aicp()对象,
    177. * 全局的aicp对象,用起来更加方便,内部回去自己开一个线程运行loop来驱动
    178. * 一般用于客户端应用。
    179. *
    180. * 如果想要更高的并发性能,启用更多的线程去驱动loop,需要手动调用tb_aicp_init,指定需要的并发量,来创建。
    181. * 并且需要手动运行tb_aicp_loop
    182. */
    183. aico = tb_aico_init(tb_aicp());
    184. tb_assert_and_check_break(aico);
    185.  
    186. // 打开这个aico对象,并为其创建一个tcp socket
    187. if (!tb_aico_open_sock_from_type(aico, TB_SOCKET_TYPE_TCP)) break;
    188.  
    189. // 绑定监听端口
    190. if (!tb_socket_bind(tb_aico_sock(aico), tb_null, 9090)) break;
    191.  
    192. // 设置监听
    193. if (!tb_socket_listen(tb_aico_sock(aico), 20)) break;
    194.  
    195. /* 投递accept事件,仅需一次投递
    196. *
    197. * 注:
    198. * 只有accept事件是一次投递,长期有效,只要有事件过来,就回去调用对应的回调函数
    199. * 不需要重复投递,这样设计也是为了尽可能的接受更多地并发连接
    200. */
    201. if (!tb_aico_acpt(aico, tb_demo_sock_acpt_func, argv[1])) break;
    202.  
    203. // 等待
    204. getchar();
    205.  
    206. } while (0);
    207.  
    208. // exit tbox
    209. tb_exit();
    210. return 0;
    211. }