go源码解析之TCP连接系列基于go源码1.16.5

网络数据接收

上一章我们通过跟踪TCPListener的Accept方法,了解了server侧接收、新建连接的过程,本章将通过TCPConn的Read方法的跟踪来了解读取网络数据的过程。

1. conn的Read方法

从上一章了解到TCPConn继承自conn,它的Read方法就是conn的Read,代码如下:

src/net/net.go

1
2
3
4
5
6
func (c *conn) Read(b []byte) (int, error) {
	...
	n, err := c.fd.Read(b)
	...
	return n, err
}

conn的Read方法调用了fd的Read方法,返回后进行了相关的错误判断。conn中的fd即netFD,netFD的Read方法如下:

src/net/fd_posix.go

1
2
3
4
5
func (fd *netFD) Read(p []byte) (n int, err error) {
	n, err = fd.pfd.Read(p)
	runtime.KeepAlive(fd)
	return n, wrapSyscallError(readSyscallName, err)
}

netFD的Read方法又调用了pfd的Read,即poll.FD的Read方法,关于Read我们先暂停,看一下第二行的KeepAlive方法:

2. KeepAlive

src/runtime/mfinal.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Mark KeepAlive as noinline so that it is easily detectable as an intrinsic.
//go:noinline

// KeepAlive marks its argument as currently reachable.
// This ensures that the object is not freed, and its finalizer is not run,
// before the point in the program where KeepAlive is called.
//
// A very simplified example showing where KeepAlive is required:
// 	type File struct { d int }
// 	d, err := syscall.Open("/file/path", syscall.O_RDONLY, 0)
// 	// ... do something if err != nil ...
// 	p := &File{d}
// 	runtime.SetFinalizer(p, func(p *File) { syscall.Close(p.d) })
// 	var buf [10]byte
// 	n, err := syscall.Read(p.d, buf[:])
// 	// Ensure p is not finalized until Read returns.
// 	runtime.KeepAlive(p)
// 	// No more uses of p after this point.
//
// Without the KeepAlive call, the finalizer could run at the start of
// syscall.Read, closing the file descriptor before syscall.Read makes
// the actual system call.
func KeepAlive(x interface{}) {
	// Introduce a use of x that the compiler can't eliminate.
	// This makes sure x is alive on entry. We need x to be alive
	// on entry for "defer runtime.KeepAlive(x)"; see issue 21402.
	if cgoAlwaysFalse {
		println(x)
	}
}

注释好长是不是?但是代码很短,这说明一点:这个方法有点神奇,必须详细说明!!

它的作用就是保证传入的参数在这个方法被调用之前不被垃圾回收器回收掉。

什么情况下需要这个方法呢?注释里的例子给的就比较典型,下面按照代码行数分步解释:

  1. 例子通过系统调用open了一个文件,open返回了文件的fd(file descriptor,文件描述符),这个fd就是系统分配给被打开的文件的一个id,所以它是个整型。
  2. fd赋值给了File类型的p
  3. 设置了当p被回收时关闭p.d所代表的打开的文件(runtime.SetFinalizer提供了变量被回收时必要的数据清理回调,类似析构函数)。
  4. 进行系统调用Read。

我们设想一下没有KeepAlive的一种场景:在Read方法执行前,垃圾回收器执行,垃圾回收器发现p已经没有被其他任何地方引用,对p进行了垃圾回收,且因为对p设置了Finalizer,回收的过程中关闭了p.d。当程序恢复执行,Read方法运行,Read将在一个已经被关闭的fd上工作,必然是会出错的。

那么KeepAlive又是怎么保证传入它的变量不被回收?其实也不是什么魔法,就是因为变量被当作参数传入,所以在KeepAlive调用之前,该变量不能被回收。我们自己写一个类似方法也可以达到同样的效果。当然要注意编译选项go:noinline,它提示编译器不要将该方法内联,如果没有这个选项,空方法可能直接被编译器优化掉,没法起到keepalive作用。

回到Read方法中的runtime.KeepAlive(fd),再结合如下netFD的SetFinalizer方法,就容易理解了:

src/net/fd_posix.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (fd *netFD) setAddr(laddr, raddr Addr) {
	fd.laddr = laddr
	fd.raddr = raddr
	runtime.SetFinalizer(fd, (*netFD).Close)
}

