tbox中提供了常用的一些stream模块,例如:data、file、http、sock等,可以通过指定不同的url,使用相同的接口 进行数据流的读写,非常的方便。

    例如:

    1. // init stream
    2. tb_stream_ref_t stream = tb_stream_init_from_url("http://www.xxx.com/file.txt");
    3. if (stream)
    4. {
    5. // open stream
    6. if (tb_stream_open(stream))
    7. {
    8. // read line
    9. tb_long_t size = 0;
    10. tb_char_t line[TB_STREAM_BLOCK_MAXN];
    11. while ((size = tb_stream_bread_line(stream, line, sizeof(line))) >= 0)
    12. {
    13. // trace
    14. tb_trace_i("line: %s", line);
    15. }
    16. }
    17. // exit stream
    18. tb_stream_exit(stream);
    19. }

    这样的好处是,操作io的模块不需要关心实际的数据流协议,只管从stream中读写数据就行了,实现数据和业务逻辑的解耦。。

    但是tbox提供的这些内置stream模块,有时候没法完全咱们的实际需求,例如:

    我想读取一个实时数据流的缓存队列,这个数据流一段会不停的送入数据进来,另外一段会不停的读取数据,如果数据不够,就会进入等待

    这其实是个很有用的功能,我的很多需求都会用到,例如:流媒体的一些实时数据获取和复用等等。。

    那如何实现这样一个stream模块,让tbox的stream接口支持呢,我们只要实现一个自定义的流模块就好,实现起来也不复杂

    我们先定义个一个stream类型,例如:

    1. // 用户自定义流类型:实时流
    2. #define TB_STREAM_TYPE_REAL (TB_STREAM_TYPE_USER + 1)
    3. // 定义一个控制流代码,之后tb_stream_ctrl需要
    4. #define TM_STREAM_CTRL_REAL_PUSH TB_STREAM_CTRL(TM_STREAM_TYPE_REAL, 1)

    定义个自定义流的数据结构,用于维护咱们的私有数据

    1. // 实时流类型
    2. typedef struct __tb_stream_real_t
    3. {
    4. // 这里定义了一个数据块buffer的队列,用于缓存不断送入的数据
    5. tb_queue_ref_t buffers;
    6. // 总的数据大小
    7. tb_size_t size;
    8. }tb_stream_real_t, *tb_stream_real_ref_t;
    9. // 定义一个buffer块类型,用于维护单个数据块
    10. typedef struct __tm_real_buffer_t
    11. {
    12. // 数据地址
    13. tb_byte_t* data;
    14. // 这个buffer总大小
    15. tb_size_t size;
    16. // 在这个buffer中,当前读取到的数据
    17. tb_size_t read;
    18. }tm_real_buffer_t, *tm_real_buffer_ref_t;

    创建一个stream实例,注册一些需要的回调接口

    1. // 初始化创建个一个实时流
    2. tb_stream_ref_t tb_stream_init_real()
    3. {
    4. return tb_stream_init( TB_STREAM_TYPE_REAL
    5. , sizeof(tb_stream_real_t)
    6. , 0 // stream缓存大小(file/sock有用),这里禁用了,因为咱们的流不需要缓存读取
    7. , tb_stream_real_open
    8. , tb_stream_real_clos
    9. , tb_stream_real_exit
    10. , tb_stream_real_ctrl
    11. , tb_stream_real_wait
    12. , tb_stream_real_read
    13. , tb_null // 写回调,这里不需要
    14. , tb_null // seek,我们这里不需要
    15. , tb_null // 刷新写数据,不需要
    16. , tb_null); // kill当前的stream,很少用,一般用于中断内部读写
    17. }

    下面就是具体的回调接口实现了

    1. // 实现open回调接口,用于打开stream,tb_stream_open会用到
    2. static tb_bool_t tb_stream_real_open(tb_stream_ref_t stream)
    3. {
    4. // check
    5. tb_stream_real_ref_t rstream = (tb_stream_real_ref_t)stream;
    6. tb_assert_and_check_return_val(rstream, tb_false);
    7. // 初始化一个buffer队列,并注册自动释放接口:tb_real_buffer_exit,之后有说明
    8. rstream->buffers = tb_queue_init(0, tb_element_mem(sizeof(tb_real_buffer_t), tb_real_buffer_exit, tb_null));
    9. // init size
    10. rstream->size = 0;
    11. // ok
    12. return !!rstream->buffers;
    13. }
    14. // 实现close回调接口,用于关闭stream,tb_stream_clos会用到
    15. static tb_bool_t tb_stream_real_clos(tb_stream_ref_t stream)
    16. {
    17. // check
    18. tb_stream_real_ref_t rstream = (tb_stream_real_ref_t)stream;
    19. tb_assert_and_check_return_val(rstream, tb_false);
    20. // exit buffers
    21. if (rstream->buffers) tb_queue_exit(rstream->buffers);
    22. rstream->buffers = tb_null;
    23. // ok
    24. return tb_true;
    25. }
    26. // 实现exit回调接口,用于销毁stream,tb_stream_exit会用到
    27. static tb_void_t tb_stream_real_exit(tb_stream_ref_t stream)
    28. {
    29. // check
    30. tb_stream_real_ref_t rstream = (tb_stream_real_ref_t)stream;
    31. tb_assert_and_check_return(rstream);
    32. // exit buffers
    33. if (rstream->buffers) tb_queue_exit(rstream->buffers);
    34. rstream->buffers = tb_null;
    35. // clear size
    36. rstream->size = 0;
    37. }
    38. // 实现read回调接口,用于读取数据,tb_stream_read/tb_stream_bread等接口会用到
    39. static tb_long_t tb_stream_real_read(tb_stream_ref_t stream, tb_byte_t* data, tb_size_t size)
    40. {
    41. // check
    42. tb_stream_real_ref_t rstream = (tb_stream_real_ref_t)stream;
    43. tb_assert_and_check_return_val(rstream && rstream->buffers, -1);
    44. // check
    45. tb_check_return_val(data, -1);
    46. tb_check_return_val(size, 0);
    47. // 依次从队列头部读取每块buffer的数据,直到读满为止
    48. tb_long_t read = 0;
    49. while (read < size && tb_queue_size(rstream->buffers))
    50. {
    51. // get buffer
    52. tb_real_buffer_ref_t buffer = tb_queue_get(rstream->buffers);
    53. tb_assert_and_check_break(buffer && buffer->data && buffer->size);
    54. // read data
    55. if (buffer->read < buffer->size)
    56. {
    57. // calculate the need size
    58. tb_size_t need = tb_min(size - read, buffer->size - buffer->read);
    59. // copy data
    60. tb_memcpy(data + read, buffer->data + buffer->read, need);
    61. // update the read size for buffer
    62. buffer->read += need;
    63. // update the total read size
    64. read += need;
    65. }
    66. // 将读空的buffer释放掉
    67. if (buffer->read == buffer->size)
    68. tb_queue_pop(rstream->buffers);
    69. }
    70. // ok?
    71. return read;
    72. }
    73. // 实现wait回调接口,用于等待数据,tb_stream_wait/tb_stream_bread等阻塞读取接口会用到
    74. static tb_long_t tb_stream_real_wait(tb_stream_ref_t stream, tb_size_t wait, tb_long_t timeout)
    75. {
    76. // check
    77. tb_stream_real_ref_t rstream = (tb_stream_real_ref_t)stream;
    78. tb_assert_and_check_return_val(rstream && rstream->buffers, -1);
    79. // 当前是否有数据可读?
    80. return tb_queue_size(rstream->buffers)? TB_STREAM_WAIT_READ : TB_STREAM_WAIT_NONE;
    81. }
    82. // 实现ctrl回调接口,用于设置和获取一些状态,扩展一些自定义的接口,tb_stream_ctrl接口会用到
    83. static tb_bool_t tb_stream_real_ctrl(tb_stream_ref_t stream, tb_size_t ctrl, tb_va_list_t args)
    84. {
    85. // check
    86. tb_stream_real_ref_t rstream = (tb_stream_real_ref_t)stream;
    87. tb_assert_and_check_return_val(rstream, tb_false);
    88. // ctrl
    89. switch (ctrl)
    90. {
    91. case TB_STREAM_CTRL_GET_SIZE:
    92. {
    93. // the psize
    94. tb_hong_t* psize = (tb_hong_t*)tb_va_arg(args, tb_hong_t*);
    95. tb_assert_and_check_break(psize);
    96. // 获取数据流大小,tb_stream_size有用到
    97. *psize = rstream->size;
    98. // ok
    99. return tb_true;
    100. }
    101. // 在另外一端通过tb_stream_ctrl来不断的送入数据块到stream
    102. case TB_STREAM_CTRL_REAL_PUSH:
    103. {
    104. // check
    105. tb_assert_and_check_break(rstream->buffers);
    106. // the data and size
    107. tb_byte_t const* data = (tb_byte_t const*)tb_va_arg(args, tb_byte_t const*);
    108. tb_size_t size = (tb_size_t)tb_va_arg(args, tb_size_t);
    109. tb_assert_and_check_break(data && size);
    110. // 压入一个数据块
    111. tb_real_buffer_t buffer;
    112. buffer.data = tb_memdup(data, size);
    113. buffer.size = size;
    114. buffer.read = 0;
    115. tb_queue_put(rstream->buffers, &buffer);
    116. // 更新总的数据大小
    117. rstream->size += size;
    118. // ok
    119. return tb_true;
    120. }
    121. default:
    122. break;
    123. }
    124. // failed
    125. return tb_false;
    126. }

    通过上面四步, 基本上一个自定义流就实现好了,上面说的tb_real_buffer_exit主要用于queue维护的buffer的自动释放 详细说明和使用见容器章节,下面附属相关实现:

    1. static tb_void_t tb_real_buffer_exit(tb_element_ref_t element, tb_pointer_t buff)
    2. {
    3. // check
    4. tb_real_buffer_ref_t buffer = (tb_real_buffer_ref_t)buff;
    5. tb_assert_and_check_return(buffer);
    6. // exit it
    7. if (buffer->data) tb_free(buffer->data);
    8. buffer->data = tb_null;
    9. buffer->size = 0;
    10. buffer->read = 0;
    11. }

    最后,贴下咱们这个自定义stream使用:

    接收端

    1. // init stream
    2. tb_stream_ref_t stream = tb_stream_init_real();
    3. if (stream)
    4. {
    5. // open stream
    6. if (tb_stream_open(stream))
    7. {
    8. // read line
    9. tb_long_t size = 0;
    10. tb_char_t line[TB_STREAM_BLOCK_MAXN];
    11. while ((size = tb_stream_bread_line(stream, line, sizeof(line))) >= 0)
    12. {
    13. // trace
    14. tb_trace_i("line: %s", line);
    15. }
    16. }
    17. // exit stream
    18. tb_stream_exit(stream);
    19. }

    基本上没什么变化,就是换了下stream的初始化创建接口

    输入端

    1. // 将数据不停的送入stream中
    2. while (1)
    3. {
    4. // fill data
    5. tb_byte_t data[8192];
    6. tb_memset(data, 0xff, sizeof(data));
    7. // push data
    8. tb_stream_ctrl(stream, TB_STREAM_CTRL_REAL_PUSH, data, sizeof(data));
    9. }

    上面介绍的实现和使用方式,只是个例子,方便理解tbox中stream的机制,具体实现和使用还是需要根据自己的实际需求做调整。

    更详细的使用和扩展,可参考源代码来了解。。