go源码解析之TCP连接系列基于go源码1.16.5

连接是如何建立的

上一章我们通过跟踪net.Listen的调用,了解了socket的创建、端口绑定、开启监听。最后net.Listen返回了一个Listener(具体对于TCP连接为TCPListener),本章将通过该Listener的Accept方法的跟踪来了解连接的建立过程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ln, err := net.Listen("tcp", ":8080")
if err != nil {
        // handle error
}
for {
        conn, err := ln.Accept()
        if err != nil {
                // handle error
        }
        go handleConnection(conn)
}

下面我们通过逐行跟踪源码,来看连接建立的过程:

1.TCPListener的Accept方法

src/net/tcpsock.go

1
2
3
4
5
6
func (l *TCPListener) Accept() (Conn, error) {
    ...
	c, err := l.accept()
    ...
	return c, nil
}

Accept调用了TCPListener的内部方法accept:

src/net/tcpsock_posix.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (ln *TCPListener) accept() (*TCPConn, error) {
	fd, err := ln.fd.accept()
	if err != nil {
		return nil, err
	}
	tc := newTCPConn(fd)
	if ln.lc.KeepAlive >= 0 {
		setKeepAlive(fd, true)
		ka := ln.lc.KeepAlive
		if ln.lc.KeepAlive == 0 {
			ka = defaultTCPKeepAlive
		}
		setKeepAlivePeriod(fd, ka)
	}
	return tc, nil
}

我们先跳过ln.fd.accept和newTCPConn两个方法调用,将上一章遗留的KeepAlive配置项看一下: 大家应该还记得KeepAlive是ListenConfig中的一个属性,而ListenConfig和创建成功的监听netFD被赋值给了TCPListener:

src/net/tcpsock.go

1
2
3
4
type TCPListener struct {
	fd *netFD
	lc ListenConfig
}

如果KeepAlive大于等于0,设置socket开启KeepAlive,如果KeepAlive等于0,默认设置socket的TCP_KEEPINTVL和TCP_KEEPIDLE属性为15秒,否则设置为用户指定的时间。

2.setKeepAlive

setKeepAlive和setKeepAlivePeriod方法类似,都是设置socket的属性,我们放到一起来看:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func setKeepAlive(fd *netFD, keepalive bool) error {
	err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(keepalive))
	runtime.KeepAlive(fd)
	return wrapSyscallError("setsockopt", err)
}

func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
	// The kernel expects seconds so round to next highest second.
	secs := int(roundDurationUp(d, time.Second))
	if err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, secs); err != nil {
		return wrapSyscallError("setsockopt", err)
	}
	err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, secs)
	runtime.KeepAlive(fd)
	return wrapSyscallError("setsockopt", err)
}

两个方法中都执行了fd.pfd.SetsockoptInt,而pfd则是netFD中的一个属性:

src/net/fd_posix.go

1
2
3
4
5
type netFD struct {
	pfd poll.FD

    ...
}

进一步看poll.FD的SetsockoptInt方法:

src/internal/poll/sockopt.go

1
2
3
4
func (fd *FD) SetsockoptInt(level, name, arg int) error {
    ...
    return syscall.SetsockoptInt(fd.Sysfd, level, name, arg)
}

可以看到进行了SetsockoptInt的系统调用,进行socket的属性设置。被设置的目标就是fd.Sysfd。回忆上一章中newFD方法,此处的Sysfd就是创建的系统socket的fd。 由于整个net包中不管是监听socket还是主动connect成功的socket还是accept建立的socket,都是使用netFD类进行包装,所以最好记住这个层级关系: netFD对poll.FD进行包装,poll.FD对系统fd进行包装

作为额外知识,介绍一下keepalive的三个内核参数:

  1. tcp_keepalive_time, 如果在该参数指定的秒数内连接始终处于空闲状态(没有收到远程主机的数据,ack不算),则内核向远程主机发起对该主机的探测
  2. tcp_keepalive_intvl,该参数以秒为单位,规定内核向远程主机发送探测的时间间隔
  3. tcp_keepalive_probes,该参数规定内核为了检测远程主机的存活而发送的探测的数量,如果探测的数量已经使用完毕仍旧没有得到响应,即断定不可达,关闭与该主机的连接,释放相关资源

setKeepAlive方法中的SO_KEEPALIVE则是设置keepalive的总开关,setKeepAlivePeriod中的TCP_KEEPINTVL对应tcp_keepalive_intvl参数,TCP_KEEPIDLE对应tcp_keepalive_time参数。TCP_KEEPCNT对应tcp_keepalive_probes,但是代码中没有搜索到使用的地方。

让我们回到accept的主流程,先跟一下ln.fd.accept方法调用:

3.netFD的accept方法

src/net/fd_unix.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (fd *netFD) accept() (netfd *netFD, err error) {
	d, rsa, errcall, err := fd.pfd.Accept()
	...

	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
    ...
	return netfd, nil
}

