服务端AioQuickServer

异步非阻塞通信的服务端实现。这个类主要是对JDK提供的AIO通信类AsynchronousServerSocketChannel、AsynchronousChannelGroup进行封装。AioQuickServer是服务端通信的调度中心,在完成协议、消息处理器的定义后,需要通过AioQuickServer来启动我们的通信服务。AioQuickServer提供了一些必要的参数配置接口,方便开发人员进行资源分配以达到最优效果。

2.5.1 成员属性

属性名类型说明
serverSocketChannelAsynchronousServerSocketChannelJDK提供的AIO服务端核心类
asynchronousChannelGroupAsynchronousChannelGroupJDK为AIO提供的线程池服务
configIoServerConfig存储AioQuickServer服务配置项
aioReadCompletionHandlerReadCompletionHandlersmart-socket提供的IO读回调处理类
aioWriteCompletionHandlerWriteCompletionHandlersmart-socket提供的IO写回调处理类
bufferPoolBufferPagePool内存池对象
workerExecutorServiceThreadPoolExecutorWorker线程池

2.5.2 配置接口

方法说明
public AioQuickServer setBannerEnabled(boolean bannerEnabled)服务启动时是否打印smart-socket banner
public AioQuickServer setThreadNum(int num)Server服务线程数
public AioQuickServer setReadBufferSize(int size)设置AioSession读缓存区长度
public AioQuickServer setOption(SocketOption socketOption, V value)设置Socket的TCP参数配置

2.5.3 核心方法

2.5.3.1 start:启动AIO服务端

  • 片段一
  1. asynchronousChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(config.getThreadNum(), new ThreadFactory() {
  2. byte index = 0;
  3. @Override
  4. public Thread newThread(Runnable r) {
  5. return new Thread(r, "smart-socket:AIO-" + (++index));
  6. }
  7. });

初始化AIO服务的工作线程组并赋值于AioQuickServer成员属性asynchronousChannelGroup

  • 片段二
  1. this.serverSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup).bind(new InetSocketAddress(config.getPort()), 1000);

这行代码很直观,打开AIO服务通道并绑定端口号,但要注意bind方法。AsynchronousServerSocketChannel提供了两个bind接口:bind(SocketAddress local),bind(SocketAddress local, int backlog)

如果调用bind(SocketAddress local)方法,AsynchronousServerSocketChannel内部实际上执行的是bind(SocketAddress local, 0)。然而backlog的值小于1时,JDK会将其默认设置为50。 backlog维护了连接请求队列长度,如果队列满时收到连接指示,则拒绝该连接。举个例子:backlog设置为50,当前有50连接请求过来,服务端还未执行这些连接请求的accept方法。此时再有一个连接请求过来,则会被拒绝连接。除非请求队列中的某个连接完成accept操作并释放出队列资源,服务器才可接受新的连接。

  • 片段三
  1. acceptThread = new Thread(new Runnable() {
  2. NetMonitor<T> monitor = config.getMonitor();
  3. @Override
  4. public void run() {
  5. while (running) {
  6. Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();
  7. try {
  8. final AsynchronousSocketChannel channel = future.get();
  9. workerExecutorService.execute(new Runnable() {
  10. @Override
  11. public void run() {
  12. if (monitor == null || monitor.acceptMonitor(channel)) {
  13. createSession(channel);
  14. } else {
  15. config.getProcessor().stateEvent(null, StateMachineEnum.REJECT_ACCEPT, null);
  16. LOGGER.warn("reject accept channel:{}", channel);
  17. closeChannel(channel);
  18. }
  19. }
  20. });
  21. } catch (Exception e) {
  22. LOGGER.error("AcceptThread Exception", e);
  23. }
  24. }
  25. }
  26. }, "smart-socket:AcceptThread");
  27. acceptThread.start();
  28. protected void createSession(AsynchronousSocketChannel channel) {
  29. //连接成功则构造AIOSession对象
  30. AioSession<T> session = null;
  31. try {
  32. session = aioSessionFunction.apply(channel);
  33. session.initSession();
  34. } catch (Exception e1) {
  35. LOGGER.debug(e1.getMessage(), e1);
  36. if (session == null) {
  37. try {
  38. channel.shutdownInput();
  39. } catch (IOException e) {
  40. LOGGER.debug(e.getMessage(), e);
  41. }
  42. try {
  43. channel.shutdownOutput();
  44. } catch (IOException e) {
  45. LOGGER.debug(e.getMessage(), e);
  46. }
  47. try {
  48. channel.close();
  49. } catch (IOException e) {
  50. LOGGER.debug("close channel exception", e);
  51. }
  52. } else {
  53. session.close();
  54. }
  55. }
  56. }

smart-socket通过启动AcceptThread线程同步监听客户端连接请求,一旦客户端连接上来便生成异步任务由WorkerThread线程池来初始化AioSession。所有的AioSession共用aioReadCompletionHandler、aioWriteCompletionHandler对象,这样可以减少服务端产生的对象数。

2.5.3.2 shutdown:停止AIO服务端

AIO服务停止的逻辑很简单,关闭Channel通道,停止线程组。

  1. public final void shutdown() {
  2. running = false;
  3. try {
  4. if (serverSocketChannel != null) {
  5. serverSocketChannel.close();
  6. serverSocketChannel = null;
  7. }
  8. } catch (IOException e) {
  9. LOGGER.warn(e.getMessage(), e);
  10. }
  11. if (!workerExecutorService.isTerminated()) {
  12. try {
  13. workerExecutorService.shutdownNow();
  14. } catch (Exception e) {
  15. LOGGER.error("shutdown exception", e);
  16. }
  17. }
  18. if (!asynchronousChannelGroup.isTerminated()) {
  19. try {
  20. asynchronousChannelGroup.shutdownNow();
  21. } catch (IOException e) {
  22. LOGGER.error("shutdown exception", e);
  23. }
  24. }
  25. try {
  26. asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS);
  27. } catch (InterruptedException e) {
  28. LOGGER.error("shutdown exception", e);
  29. }
  30. }