go源码解析之TCP连接系列基于go源码1.16.5
TCP连接中的IO多路复用
本章将介绍golang在tcp连接中的IO多路复用的实现
1. 背景
有了解过IO多路复用的同学可能会好奇IO多路复用不就是select、poll、epoll那些内核东西么,怎么和go语言扯上关系的?
没错,我们在linux上运行的网络应用程序,大多都会选择使用epoll。如果我们的应用程序语言是c或者c++,完全可以直接使用系统函数进行网络程序编写,一般我们会使用一个线程并在循环中执行epoll wait,等待可读事件的到来。可读事件到来后,网络数据的读取则交由一组线程来执行。这是一种常用的多线程与epoll结合编写网络应用程序的模式。
go语言有自己的运行时,不同于其他语言直接使用系统线程,go语言中实现了更轻量的协程,go应用程序的运行就是go协程的运行和调度过程。它是如何将协程和epoll结合起来的,就是本章内容的重点。
2. 思考
实现一个tcp server时,我们一般会为每个连接创建一个协程,在这个协程中循环读取连接中的数据,并由数据驱动后续的业务逻辑执行,比较经典的就是http server的做法:
src/net/http/server.go
1
2
3
4
5
6
7
8
9
10
|
func (srv *Server) Serve(l net.Listener) error {
...
for {
rw, err := l.Accept()
...
c := srv.newConn(rw)
...
go c.serve(connCtx)
}
}
|
src/net/http/server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func (c *conn) serve(ctx context.Context) {
...
for {
w, err := c.readRequest(ctx)
...
serverHandler{c.server}.ServeHTTP(w, w.req)
...
w.finishRequest()
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
...
if !w.conn.server.doKeepAlives() {
return
}
...
}
}
|
http server在建立新连接后,启动了一个协程来读取对方发送的数据,并执行handler,并且如果设置了连接可重用和keepalive,将继续for循环等待下一个http请求的到来并进行读取和处理。
readRequest方法读取网络数据也是调用了TCPConn的Read方法(我们就不再展示代码了)。当网络数据还没有达到时,如果socket被设置为了阻塞模式,进行读取数据将导致当前协程被阻塞。当然这是不可能的,因为go中当建立tcp连接时,已经将socket设置为了非阻塞模式,可以回看第一章的sysSocket方法。(如果socket设置为阻塞模式,数据读取的系统调用将陷入内核态,直到有数据)
非阻塞模式下,系统调用返回again错误,这时本协程将让出运行权并等待网络数据到来时被重新加入可运行队列,参考Read方法的实现:
src/internal/poll/fd_unix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (fd *FD) Read(p []byte) (int, error) {
...
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
|
以上代码在第三章中已经有介绍,当时没有再深入跟踪fd.pd.waitRead,而本章的目标就是了解go语言中epoll的使用及协程是如何被挂起等待网络数据的到来,及网络数据到来后又是如果唤醒被挂起的协程的。
3. 代码跟踪
socket使用epoll非常简单,第一步使用epoll add将socket加入epoll中,第二步在循环中调用epoll wait检测IO信号。本章我们看一下在go中将socket加入epoll中的代码:
epoll初始化与事件注册
src/net/fd_unix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
|
当新连接建立完成后,我们会将这个新连接的fd包装成为netFD,netFD中的一个重要成员变量是pfd,即poll包中的FD对象,经过前几章的讲解,大家应该对poll.FD已经比较了解了,TCPConn的读写方法的底层都由poll.FD实现。
我们看一下poll.FD的Init方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (fd *FD) Init(net string, pollable bool) error {
...
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}
|
如果pollable不为true,fd的isBlocking被标记为1,认为该fd处于阻塞模式下,并返回。回顾上面的代码,netFD的init方法中对pollable传入了true。如果pollable为true,则调用了fd.pd.init方法,pd是pollDesc对象,我们看一下pollDesc的init方法:
src/internal/poll/fd_poll_runtime.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
|
该方法分为三步:
- runtime_pollServerInit,如果还没有创建epoll实例,将会创建epoll实例
- runtime_pollOpen,将fd注册到epoll的监听事件中
- pd.runtimeCtx = ctx,将runtime_pollOpen返回的runtime.pollDesc对象设置到runtimeCtx中
下面具体跟踪下前两步的方法:
1. epoll初始化——runtime_pollServerInit
第一行serverInit.Do(runtime_pollServerInit),serverInit是一个全局sync.Once对象,它的作用是保证指定的方法在整个程序的生命周期内最多执行一次。而runtime_pollServerInit方法是runtime包中poll_runtime_pollServerInit方法的声明,代码如下:
src/runtime/netpoll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
|
netpollGenericInit中的判断是否已初始化和加锁逻辑我们略过,看一下netpollinit的实现:
src/runtime/netpoll_epoll.go
1
2
3
4
5
6
7
8
9
10
11
12
|
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
...
}
|
if判断中的epollcreate(1024)是对老版本内核的兼容,传入的参数在更老的版本(Linux 2.6.8之前)中代表了epoll可监听的最大事件数量,后面的版本只是用来提示内核初始分配数据的空间。
2. 将fd注册到epoll的监听事件中——runtime_pollOpen
src/runtime/netpoll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.everr = false
pd.rseq++
pd.rg = 0
pd.rd = 0
pd.wseq++
pd.wg = 0
pd.wd = 0
pd.self = pd
unlock(&pd.lock)
var errno int32
errno = netpollopen(fd, pd)
return pd, int(errno)
}
|
poll_runtime_pollOpen方法主要分为四步:
- 从pollcache中分配一个pollDesc对象
- 初始化pollDesc对象
- 调用netpollopen方法将fd注册到epoll的监听事件中,代码如下:
src/runtime/netpoll_epoll.go
1
2
3
4
5
6
|
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
|
该方法中将刚刚初始化的pollDesc对象作为epollevent的自定义数据传入底层。当epoll wait检测到IO事件后,除了会将IO事件关联的fd返回,还会将当时注册监听事件时传入的epollevent返回,从epollevent中我们就可以拿到当初设置的pollDesc对象。
- 返回pollDesc对象,我们着重看一下pollDesc对象中的link、fd、rg、wg成员变量
src/runtime/netpoll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
...
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
|
link和fd比较容易理解,link是pollcache中pollDesc对象链表;fd是fd。
回顾poll.FD的Read方法,如下:
src/internal/poll/fd_unix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (fd *FD) Read(p []byte) (int, error) {
...
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
|
当没有数据可读取时,调用fd.pd.waitRead方法,将当前的goroutine设置到pollDesc的rg中,并调用gopark让出运行权。
rg和wg就是被阻塞的读goroutine和写goroutine。当epoll wait方法返回可读或可写事件后,从epollevent中取出自定义数据pollDesc,并查看pollDesc中的rg和wg,如果有被阻塞的goroutine,则将此goroutine加入到可运行的goroutine队列。
4. 小结
- 根据以上的代码跟踪,可以看到与IO多路复用相关的代码位于src/internal/poll包中,更底层的代码则位于src/runtime包中(实际上很多底层的代码都位于此包中)。
- 新创建的socket在初始化时便会被设置为非阻塞模式,并将此socket注册到epoll中。
- runtime包中的pollDesc struct中记录了关联fd及由于IO未就绪被迫挂起的goroutine(rg和wg)。
本章介绍了IO多路复用的事件注册,下章将介绍事件通知。