pfd.Accept即poll.FD的Accept方法,代码如下:

src/internal/poll/fd_unix.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
	for {
		s, rsa, errcall, err := accept(fd.Sysfd)
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
        ...
		case syscall.EAGAIN:
			if fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
       ...
		}
		return -1, nil, errcall, err
	}
}

内部方法accept代码如下:

src/internal/poll/sock_cloexec.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func accept(s int) (int, syscall.Sockaddr, string, error) {
	ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
	
	switch err {
	case nil:
		return ns, sa, "", nil
    ...
	}

	...
}

Accept4Func同样是一个系统调用方法:var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4,参数s是socket的fd,SOCK_NONBLOCK|SOCK_CLOEXEC则是设置新连接socket的属性。连接成功返回新连接socket的fd和主机地址信息。

我们再返回到poll.FD的Accept方法,accept返回后,如果没有错误就返回新连接socket的fd和主机地址信息。如果错误是EAGAIN(socket被设置为非阻塞模式,在这个socket上的系统调用都会立即返回而不会阻塞线程,例如此处的accept调用,即使没有新的连接,也会立即返回,但是错误信息会被设置为EAGAIN),并且fd.pd.pollable为true时,阻塞当前goroutine进行等待,直到有新的可读消息(此处为有新连接)时continue,再次调用accept进行接收连接。

这里提前简单说一下pollDesc(即FD中的pd),它是IO多路复用(如epoll、kqueue、CompletionPort等)在go语言中的集成,fd.pd.waitRead 即是等待io消息的到来。后续将有单独章节介绍epoll在go语言网络库中的使用。

1
2
3
4
5
6
type FD struct {
    ...
	// I/O poller.
	pd pollDesc
    ...
}

poll.FD的Accept方法返回到netFD的accept方法中,接着调用了newFD创建了netFD,newFD方法在上一章已经介绍,不再赘述。

到目前为止,整个调用链路基本讲完了,我们现在通过下面这张图回顾一下:

3

  1. TCPListener的accept方法调用netFD的accept方法,返回成功后,调用newTCPConn构建连接对象,并设置连接的keepalive属性
  2. netFD的accept方法调用poll.FD的Accept方法,返回成功后,调用newFD创建新socket的netFD对象
  3. poll.FD的Accept方法进行accept系统调用,如果有新连接建立成功则返回新连接socket的fd,如果遇到EAGAIN错误,则阻塞当前goroutine进行IO消息等待。

4. newTCPConn

src/net/tcp_sock.go

1
2
3
4
5
func newTCPConn(fd *netFD) *TCPConn {
	c := &TCPConn{conn{fd}}
	setNoDelay(c.fd, true)
	return c
}

conn是对接口类型Conn的实现,conn的唯一属性则是我们前面一直提到的netFD,conn的核心方法都是对netFD方法的包装:

src/net/net.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type Conn interface {
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	Close() error
	LocalAddr() Addr
	RemoteAddr() Addr
	SetDeadline(t time.Time) error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}

type conn struct {
	fd *netFD
}

TCPConn继承自conn,它比较独特的一个方法就是ReadFrom,用来从一个Reader中读取数据并写入到TCPConn的socket上:

src/net/tcpsock.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type TCPConn struct {
	conn
}

// ReadFrom implements the io.ReaderFrom ReadFrom method.
func (c *TCPConn) ReadFrom(r io.Reader) (int64, error) {
    ...
	n, err := c.readFrom(r)
    ...
	return n, err
}

src/net/tcpsock_posix.go

1
2
3
4
5
6
7
8
9
func (c *TCPConn) readFrom(r io.Reader) (int64, error) {
	if n, err, handled := splice(c.fd, r); handled {
		return n, err
	}
	if n, err, handled := sendFile(c.fd, r); handled {
		return n, err
	}
	return genericReadFrom(c, r)
}

可以看到readFrom进行了两种读取并写入的尝试,这两种方式都是为了减少用户空间到内核空间的数据拷贝:

  1. splice方式,通过建立一个临时的pipe,将输入splice至pipe,再将pipe splice至输出。这里要求Reader必须是tcp或者unix连接
  2. send file方式,通过sendFile系统调用,将Reader中数据高效地传输到socket上。这里要求Reader必须是文件
  3. genericReadFrom,回归到最原始的数据拷贝方式

如果我们需要向socket写入数据并且数据源实现了Reader接口的话,我们可以选择使用ReadFrom方法来提高性能。

5. 小结

今天通过跟踪TCPListener的Accept方法,了解了server侧接收到新连接的过程。总结为以下几个点:

  1. TCPConn继承自conn,conn对netFD进行包装并实现了Conn接口,netFD对poll.FD进行包装,poll.FD对系统fd进行包装
  2. keepalive设置
  3. 避免用户空间到内核空间的数据拷贝的两种方式:splice和sendfile

下一章我们将对TCPConn的Read方法进行跟踪,来了解数据读取的过程。