go源码解析之TCP连接系列基于go源码1.16.5

网络数据发送

上一章我们通过跟踪TCPConn的Read方法,了解了读取数据的过程,本章将通过TCPConn的Write方法的跟踪来了解数据写入的过程。

1. conn的Write方法

从上一章了解到TCPConn继承自conn,它的Write方法就是conn的Write,代码如下:

src/net/net.go

1
2
3
4
5
6
func (c *conn) Write(b []byte) (int, error) {
	...
	n, err := c.fd.Write(b)
	...
	return n, err
}

conn的Write方法调用了netFD的Write方法:

src/net/fd_posix.go

1
2
3
4
5
func (fd *netFD) Write(p []byte) (nn int, err error) {
	nn, err = fd.pfd.Write(p)
	runtime.KeepAlive(fd)
	return nn, wrapSyscallError(writeSyscallName, err)
}

pfd则是poll.FD,看一下它的Write方法:

src/internal/poll/fd_unix.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (fd *FD) Write(p []byte) (int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()
	...
	var nn int
	for {
		max := len(p)
		...
		n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
		if n > 0 {
			nn += n
		}
		if nn == len(p) {
			return nn, err
		}
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		if err != nil {
			return nn, err
		}
		if n == 0 {
			return nn, io.ErrUnexpectedEOF
		}
	}
}

和Read方法一样,当遇到EAGAIN错误且pollable为true,进行等待。些许不同的是,Write方法的for循环中会保证传入的数据都写完才返回。

2. poll.FD的加锁方法

回到方法开头的writeLock,其实在第二章和第三章的Accept和Read方法的开头都有readLock和readUnlock操作,只是当时为了减少文章篇幅省略了。下面把这块给补回来。

看一下poll.FD的writeLock、writeUnlock、readLock、readUnlock代码:

src/internal/poll/fd_mutex.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// readLock adds a reference to fd and locks fd for reading.
// It returns an error when fd cannot be used for reading.
func (fd *FD) readLock() error {
	if !fd.fdmu.rwlock(true) {
		return errClosing(fd.isFile)
	}
	return nil
}

// readUnlock removes a reference from fd and unlocks fd for reading.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) readUnlock() {
	if fd.fdmu.rwunlock(true) {
		fd.destroy()
	}
}

// writeLock adds a reference to fd and locks fd for writing.
// It returns an error when fd cannot be used for writing.
func (fd *FD) writeLock() error {
	if !fd.fdmu.rwlock(false) {
		return errClosing(fd.isFile)
	}
	return nil
}

// writeUnlock removes a reference from fd and unlocks fd for writing.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) writeUnlock() {
	if fd.fdmu.rwunlock(false) {
		fd.destroy()
	}
}

每个锁方法都是调用了fdmu的rwlock或rwunlock,fdmu则是FD中的一个fdMutex类型的成员变量。先看一下fdMutex的结构定义:

src/internal/poll/fd_mutex.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// fdMutex是管理fd生命周期以及使得FD的Read、Write和Close方法串行执行的同步原语
type fdMutex struct {
	state uint64
	rsema uint32
	wsema uint32
}

// fdMutex.state 不同的字节代表了不同含义:
// 第1个比特位 - 代表FD是否被关闭, 如果设置为1,所有后续的锁操作都会失败.
// 第2个比特位 - 代表锁定读.
// 第3个比特位 - 代表锁定写.
// 第4至23个比特位 - 代表对FD的总引用数量,包括读、写和其他杂项(例如设置socketopt).
// 第24至43个比特位 - 等待进行读操作的等待数量.
// 第44至63个比特位 - 等待进行写操作的等待数量.
const (
	mutexClosed  = 1 << 0
	mutexRLock   = 1 << 1
	mutexWLock   = 1 << 2
	mutexRef     = 1 << 3
	mutexRefMask = (1<<20 - 1) << 3
	mutexRWait   = 1 << 23
	mutexRMask   = (1<<20 - 1) << 23
	mutexWWait   = 1 << 43
	mutexWMask   = (1<<20 - 1) << 43
)

可以看到fdMutex的state变量的不同位代表了不同的状态,当第1个比特位为1,表示fd已经关闭,加锁操作会直接返回;当第2个比特位为1,表示读锁定,需要等待读锁释放,为0则代表可以获取读锁;当第3个比特位为1,表示写锁定,需要等待写锁释放,为0则表示可以获取写锁。

加锁方法rwlock的代码我们通过注释的方式进行讲解:

src/internal/poll/fd_mutex.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (mu *fdMutex) rwlock(read bool) bool {
	var mutexBit, mutexWait, mutexMask uint64
	var mutexSema *uint32
	if read {
		mutexBit = mutexRLock
		mutexWait = mutexRWait
		mutexMask = mutexRMask
		mutexSema = &mu.rsema
	} else {
		mutexBit = mutexWLock
		mutexWait = mutexWWait
		mutexMask = mutexWMask
		mutexSema = &mu.wsema
	}
	for {
        //通过原子方法获取state的值,并判断关闭位是否被设置1
		old := atomic.LoadUint64(&mu.state)
		if old&mutexClosed != 0 {
			return false
		}
       //判断锁定位
		var new uint64
		if old&mutexBit == 0 {
			// 没有被锁定,设置state的新值为被锁定且增加引用计数
			new = (old | mutexBit) + mutexRef
           // 判断引用计数是否溢出
			if new&mutexRefMask == 0 {
				panic(overflowMsg)
			}
		} else {
			// 已经被锁定,增加state中的等待计数
			new = old + mutexWait
           // 判断等待计数是否溢出
			if new&mutexMask == 0 {
				panic(overflowMsg)
			}
		}
       // 通过CAS原子方式设置state为新值
		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
           // 设置成功且之前是未锁定状态,获取锁成功
			if old&mutexBit == 0 {
				return true
			}
           // 之前是锁定状态,使用信号量将自身阻塞,等待锁释放后的唤醒信号
			runtime_Semacquire(mutexSema)
			// The signaller has subtracted mutexWait.
		}
       // 如果CAS方法没有成功,则返回循环其实为止,继续获取最新的state值
	}
}

