事件等待模式,也就所谓的reactor模式,简单的讲就是通过监听等待io的事件,然后将等待就绪可以处理的事件,分发给对应的io处理程序进行处理,这种模式本身并不算是异步io,但是TBOX的asio库,在类unix系统上,也是基于此模型实现的,所以在讲解真正的异步io之前,首先稍微介绍介绍,对asio的底层机制有个大体了解。

    在类unix系统,例如linux的epoll,mac的kqueue,以及select、poll, dev/poll等等,都是可以用来实现reactor

    虽然功能大体相同,但是效率上来讲, epoll和kqueue 更加的高效,因为他们没有像select, poll那样通过轮询来实现

    单从接口设计上来说,kqueue的设计效率更高些,因为它可以一次批量处理多个事件,所以跟系统内核间的交互比较少。但是具体epoll和kqueue那个效率更高,就不好说了。

    TBOX下的asio底层实现,大体分为两种:

    1. 基于reactor模型,通过对epoll,kqueue、poll、select等api的分装实现的aiop接口(也就是本章所要讲的) 在一个独立的线程内,进行事件监听,来分发处理各种异步io事件,也就实现了proactor的模式。
    2. 直接系统原生支持的proactor模型,例如windows的iocp,并在其基础上做一些封装来实现。

    这样,对于上层应用来说,用的都是同一套asio的异步回调接口,不需要做什么的改变,但是底层根据不同平台其实现机制,已大不相同,windows上用了iocp,linux和android用了epoll,mac上用了kqueue,ios上用了poll。。

    言归正传,本章所要将的aiop接口,就是对reactor的各种poll接口的上层封装,对于一些不需要将就高性能的应用直接使用aiop更加的简洁,易维护。 而基于回调的proactor模式,等下一章再详细讲解。

    首先我先描述下几个对象类型:

    1. aiop: 事件等待对象池
    2. aioo: 等待对象,关联和维护:socket句柄、用户私有数据
    3. aioe: 事件对象,一个aioo对象一次可以等待多个不同类型的aioe事件对象,例如send, recv, acpt, conn, ...
    4. code: 事件代码

    接着我们看下直接使用aiop的简单服务器代码,(这里为了看起来简洁点,资源管理和释放上我就先省略了):

    1. tb_int_t main(tb_int_t argc, tb_char_t** argv)
    2. {
    3. // 初始化一个tcp的套接字,用于监听
    4. tb_socket_ref_t listen_sock = tb_socket_init(TB_SOCKET_TYPE_TCP);
    5. tb_assert_and_check_return_val(sock, 0);
    6.  
    7. // 初始化aiop轮询池,对象规模16个socket,可混用,如果传0,则使用默认值
    8. tb_aiop_ref_t aiop = tb_aiop_init(16);
    9. tb_assert_and_check_return_val(aiop, 0);
    10.  
    11. // 绑定ip和端口,这里ip没做绑定,传null,监听端口为9090
    12. if (!tb_socket_bind(listen_sock, tb_null, 9090)) return 0;
    13.  
    14. // 监听socket
    15. if (!tb_socket_listen(listen_sock, 5)) return 0;
    16.  
    17. // 将这个监听套接字添加到aiop中,并附上accept等待事件
    18. if (!tb_aiop_addo(aiop, listen_sock, TB_AIOE_CODE_ACPT, tb_null)) return 0;
    19.  
    20. // 初始化一个aioe的事件对象列表,用于获取等待返回的事件
    21. tb_aioe_t list[16];
    22.  
    23. // 开启循环
    24. while (1)
    25. {
    26. /* 等待事件到来,用到跟epoll,select类似
    27. *
    28. * 16: 最大需要等待的事件数量
    29. * -1: 等待超时值,这里为永久等待
    30. *
    31. * objn为返回的有效事件数, 如果失败返回:-1, 超时返回:0
    32. */
    33. tb_long_t objn = tb_aiop_wait(aiop, list, 16, -1);
    34. tb_assert_and_check_break(objn >= 0);
    35.  
    36. // 枚举等到的事件列表
    37. tb_size_t i = 0;
    38. for (i = 0; i < objn; i++)
    39. {
    40. // 获取事件对应aioo对象句柄,aioo对socket句柄,事件类型、关联的私有数据进行了同一维护
    41. tb_aioo_ref_t aioo = list[i].aioo;
    42.  
    43. // 获取aioe事件关联的私有数据指针
    44. // 也可以通过tb_aioo_priv(aioo)获取
    45. tb_cpointer_t priv = list[i].priv;
    46.  
    47. // 获取aioo对应的socket句柄
    48. tb_socket_ref_t sock = tb_aioo_sock(aioo);
    49.  
    50. // 有accept事件?
    51. if (list[i].code & TB_AIOE_CODE_ACPT)
    52. {
    53. // 接收对方的连接,返回对应的客户端socket
    54. tb_socket_ref_t client_sock = tb_socket_accept(sock, tb_null, tb_null);
    55. tb_assert_and_check_break(client_sock);
    56.  
    57. /* 将客户端的socket也添加到aiop池中,并等待它的recv事件
    58. *
    59. * 这里的最后一个参数,可以传一个私有的数据指针,并和sock进行关联
    60. * 用于方便维护每个连接对应的会话数据,这里随便传了个字符串
    61. *
    62. * 返回的aioo对象可以保存下来,并在之后可以灵活修改需要等待的事件
    63. *
    64. * 注:这里的client_sock需要在自己的应用内,自己做释放,aiop不会去自动释放
    65. * 因为这个实在外围代码自己创建的,这里的例子仅仅为了省事,就直接忽略了。
    66. */
    67. tb_aioo_ref_t aioo = tb_aiop_addo(aiop, client_sock, TB_AIOE_CODE_RECV, "private data");
    68. tb_assert_and_check_break(aioo);
    69. }
    70. // 有recv事件?
    71. else if (list[i].code & TB_AIOE_CODE_RECV)
    72. {
    73. // 非阻塞接收一段数据
    74. tb_byte_t data[8192];
    75. tb_long_t real = tb_socket_recv(sock, data, sizeof(data));
    76. // 接收完指定数据后,省略部分代码
    77. // ...
    78. // 将socket改为等待发送
    79. if (!tb_aiop_sete(aiop, aioo, TB_AIOE_CODE_SEND, tb_null)) break;
    80.  
    81. // 尝试发送数据,也许没发完
    82. tb_socket_send(sock, "hello", sizeof("hello"));
    83. }
    84. // 有send事件?
    85. else if (list[i].code & TB_AIOE_CODE_SEND)
    86. {
    87. // 继续发送上回没发完的数据
    88. // tb_socket_send(..);
    89.  
    90. // 删除对应aioo事件对象,取消监听这个sock的事件
    91. tb_aiop_delo(aiop, aioo);
    92. }
    93. // 有连接事件?
    94. else if (list[i].code & TB_AIOE_CODE_CONN)
    95. {
    96. // 此处不会进入,仅仅在tb_socket_connect之后,并且注册了aioo,才会有此事件
    97. // ...
    98. }
    99. // 错误代码处理
    100. else
    101. {
    102. tb_trace_e("unknown code: %lu",list[i].code);
    103. break;
    104. }
    105. }
    106. }
    107. return 0;
    108. }

    这里的代码,仅仅描述了下aiop的接口调用流程,没有实际的服务器业务逻辑,仅作参考,请不要照搬复制的使用。

    代码里提到的aioo对象,还有个单独的等待接口,用于直接等待单个socket对象,一般在basic_stream的wait里面使用:

    1. /*! 等待单个socket句柄的事件
    2. *
    3. * @param socket socket句柄
    4. * @param code aioe等待事件代码
    5. * @param timeout 等待超时时间,永久等待传:-1
    6. *
    7. * @return > 0: 等到的事件, 0: 超时, -1: 失败
    8. */
    9. tb_long_t tb_aioo_wait(tb_socket_ref_t socket, tb_size_t code, tb_long_t timeout);

    aiop的接口并不多,我这里不多做描述了,就简单的列举下吧:

    1. /*! 初始化aiop事件等待池
    2. *
    3. * @param maxn 等待对象的规模数量
    4. *
    5. * @return aiop池
    6. */
    7. tb_aiop_ref_t tb_aiop_init(tb_size_t maxn);
    8.  
    9. /*! 退出aiop
    10. *
    11. * @param aiop aiop池
    12. */
    13. tb_void_t tb_aiop_exit(tb_aiop_ref_t aiop);
    14.  
    15. /*! 清除所有aiop中的aioo等待对象
    16. *
    17. * @param aiop aiop池
    18. */
    19. tb_void_t tb_aiop_cler(tb_aiop_ref_t aiop);
    20.  
    21. /*! 强制退出aiop的等待,tb_aiop_wait会返回:-1,并且退出循环,无法再次等待
    22. *
    23. * @param aiop aiop池
    24. */
    25. tb_void_t tb_aiop_kill(tb_aiop_ref_t aiop);
    26.  
    27. /*! 退出aiop的等待,tb_aiop_wait会返回:0,但不退出等待循环,还可继续等待
    28. *
    29. * @param aiop aiop池
    30. */
    31. tb_void_t tb_aiop_spak(tb_aiop_ref_t aiop);
    32.  
    33. /*! 添加一个socket等待对象aioo,并且关联等待的事件代码code,以及私有数据priv
    34. *
    35. * @param aiop aiop池
    36. * @param socket socket句柄
    37. * @param code 等待的事件代码
    38. * @param priv 关联的用户私有数据
    39. *
    40. * @return aioo对象
    41. */
    42. tb_aioo_ref_t tb_aiop_addo(tb_aiop_ref_t aiop, tb_socket_ref_t socket, tb_size_t code, tb_cpointer_t priv);
    43.  
    44. /*! 删除一个aioo等待对象,永远不再等待它
    45. *
    46. * @param aiop aiop池
    47. * @param aioo aioo对象句柄
    48. *
    49. */
    50. tb_void_t tb_aiop_delo(tb_aiop_ref_t aiop, tb_aioo_ref_t aioo);
    51.  
    52. /*! 投递一个aioe等待事件对象
    53. *
    54. * @param aiop aiop池
    55. * @param aioe aioe事件对象
    56. *
    57. * @return 投递成功返回:tb_true, 失败返回:tb_false
    58. */
    59. tb_bool_t tb_aiop_post(tb_aiop_ref_t aiop, tb_aioe_t const* aioe);
    60.  
    61. /*! 设置和修改aioo对象的等待事件
    62. *
    63. * @param aiop aiop池
    64. * @param aioo aioo对象
    65. * @param code 等待事件代码
    66. * @param priv 用户私有数据
    67. *
    68. * @return 成功返回:tb_true, 失败返回:tb_false
    69. */
    70. tb_bool_t tb_aiop_sete(tb_aiop_ref_t aiop, tb_aioo_ref_t aioo, tb_size_t code, tb_cpointer_t priv);
    71.  
    72. /*! 等待一定量的aioe事件
    73. *
    74. * @param aiop aiop池
    75. * @param list aioe事件列表,用于保存成功返回的事件对象
    76. * @param maxn 指定等待多少的事件
    77. * @param timeout 等待超时值,永久等待:-1
    78. *
    79. * @return > 0: 实际等到的aioe事件对象数, 0: 超时, -1: 失败
    80. */
    81. tb_long_t tb_aiop_wait(tb_aiop_ref_t aiop, tb_aioe_t* list, tb_size_t maxn, tb_long_t timeout);

    虽然aiop的reactor模式处理并发io已经相当方便了,但是由于系统底层对其的实现的程度不一,像windows上只能通过select来实现,因此对于一些高性能并发io的处理上,还是有些力不从心的,为了更好的利用不同系统提供的io处理特性,达到更好的并发性,又要保证上层调用接口的简单、统一,实现跨平台、高度可移植性。

    这个时候用proactor模式进行更上层的接口封装,完全采用异步回调通知模式,是一种较好地解决方案。