func (fd *netFD) Close() error {
	runtime.SetFinalizer(fd, nil)
	return fd.pfd.Close()
}

setAddr方法在netFD初始化后调用。

3. poll.FD的Read方法

我们回到Read方法的跟踪,以下是poll.FD的Read方法:

src/internal/poll/fd_unix.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (fd *FD) Read(p []byte) (int, error) {
	...
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}
1
2
3
4
5
6
7
8
9
// ignoringEINTRIO is like ignoringEINTR, but just for IO calls.
func ignoringEINTRIO(fn func(fd int, p []byte) (int, error), fd int, p []byte) (int, error) {
	for {
		n, err := fn(fd, p)
		if err != syscall.EINTR {
			return n, err
		}
	}
}

ignoringEINTRIO将syscall.Read作为方法参数传入,并循环调用Read,当错误不是syscall.EINTR时返回。查了一下EINTR错误码,它是当进程设置了signal handler,并且没有设置SA_RESTART,该进程收到信号后,进程内正在进行的可中断系统调用将返回EINTR错误。ENINTR错误不是系统调用出现了错误,而是信号导致的中断。关于原因可以参考这里的讨论

ignoringEINTRIO返回后,下面的错误处理和第二章Accept系统调用返回后类似:

如果错误是EAGAIN(socket被设置为非阻塞模式,在这个socket上的系统调用都会立即返回而不会阻塞线程,例如此处的read调用,即使没有读取到数据也会立即返回,但是错误信息会被设置为EAGAIN),并且fd.pd.pollable为true时,阻塞当前goroutine进行等待,直到有新的可读消息时continue,再次调用read进行数据读取。

这里提前简单说一下pollDesc(即FD中的pd),它是IO多路复用(如epoll、kqueue、CompletionPort等)在go语言中的集成,fd.pd.waitRead 即是等待io消息的到来。后续将有单独章节介绍epoll在go语言网络库中的使用。

最后看一下eofError方法:

src/internal/poll/fd_posix.go

1
2
3
4
5
6
7
8
// eofError returns io.EOF when fd is available for reading end of
// file.
func (fd *FD) eofError(n int, err error) error {
	if n == 0 && err == nil && fd.ZeroReadIsEOF {
		return io.EOF
	}
	return err
}

如果没有读取到数据且没有返回错误,再加上ZeroReadIsEOF这个为true,就返回EOF错误。我们看一下ZeroReadIsEOF的注释:

1
2
3
	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool

从eofError方法的注释和ZeroReadIsEOF的注释基本可以断定EOF错误只适用于读取文件,网络连接数据的读取不会产生这个错误。

大家可能觉得奇怪,“我们不是在跟踪tcp数据读取的代码吗?怎么这里还有跟文件相关的东西?” 其实大家注意代码所在目录的话,可以看到我们跟踪的代码跨了三个目录,src/netsrc/internal/pollsrc/runtime,完全属于网络层的代码是在src/net包中,而src/internal/poll除了是网络底层的实现还是文件读写的底层实现

“网络数据的读写和文件数据的读写可以用同一个系统调用?” 没错,在linux世界里,任何io设备都可以用一个文件描述符(我们经常见到的fd)代表,而对这些文件描述符的读写都可以使用write和read系统调用。

可以看一下文件类的结构,同样包含了poll.FD:

src/os/file_unix.go

1
2
3
4
5
// file is the real representation of *File.
type file struct {
	pfd         poll.FD
    ...
}

4. 小结

本章通过跟踪conn的Read方法,了解了网络数据读取的过程。总结为以下几点:

  1. conn的Read方法调用了netFD的Read,netFD的Read方法又调用了poll.FD的Read
  2. KeepAlive(此KeepAlive非上一章给用作网络连接探活的KeepAlive)用来保证传入的变量在KeepAlive调用前不被回收
  3. EINTR是信号对系统调用产生了中断返回的错误号,不是系统调用的错误,遇到此错误号可以重试
  4. src/internal/poll包是各种io读写共用的底层包

下一章我们将对TCPConn的Write方法进行跟踪,来了解数据写入的过程。