go源码解析之TCP连接系列基于go源码1.16.5

TCP连接中的IO多路复用

上一章已经介绍了将socket注册到epoll中的过程,本章将以读取数据为例介绍epoll的事件通知。

读取数据时如果没有可读数据,将当前goroutine阻塞;epoll wait检测到有可读数据,将被阻塞的读取数据的goroutine恢复,此goroutine再次进行读取数据的系统调用。

无数据可读goroutine阻塞

回顾poll.FD的Read方法,如下:

src/internal/poll/fd_unix.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (fd *FD) Read(p []byte) (int, error) {
	...
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

for循环中的第一行尝试进行数据读取,在没有数据可读时,将返回EAGAIN的错误,这时进行fd.pd.waitRead调用将使得当前goroutine阻塞,我们看一下waitRead代码:

src/internal/poll/fd_poll_runtime.go

1
2
3
4
5
6
7
8
9
func (pd *pollDesc) waitRead(isFile bool) error {
	return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
	...
	res := runtime_pollWait(pd.runtimeCtx, mode)
	return convertErr(res, isFile)
}

pd.runtimeCtx是runtime包中的pollDesc对象,它是运行时调度网络io goroutine的关键数据结构。它在将socket注册入epoll时被初始化,具体见上一章。我们继续跟踪runtime_pollWait方法,此处的runtime_pollWait方法即rutime包中的poll_runtime_pollWait方法:

src/runtime/netpoll.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	...
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
	}
	return pollNoError
}

mode参数可能为’r’或者’w’或者’r'+‘w’,分别代表读和写和读写。 循环中调用netpollblock方法,直到返回为true,代表有可读io事件,退出循环,方法返回。在继续跟踪netpollblock方法之前,我们再回顾一下pollDesc对象:

src/runtime/netpoll.go

1
2
3
4
5
6
type pollDesc struct {
	...
	rg      uintptr   // pdReady, pdWait, G waiting for read or nil
	wg      uintptr   // pdReady, pdWait, G waiting for write or nil
	...
}

其他属性略去,看一下关键的rg和wg。以rg为例,根据注释,它有四种状态,分别为pdReady、pdWait、G waiting for read、nil。

  1. pdReady是一个常量,值为1,代表有可读的IO事件
  2. pdWait也是一个常量,值为2,代表没有可读的IO事件,goroutine将要阻塞
  3. G waiting for read,值为当前goroutine对象的地址,代表goroutine已经阻塞,在等待IO事件
  4. nil,值为0,是初始状态

继续跟踪netpollblock方法:

src/runtime/netpoll.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

gpp根据mode参数,被设置为rg或者wg的地址。下面我们还是以mode是读为例。

for循环中首先判断了当前gpp指向地址中的值如果是pdReady,说明已经有可读的IO事件,没必要再将goroutine阻塞,方法直接返回。否则将gpp指向的地址中的值设置为pdWait,设置成功后跳出for循环。

接着进行gopark调用,将当前goroutine阻塞,gpp指向的地址中的值将被设置为当前goroutine对象的地址(如何设置的,请看gopark的第一个参数netpollblockcommit,比较简单)。至此当前goroutine已经被暂停运行,放入了阻塞队列中。

直到此goroutine再次被运行,将继续执行余下的代码。将gpp地址中的值设置为0并返回之前的旧值。如果旧值为pdReady,代表有可读的IO事件,方法返回true。

有可读IO事件,将goroutine唤醒

我们从epoll wait方法调用处开始看起:

src/runtime/netpoll_epoll.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func netpoll(delay int64) gList {
	...
	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	...
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		...
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.everr = false
			if ev.events == _EPOLLERR {
				pd.everr = true
			}
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

epollwait即epoll的系统调用,这个方法将阻塞当前线程,直到有IO事件或者超时。

IO事件将被设置到传入的events数组中,epollwait返回后,遍历events数组,根据每一个event的IO事件类型设置mode变量。

从event的data属性中取出pollDesc对象(将socket注册入epoll时,已经把socket对应的pollDesc对象作为event的data设置到epoll中,见上一章),调用netpollready,参数分别为toRun、pd和mode,pd和mode不用再说。

toRun是一个goroutine对象的链表,netpollready方法将可运行的goroutine放入toRun链表,toRun会被netpoll方法作为返回值返回,调用者将toRun中的goroutine从阻塞状态切换至可运行状态。

src/runtime/netpoll.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}

netpollready方法将netpollunblock返回的goroutine放入toRun中。

src/runtime/netpoll.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

netpollunblock方法与netpollblock对应,它将pollDesc中的rg或者wg设置为pdReady,并将阻塞的goroutine对象返回。

总结

go的网络底层在IO不可用时将goroutine阻塞、IO可用时将goroutine恢复。通过这种阻塞goroutine的方式使得go应用中网络程序的编写可以使用同步模式,极大得降低了网络程序编写的难度。这也得益于goroutine非常轻量,相比线程调度,goroutine调度消耗可忽略不计。

在如c++这种语言中,“每个socket分配一个线程,当IO不可用时将线程阻塞,实现同步模式”是不可能的,程序的运行将有很多消耗在成千上万的线程的调度上。一种解决方案是“读取数据时注册一个回调方法,当有数据可读时,从线程池中获得一个线程来执行对应的回调方法”,这就是一种异步模式。

虽然go语言通过goroutine的调度极大地简化了网络编程,但是这种模式不是适合任何的应用场景。例如使用go实现一个单机支持长连接10w+的网关应用,这将导致至少10w+的goroutine的调度。如果想要解决大量goroutine调度带来的性能损耗,就不能再使用go的标准网络库,可以参考其他高性能语言的做法,采用goroutine池,实现一种异步网络编程。

后记

go源码解析之TCP连接系列到此就结束了。首次写这种源码解析的文章,完全按照代码的调用进行写作,可能叫做源码笔记更合适。

写这一系列go网络源码的笔记其实源于一个号称比go官方网络库性能高的开源项目,https://github.com/cloudwego/netpoll,看过他的简介后大约理解了他进行优化的一方面“通过goroutine池代替go官方的每个连接一个goroutine,解决goroutine太多带来的调度性能损耗”。进而激起我对go官方如何将epoll和goroutine相结合的窥探欲望。

go源码解析之TCP连接系列虽然结束了,但是后续还会有其他关于go源码解析的文章,欢迎继续关注。