以上代码有几个点需要单独讲解,方便对代码的理解:

  1. rwlock方法是读锁和写锁共用的方法,和我们平时接触到的读写锁有所不同,这里的读锁和写锁不是互斥的,其实原因也很简单,socket的读和写并不冲突,不需要加锁形成互斥(其实socket本身的读写是多线程安全的,这里的锁主要是为了锁定fd.pd.waitWrite操作,pd.waitWrite涉及go语言实现io多路复用,后续专门章节讲解)
  2. CompareAndSwapUint64是一个原子方法,它对比state当前值与old值是否相同,如相同则将state设置为new值,且返回true;如不相同则返回false。而for循环的开头则调用了LoadUint64原子方法,将old设置为state的最新值。在for循环的最后,通过调用CompareAndSwapUint64判断state的值是否已经被其他协程改变,如果没有改变,可以设置计算出的新值;如果发生了变化,则返回for循环的开头,继续取state的最新值进行计算和判断。
  3. 可能有的同学会对for循环的性能有疑问,可能会觉得锁这里的并发高了以后,CAS频繁失败,for循环执行次数太多导致吃cpu。我们从两方面来打消这种疑虑,一是从并行角度:多cpu每个cpu上运行的线程并行执行相同fd的rwlock代码,这样的并行数量取决于cpu数量,不可能高;二是从并发角度:例如正在执行rwlock的协程或者线程被抢占,当再此被唤醒执行时,其他的协程或者线程已经执行过相同fd的rwlock代码,导致state值变了,CAS失败,返回for循环开头继续计算,这种情况出现在高并发且大部分协程在rwlock代码执行过程中被抢占,不能说不可能出现,但是概率很小。

rwunlock的方法就不再详细讲解了,代码中标有部分注释:

src/internal/poll/fd_mutex.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (mu *fdMutex) rwunlock(read bool) bool {
	var mutexBit, mutexWait, mutexMask uint64
	var mutexSema *uint32
	if read {
		mutexBit = mutexRLock
		mutexWait = mutexRWait
		mutexMask = mutexRMask
		mutexSema = &mu.rsema
	} else {
		mutexBit = mutexWLock
		mutexWait = mutexWWait
		mutexMask = mutexWMask
		mutexSema = &mu.wsema
	}
	for {
        // 通过原子方法获取state的最新值
		old := atomic.LoadUint64(&mu.state)
        // 如果没有加锁或者引用数为0,panic(一般是没有成对出现rwlock导致)
		if old&mutexBit == 0 || old&mutexRefMask == 0 {
			panic("inconsistent poll.fdMutex")
		}
		// 将新值的锁标记位清空并较少一次引用
		new := (old &^ mutexBit) - mutexRef
       // 减少一个等待数量
		if old&mutexMask != 0 {
			new -= mutexWait
		}
       // CAS
		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
           // 如果之前有等待者,发送信号量唤醒等待者
			if old&mutexMask != 0 {
				runtime_Semrelease(mutexSema)
			}
           // 如果已经关闭且没有其他引用,返回已关闭
			return new&(mutexClosed|mutexRefMask) == mutexClosed
		}
	}
}

fdMutex除了对fd的读写做并发控制,还控制了fd的生命周期,下面看一下increfAndClose方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (mu *fdMutex) increfAndClose() bool {
	for {
       // 依然是先判断是否已关闭
		old := atomic.LoadUint64(&mu.state)
		if old&mutexClosed != 0 {
			return false
		}
		// 设置state的新值为关闭状态且引用次数加一
		new := (old | mutexClosed) + mutexRef
		if new&mutexRefMask == 0 {
			panic(overflowMsg)
		}
		// 清空读等待数和写等待数
		new &^= mutexRMask | mutexWMask
        // CAS
		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
			// 唤醒所有等待者
			for old&mutexRMask != 0 {
				old -= mutexRWait
				runtime_Semrelease(&mu.rsema)
			}
			for old&mutexWMask != 0 {
				old -= mutexWWait
				runtime_Semrelease(&mu.wsema)
			}
			return true
		}
	}
}

rwlock、rwunlock、increfAndClose三个方法看完后,我们接着将state中除了低3位代表的关闭位、读锁位、写锁位外的引用计数、写等待计数、读等待计数的作用总结一下:

  1. 引用计数,每个操作fd的方法中都会首先增加引用计数,方法结束再减少引用。它标记了fd正在被引用的次数,在设置fd关闭后必须等待不再有引用才能销毁fd,否则在其他方法还在引用fd的时候,fd被销毁,将产生不可预知的错误。
  2. 写等待计数、读等待计数,标记等待者的数量,当解锁时如果等待数量不为0,需要唤醒一个等待者,当关闭fd时,需要将所有等待者唤醒,等待者唤醒后发现fd被设置为关闭,再进入关闭判断分支执行。

3. 小结

本章通过跟踪conn的Write方法,了解了网络数据写入的过程及poll.FD的读写锁。总结为以下几点:

  1. conn的Write方法调用了netFD的Write,netFD的Write方法又调用了poll.FD的Write
  2. poll.FD的rwlock通过原子操作更新读、写和关闭状态,当锁获取失败时调用信号量方法将自身阻塞并等待其他协程解锁后释放信号量。

下一章我们将对TCPConn的Close方法进行跟踪,来了解连接关闭的过程。