通信会话AioSession

​ AioQuickServer和AioQuickClient在smart-socket中负责的是服务的配置、启动、停止,所以代码逻辑较简单。AioSession才是smart-socket真正的灵魂,它是衔接网络传输与业务应用的纽带。在AioSession的协调控制下,用户无需再去关心并发所带来的复杂IO场景,只需专注于数据编解码与业务处理。

​ AioSession我们只讲解跟用户息息相关的部分,其余部分读者可自行去阅读源码。AIO通信的关键两个环节为:读回调、写回调,smart-socket对这两个事件的处理都在AioSession中实现。随着版本的迭代,读者在本文看到的代码可能与实际项目源码中看到的可能略有差异,但主体结构是一致的。我们先来看一下读回调的处理过程:

  • 调用flip方法将接收数据的缓冲区切换至读模式。
  • 执行ioServerConfig.getProtocol().decode()进行数据解析。若无法解析出一个完整的消息则返回null,结束本次解析操作;若在解析过程中出现异常会触发状态机PROCESS_EXCEPTION,在这个环节出现异常通常是因为接收到了非法数据,此类问题的常见处理方式为关闭当前网络。
  • 如果成功解析出一个业务消息,会执行消息处理器messageProcessor.process(this, dataEntry)。这个阶段出现的运行时异常并不影响原网络传输中的数据有效性,所以smart-socket对异常进行了捕获并依旧以状态机PROCESS_EXCEPTION的形式反馈给用户。
  • 完成业务处理后若缓冲区中还存留未解析的数据,会再次执行解码(步骤2)、业务处理(步骤3)操作。
  • byteBuf中存放了业务处理后待发送的数据,执行flush进行数据输出。
  • 监测当前AioSession状态,如若已不可用则结束。
  • 若AioSession依旧处于可用状态,则切换缓冲区至写状态,并触发读操作continueRead
  1. void readFromChannel(boolean eof) {
  2. final ByteBuffer readBuffer = this.readBuffer.buffer();
  3. readBuffer.flip();
  4. final MessageProcessor<T> messageProcessor = ioServerConfig.getProcessor();
  5. while (readBuffer.hasRemaining()) {
  6. T dataEntry = null;
  7. try {
  8. dataEntry = ioServerConfig.getProtocol().decode(readBuffer, this);
  9. } catch (Exception e) {
  10. messageProcessor.stateEvent(this, StateMachineEnum.DECODE_EXCEPTION, e);
  11. throw e;
  12. }
  13. if (dataEntry == null) {
  14. break;
  15. }
  16. //处理消息
  17. try {
  18. messageProcessor.process(this, dataEntry);
  19. } catch (Exception e) {
  20. messageProcessor.stateEvent(this, StateMachineEnum.PROCESS_EXCEPTION, e);
  21. }
  22. }
  23. if (byteBuf != null && !byteBuf.isClosed()) {
  24. byteBuf.flush();
  25. }
  26. if (eof || status == SESSION_STATUS_CLOSING) {
  27. close(false);
  28. messageProcessor.stateEvent(this, StateMachineEnum.INPUT_SHUTDOWN, null);
  29. return;
  30. }
  31. if (status == SESSION_STATUS_CLOSED) {
  32. return;
  33. }
  34. //数据读取完毕
  35. if (readBuffer.remaining() == 0) {
  36. readBuffer.clear();
  37. } else if (readBuffer.position() > 0) {
  38. // 仅当发生数据读取时调用compact,减少内存拷贝
  39. readBuffer.compact();
  40. } else {
  41. readBuffer.position(readBuffer.limit());
  42. readBuffer.limit(readBuffer.capacity());
  43. }
  44. continueRead();
  45. }

写操作的回调代码实现相对简单很多,但事实上却是处理难度最大的。因为写操作是个主动行为,我们不可预知用户在何时会写入数据,而写缓冲区中积压的数据又要依靠smart-socket将其输出到网络对端,故此处就存在并发的情况。如果出现多个线程同时触发write操作会导致WritePendingException,在AioSession中使用了信号量Semaphore完成了同步控制,实现了数据有序的输出。smart-socket的写回调会执行writeToChannel方法,而这信号量在之前已经通过其他途径被锁定,此处暂且不提。回调的处理逻辑为:

  • 识别writeBuffer或缓冲集合bufList中是否存在待输出的数据,若有则继续执行写操作continueWrite()
  • 如果数据已经输出完毕,则释放信号量。
  • 如果当前AioSession处于不可用状态,则关闭当前会话。
  • 由于并发的因素可能在释放信号量之后又有数据被写进来,且会话依旧处于可用状态,则当前线程会去竞争信号量资源,并在成功获取到信号量后执行数据输出。
  1. void writeToChannel() {
  2. if (writeBuffer == null) {
  3. writeBuffer = byteBuf.bufList.poll();
  4. } else if (!writeBuffer.buffer().hasRemaining()) {
  5. writeBuffer.clean();
  6. writeBuffer = byteBuf.bufList.poll();
  7. }
  8. if (writeBuffer != null) {
  9. continueWrite(writeBuffer);
  10. return;
  11. }
  12. semaphore.release();
  13. //此时可能是Closing或Closed状态
  14. if (status != SESSION_STATUS_ENABLED) {
  15. close();
  16. } else if (!byteBuf.isClosed()) {
  17. //也许此时有新的消息通过write方法添加到writeCacheQueue中
  18. byteBuf.flush();
  19. }
  20. }

若想了解smart-socket完整的写操作处理逻辑,请阅读源码WriteBuffer.java