8.1 非阻塞io

Go提供的网络接口,在用户层是阻塞的,这样最符合人们的编程习惯。在runtime层面,是用epoll/kqueue实现的非阻塞io,为性能提供了保障。

如何实现

底层非阻塞io是如何实现的呢?简单地说,所有文件描述符都被设置成非阻塞的,某个goroutine进行io操作,读或者写文件描述符,如果此刻io还没准备好,则这个goroutine会被放到系统的等待队列中,这个goroutine失去了运行权,但并不是真正的整个系统“阻塞”于系统调用。

后台还有一个poller会不停地进行poll,所有的文件描述符都被添加到了这个poller中的,当某个时刻一个文件描述符准备好了,poller就会唤醒之前因它而阻塞的goroutine,于是goroutine重新运行起来。

这个poller是在后台一直运行的,前面分析系统调度章节时为了简化并没有提起它。其实在proc.c文件中,runtime.main函数的第一行代码就是

  1. newm(sysmon, nil);

这个意思就是新建一个M并让它运行sysmon函数,前面说过M就是机器的抽象,它会直接开一个物理线程。sysmon里面是个死循环,每睡眠一小会儿就会调用runtime.epoll函数,这个sysmon就是所谓的poller。

poller是一个比gc更高优先级的东西,何以见得呢?首先,垃圾回收只是用runtime.newproc建立出来的,它仅仅是个goroutine任务,而poller是直接用newm建立出来的,它跟startm是平级的。也就相当于gc只是线程池里的任务,而poller自身直接就是worker。然后,gc只是被触发性地发生的,是被动的。而poller却是每隔很短时间就会主动运行。

封装层次

从最原始的epoll系统调用,到提供给用户的网络库函数,可以分成三个封装层次。这三个层次分别是,依赖于系统的api封装,平台独立的runtime封装,提供给用户的库的封装。

最下面一层是依赖于系统部分的封装。各个平台下的实现并不一样,比如linux下是封装的epoll,freebsd下是封装的kqueue。以linux为例,实现了一组调用epoll相关系统调用的封装:

  1. int32 runtime·epollcreate(int32 size);
  2. int32 runtime·epollcreate1(int32 flags);
  3. int32 runtime·epollctl(int32 epfd, int32 op, int32 fd, EpollEvent *ev);
  4. int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
  5. void runtime·closeonexec(int32 fd);

它们都是直接使用汇编调用系统调用实现的,比如:

  1. TEXT runtime·epollcreate1(SB),7,$0
  2. MOVL 8(SP), DI
  3. MOVL $291, AX // syscall entry
  4. SYSCALL
  5. RET

这些函数还要继续被封装成下面一组函数:

  1. runtime·netpollinit(void);
  2. runtime·netpollopen(int32 fd, PollDesc *pd);
  3. runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);

runtime·netpollinit是对poller进行初始化。
runtime·netpollopen是对fd和pd进行关联,实现边沿触发通知。
runtime·netpollready,使用前必须调用这个函数来表示fd是就绪的

不管是哪个平台,最终都会将依赖于系统的部分封装好,提供上面这样一组函数供runtime使用。

接下来是平台独立的poller的封装,也就是runtime层的封装。这一层封装是最复杂的,它对外提供的一组接口是:

  1. func runtime_pollServerInit()
  2. func runtime_pollOpen(fd int) (pd *PollDesc, errno int)
  3. func runtime_pollClose(pd *PollDesc)
  4. func runtime_pollReset(pd *PollDesc, mode int) (err int)
  5. func runtime_pollWait(pd *PollDesc, mode int) (err int)
  6. func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int)
  7. func runtime_pollUnblock(pd *PollDesc)

这一组函数是由runtime封装好,提供给net包调用的。里面定义了一个PollDesc的结构体,将fd和对应的goroutine封装起来,从而实现当goroutine读写fd阻塞时,将goroutine变为Gwaiting。等一下回头再看实现的细节。

最后一层封装层次是提供给用户的net包。在net包中网络文件描述符都是用一个netFD结构体来表示的,其中有个成员就是pollDesc。

  1. // 网络文件描述符
  2. type netFD struct {
  3. sysmu sync.Mutex
  4. sysref int
  5. // must lock both sysmu and pollDesc to write
  6. // can lock either to read
  7. closing bool
  8. // immutable until Close
  9. sysfd int
  10. family int
  11. sotype int
  12. isConnected bool
  13. sysfile *os.File
  14. net string
  15. laddr Addr
  16. raddr Addr
  17. // serialize access to Read and Write methods
  18. rio, wio sync.Mutex
  19. // wait server
  20. pd pollDesc
  21. }

所有用户的net包的调用最终调用到pollDesc的上面那一组函数中,这样就实现了当goroutine读或写阻塞时会被放到等待队列。最终的效果就是用户层阻塞,底层非阻塞。

文件描述符和goroutine

当一个goroutine进行io阻塞时,会去被放到等待队列。这里面就关键的就是建立起文件描述符和goroutine之间的关联。pollDesc结构体就是完成这个任务的。它的结构体定义如下:

  1. struct PollDesc
  2. {
  3. PollDesc* link; // in pollcache, protected by pollcache.Lock
  4. Lock; // protectes the following fields
  5. int32 fd;
  6. bool closing;
  7. uintptr seq; // protects from stale timers and ready notifications
  8. G* rg; // 因读这个fd而阻塞的G,等待READY信号
  9. Timer rt; // read deadline timer (set if rt.fv != nil)
  10. int64 rd; // read deadline
  11. G* wg; // 因写这个fd而阻塞的goroutines
  12. Timer wt;
  13. int64 wd;
  14. };

这个结构体是重用的,其中link就是将它链起来。PollDesc对象必须是类型稳定的,因为在描述符关闭/重用之后我们会得到epoll/kqueue就绪通知。结构体中有一个seq序号,稳定的通知是通过使用这个序号实现的,当deadline改变或者描述符重用时,序号会增加。

runtime_pollServerInit的实现就是调用更下层的runtime·netpollinit函数。
runtime_pollOpen从PollDesc结构体缓存中拿一个出来,设置好它的fd。之所以叫Open而不是new,就是因为PollDesc结构体是重用的。
runtime_pollClose函数调用runtime·netpollclose后将PollDesc结构体放回缓存。

这些都还没涉及到fd与goroutine交互部分,仅仅是直接对epoll的调用。从下面这个函数可以看到fd与goroutine交互部分:

  1. func runtime_pollWait(pd *PollDesc, mode int) (err int)

它会调用到netpollblock,这个函数是这样子的:

  1. static void
  2. netpollblock(PollDesc *pd, int32 mode)
  3. {
  4. G **gpp;
  5. gpp = &pd->rg;
  6. if(mode == 'w')
  7. gpp = &pd->wg;
  8. if(*gpp == READY) {
  9. *gpp = nil;
  10. return;
  11. }
  12. if(*gpp != nil)
  13. runtime·throw("epoll: double wait");
  14. *gpp = g;
  15. runtime·park(runtime·unlock, &pd->Lock, "IO wait");
  16. runtime·lock(pd);
  17. }

最后的runtime.park函数,就是将当前的goroutine(调用者)设置为waiting状态。

上面这一部分是goroutine被放到等待队列的部分,下面看它被唤醒的部分。在sysmon函数中,会不停地调用runtime.epoll,这个函数对就绪的网络连接进行poll,返回可运行的goroutine。epoll只能知道哪个fd就绪了,那么它怎么知道哪个goroutine就绪了呢?原来epoll的data域存放的就是PollDesc结构体指针。因此就可以得到其中的goroutine了。