当前位置 博文首页 > 深度解密 Go 语言中的 sync.Pool

    深度解密 Go 语言中的 sync.Pool

    作者:Stefno 时间:2021-02-18 06:39

    最近在工作中碰到了 GC 的问题:项目中大量重复地创建许多对象,造成 GC 的工作量巨大,CPU 频繁掉底。准备使用 sync.Pool 来缓存对象,减轻 GC 的消耗。为了用起来更顺畅,我特地研究了一番,形成此文。本文从使用到源码解析,循序渐进,一一道来。

    是什么

    sync.Pool 是 sync 包下的一个组件,可以作为保存临时取还对象的一个“池子”。个人觉得它的名字有一定的误导性,因为 Pool 里装的对象可以被无通知地被回收,可能 sync.Cache 是一个更合适的名字。

    有什么用

    对于很多需要重复分配、回收内存的地方,sync.Pool 是一个很好的选择。频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺,而 sync.Pool 可以将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能。

    怎么用

    首先,sync.Pool 是协程安全的,这对于使用者来说是极其方便的。使用前,设置好对象的 New 函数,用于在 Pool 里没有缓存的对象时,创建一个。之后,在程序的任何地方、任何时候仅通过 Get()Put() 方法就可以取、还对象了。

    下面是 2018 年的时候,《Go 夜读》上关于 sync.Pool 的分享,关于适用场景:

    当多个 goroutine 都需要创建同⼀个对象的时候,如果 goroutine 数过多,导致对象的创建数⽬剧增,进⽽导致 GC 压⼒增大。形成 “并发⼤-占⽤内存⼤-GC 缓慢-处理并发能⼒降低-并发更⼤”这样的恶性循环。

    在这个时候,需要有⼀个对象池,每个 goroutine 不再⾃⼰单独创建对象,⽽是从对象池中获取出⼀个对象(如果池中已经有的话)。

    因此关键思想就是对象的复用,避免重复创建、销毁,下面我们来看看如何使用。

    简单的例子

    首先来看一个简单的例子:

    package main
    import (
    	"fmt"
    	"sync"
    )
    
    var pool *sync.Pool
    
    type Person struct {
    	Name string
    }
    
    func initPool() {
    	pool = &sync.Pool {
    		New: func()interface{} {
    			fmt.Println("Creating a new Person")
    			return new(Person)
    		},
    	}
    }
    
    func main() {
    	initPool()
    
    	p := pool.Get().(*Person)
    	fmt.Println("首次从 pool 里获取:", p)
    
    	p.Name = "first"
    	fmt.Printf("设置 p.Name = %s\n", p.Name)
    
    	pool.Put(p)
    
    	fmt.Println("Pool 里已有一个对象:&{first},调用 Get: ", pool.Get().(*Person))
    	fmt.Println("Pool 没有对象了,调用 Get: ", pool.Get().(*Person))
    }

    运行结果:

    Creating a new Person
    首次从 pool 里获取: &{}
    设置 p.Name = first
    Pool 里已有一个对象:&{first},Get: &{first}
    Creating a new Person
    Pool 没有对象了,Get: &{}

    首先,需要初始化 Pool,唯一需要的就是设置好 New 函数。当调用 Get 方法时,如果池子里缓存了对象,就直接返回缓存的对象。如果没有存货,则调用 New 函数创建一个新的对象。

    另外,我们发现 Get 方法取出来的对象和上次 Put 进去的对象实际上是同一个,Pool 没有做任何“清空”的处理。但我们不应当对此有任何假设,因为在实际的并发使用场景中,无法保证这种顺序,最好的做法是在 Put 前,将对象清空。

    fmt 包如何用

    这部分主要看 fmt.Printf 如何使用:

    func Printf(format string, a ...interface{}) (n int, err error) {
    	return Fprintf(os.Stdout, format, a...)
    }

    继续看 Fprintf

    func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) {
    	p := newPrinter()
    	p.doPrintf(format, a)
    	n, err = w.Write(p.buf)
    	p.free()
    	return
    }

    Fprintf 函数的参数是一个 io.WriterPrintf 传的是 os.Stdout,相当于直接输出到标准输出。这里的 newPrinter 用的就是 Pool:

    // newPrinter allocates a new pp struct or grabs a cached one.
    func newPrinter() *pp {
    	p := ppFree.Get().(*pp)
    	p.panicking = false
    	p.erroring = false
    	p.wrapErrs = false
    	p.fmt.init(&p.buf)
    	return p
    }
    
    var ppFree = sync.Pool{
    	New: func() interface{} { return new(pp) },
    }

    回到 Fprintf 函数,拿到 pp 指针后,会做一些 format 的操作,并且将 p.buf 里面的内容写入 w。最后,调用 free 函数,将 pp 指针归还到 Pool 中:

    // free saves used pp structs in ppFree; avoids an allocation per invocation.
    func (p *pp) free() {
    	if cap(p.buf) > 64<<10 {
    		return
    	}
    
    	p.buf = p.buf[:0]
    	p.arg = nil
    	p.value = reflect.Value{}
    	p.wrappedErr = nil
    	ppFree.Put(p)
    }

    归还到 Pool 前将对象的一些字段清零,这样,通过 Get 拿到缓存的对象时,就可以安全地使用了。

    pool_test

    通过 test 文件学习源码是一个很好的途径,因为它代表了“官方”的用法。更重要的是,测试用例会故意测试一些“坑”,学习这些坑,也会让自己在使用的时候就能学会避免。

    pool_test 文件里共有 7 个测试,4 个 BechMark。

    TestPoolTestPoolNew 比较简单,主要是测试 Get/Put 的功能。我们来看下 TestPoolNew

    func TestPoolNew(t *testing.T) {
    	// disable GC so we can control when it happens.
    	defer debug.SetGCPercent(debug.SetGCPercent(-1))
    
    	i := 0
    	p := Pool{
    		New: func() interface{} {
    			i++
    			return i
    		},
    	}
    	if v := p.Get(); v != 1 {
    		t.Fatalf("got %v; want 1", v)
    	}
    	if v := p.Get(); v != 2 {
    		t.Fatalf("got %v; want 2", v)
    	}
    
    	// Make sure that the goroutine doesn't migrate to another P
    	// between Put and Get calls.
    	Runtime_procPin()
    	p.Put(42)
    	if v := p.Get(); v != 42 {
    		t.Fatalf("got %v; want 42", v)
    	}
    	Runtime_procUnpin()
    
    	if v := p.Get(); v != 3 {
    		t.Fatalf("got %v; want 3", v)
    	}
    }

    首先设置了 GC=-1,作用就是停止 GC。那为啥要用 defer?函数都跑完了,还要 defer 干啥。注意到,debug.SetGCPercent 这个函数被调用了两次,而且这个函数返回的是上一次 GC 的值。因此,defer 在这里的用途是还原到调用此函数之前的 GC 设置,也就是恢复现场。

    接着,调置了 Pool 的 New 函数:直接返回一个 int,变且每次调用 New,都会自增 1。然后,连续调用了两次 Get 函数,因为这个时候 Pool 里没有缓存的对象,因此每次都会调用 New 创建一个,所以第一次返回 1,第二次返回 2。

    然后,调用 Runtime_procPin() 防止 goroutine 被强占,目的是保护接下来的一次 Put 和 Get 操作,使得它们操作的对象都是同一个 P 的“池子”。并且,这次调用 Get 的时候并没有调用 New,因为之前有一次 Put 的操作。

    最后,再次调用 Get 操作,因为没有“存货”,因此还是会再次调用 New 创建一个对象。

    TestPoolGCTestPoolRelease 则主要测试 GC 对 Pool 里对象的影响。这里用了一个函数,用于计数有多少对象会被 GC 回收:

    runtime.SetFinalizer(v, func(vv *string) {
    	atomic.AddUint32(&fin, 1)
    })

    当垃圾回收检测到 v 是一个不可达的对象时,并且 v 又有一个关联的 Finalizer,就会另起一个 goroutine 调用设置的 finalizer 函数,也就是上面代码里的参数 func。这样,就会让对象 v 重新可达,从而在这次 GC 过程中不被回收。之后,解绑对象 v 和它所关联的 Finalizer,当下次 GC 再次检测到对象 v 不可达时,才会被回收。

    TestPoolStress 从名字看,主要是想测一下“压力”,具体操作就是起了 10 个 goroutine 不断地向 Pool 里 Put 对象,然后又 Get 对象,看是否会出错。

    TestPoolDequeueTestPoolChain,都调用了 testPoolDequeue,这是具体干活的。它需要传入一个 PoolDequeue 接口:

    // poolDequeue testing.
    type PoolDequeue interface {
    	PushHead(val interface{}) bool
    	PopHead() (interface{}, bool)
    	PopTail() (interface{}, bool)
    }

    PoolDequeue 是一个双端队列,可以从头部入队元素,从头部和尾部出队元素。调用函数时,前者传入 NewPoolDequeue(16),后者传入 NewPoolChain(),底层其实都是 poolDequeue 这个结构体。具体来看 testPoolDequeue 做了什么:

    总共起了 10 个 goroutine:1 个生产者,9 个消费者。生产者不断地从队列头 pushHead 元素到双端队列里去,并且每 push 10 次,就 popHead 一次;消费者则一直从队列尾取元素。不论是从队列头还是从队列尾取元素,都会在 map 里做标记,最后检验每个元素是不是只被取出过一次。

    剩下的就是 Benchmark 测试了。第一个 BenchmarkPool 比较简单,就是不停地 Put/Get,测试性能。

    BenchmarkPoolSTW 函数会先关掉 GC,再向 pool 里 put 10 个对象,然后强制触发 GC,记录 GC 的停顿时间,并且做一个排序,计算 P50 和 P95 的 STW 时间。这个函数可以加入个人的代码库了:

    func BenchmarkPoolSTW(b *testing.B) {
    	// Take control of GC.
    	defer debug.SetGCPercent(debug.SetGCPercent(-1))
    
    	var mstats runtime.MemStats
    	var pauses []uint64
    
    	var p Pool
    	for i := 0; i < b.N; i++ {
    		// Put a large number of items into a pool.
    		const N = 100000
    		var item interface{} = 42
    		for i := 0; i < N; i++ {
    			p.Put(item)
    		}
    		// Do a GC.
    		runtime.GC()
    		// Record pause time.
    		runtime.ReadMemStats(&mstats)
    		pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256])
    	}
    
    	// Get pause time stats.
    	sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] })
    	var total uint64
    	for _, ns := range pauses {
    		total += ns
    	}
    	// ns/op for this benchmark is average STW time.
    	b.ReportMetric(float64(total)/float64(b.N), "ns/op")
    	b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW")
    	b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW")
    }

    我在 mac 上跑了一下:

    go test -v -run=none -bench=BenchmarkPoolSTW

    得到输出:

    goos: darwin
    goarch: amd64
    pkg: sync
    BenchmarkPoolSTW-12 361 3708 ns/op 3583 p50-ns/STW 5008 p95-ns/STW
    PASS
    ok sync 1.481s

    最后一个 BenchmarkPoolExpensiveNew 测试当 New 的代价很高时,Pool 的表现。也可以加入个人的代码库。

    其他

    标准库中 encoding/json 也用到了 sync.Pool 来提升性能。著名的 gin 框架,对 context 取用也到了 sync.Pool

    来看下 gin 如何使用 sync.Pool。设置 New 函数:

    engine.pool.New = func() interface{} {
    	return engine.allocateContext()
    }
    
    func (engine *Engine) allocateContext() *Context {
    	return &Context{engine: engine, KeysMutex: &sync.RWMutex{}}
    }

    使用:

    // ServeHTTP conforms to the http.Handler interface.
    func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    	c := engine.pool.Get().(*Context)
    	c.writermem.reset(w)
    	c.Request = req
    	c.reset()
    
    	engine.handleHTTPRequest(c)
    
    	engine.pool.Put(c)
    }

    先调用 Get 取出来缓存的对象,然后会做一些 reset 操作,再执行 handleHTTPRequest,最后再 Put 回 Pool。

    另外,Echo 框架也使⽤了 sync.Pool 来管理 context,并且⼏乎达到了零堆内存分配:

    It leverages sync pool to reuse memory and achieve zero dynamic memory allocation with no GC overhead.

    源码分析

    Pool 结构体

    首先来看 Pool 的结构体:

    type Pool struct {
    	noCopy noCopy
    
     // 每个 P 的本地队列,实际类型为 [P]poolLocal
    	local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    	// [P]poolLocal的大小
    	localSize uintptr // size of the local array
    
    	victim unsafe.Pointer // local from previous cycle
    	victimSize uintptr // size of victims array
    
    	// 自定义的对象创建回调函数,当 pool 中无可用对象时会调用此函数
    	New func() interface{}
    }

    因为 Pool 不希望被复制,所以结构体里有一个 noCopy 的字段,使用 go vet 工具可以检测到用户代码是否复制了 Pool。

    noCopy 是 go1.7 开始引入的一个静态检查机制。它不仅仅工作在运行时或标准库,同时也对用户代码有效。

    用户只需实现这样的不消耗内存、仅用于静态分析的结构,来保证一个对象在第一次使用后不会发生复制。

    实现非常简单:

    // noCopy 用于嵌入一个结构体中来保证其第一次使用后不会被复制
    //
    // 见 https://golang.org/issues/8005#issuecomment-190753527
    type noCopy struct{}
    
    // Lock 是一个空操作用来给 `go ve` 的 -copylocks 静态分析
    func (*noCopy) Lock() {}
    func (*noCopy) Unlock() {}

    local 字段存储指向 [P]poolLocal 数组(严格来说,它是一个切片)的指针,localSize 则表示 local 数组的大小。访问时,P 的 id 对应 [P]poolLocal 下标索引。通过这样的设计,多个 goroutine 使用同一个 Pool 时,减少了竞争,提升了性能。

    在一轮 GC 到来时,victim 和 victimSize 会分别“接管” local 和 localSize。victim 的机制用于减少 GC 后冷启动导致的性能抖动,让分配对象更平滑。

    Victim Cache 本来是计算机架构里面的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool 引入的意图在于降低 GC 压力的同时提高命中率。

    当 Pool 没有缓存的对象时,调用 New 方法生成一个新的对象。

    type poolLocal struct {
    	poolLocalInternal
    
    	// 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing,
    	// 每个缓存行具有 64 bytes,即 512 bit
    	// 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行
    	// 伪共享,仅占位用,防止在 cache line 上分配多个 poolLocalInternal
    	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
    }
    
    // Local per-P Pool appendix.
    type poolLocalInternal struct {
     // P 的私有缓存区,使用时无需要加锁
    	private interface{}
    	// 公共缓存区。本地 P 可以 pushHead/popHead;其他 P 则只能 popTail
    	shared poolChain
    }

    字段 pad 主要是防止 false sharing,董大的《什么是 cpu cache》里讲得比较好:

    现代 cpu 中,cache 都划分成以 cache line (cache block) 为单位,在 x86_64 体系下一般都是 64 字节,cache line 是操作的最小单元。

    程序即使只想读内存中的 1 个字节数据,也要同时把附近 63 节字加载到 cache 中,如果读取超个 64 字节,那么就要加载到多个 cache line 中。

    简单来说,如果没有 pad 字段,那么当需要访问 0 号索引的 poolLocal 时,CPU 同时会把 0 号和 1 号索引同时加载到 cpu cache。在只修改 0 号索引的情况下,会让 1 号索引的 poolLocal 失效。这样,当其他线程想要读取 1 号索引时,发生 cache miss,还得重新再加载,对性能有损。增加一个 pad,补齐缓存行,让相关的字段能独立地加载到缓存行就不会出现 false sharding 了。

    poolChain 是一个双端队列的实现:

    type poolChain struct {
    	// 只有生产者会 push to,不用加锁
    	head *poolChainElt
    
    	// 读写需要原子控制。 pop from
    	tail *poolChainElt
    }
    
    type poolChainElt struct {
    	poolDequeue
    
    	// next 被 producer 写,consumer 读。所以只会从 nil 变成 non-nil
    	// prev 被 consumer 写,producer 读。所以只会从 non-nil 变成 nil
    	next, prev *poolChainElt
    }
    
    type poolDequeue struct {
    	// The head index is stored in the most-significant bits so
    	// that we can atomically add to it and the overflow is
    	// harmless.
    	// headTail 包含一个 32 位的 head 和一个 32 位的 tail 指针。这两个值都和 len(vals)-1 取模过。
    	// tail 是队列中最老的数据,head 指向下一个将要填充的 slot
     // slots 的有效范围是 [tail, head),由 consumers 持有。
    	headTail uint64
    
    	// vals 是一个存储 interface{} 的环形队列,它的 size 必须是 2 的幂
    	// 如果 slot 为空,则 vals[i].typ 为空;否则,非空。
    	// 一个 slot 在这时宣告无效:tail 不指向它了,vals[i].typ 为 nil
    	// 由 consumer 设置成 nil,由 producer 读
    	vals []eface
    }

    poolDequeue 被实现为单生产者、多消费者的固定大小的无锁(atomic 实现) Ring 式队列(底层存储使用数组,使用两个指针标记 head、tail)。生产者可以从 head 插入、head 删除,而消费者仅可从 tail 删除。

    headTail 指向队列的头和尾,通过位运算将 head 和 tail 存入 headTail 变量中。

    我们用一幅图来完整地描述 Pool 结构体:

    结合木白的技术私厨的《请问sync.Pool有什么缺点?》里的一张图,对于双端队列的理解会更容易一些:

    我们看到 Pool 并没有直接使用 poolDequeue,原因是它的大小是固定的,而 Pool 的大小是没有限制的。因此,在 poolDequeue 之上包装了一下,变成了一个 poolChainElt 的双向链表,可以动态增长。

    Get

    直接上源码:

    func (p *Pool) Get() interface{} {
     // ......
    	l, pid := p.pin()
    	x := l.private
    	l.private = nil
    	if x == nil {
    		x, _ = l.shared.popHead()
    		if x == nil {
    			x = p.getSlow(pid)
    		}
    	}
    	runtime_procUnpin()
     // ......
    	if x == nil && p.New != nil {
    		x = p.New()
    	}
    	return x
    }

    省略号的内容是 race 相关的,属于阅读源码过程中的一些噪音,暂时注释掉。这样,Get 的整个过程就非常清晰了:

    1. 首先,调用 p.pin() 函数将当前的 goroutine 和 P 绑定,禁止被抢占,返回当前 P 对应的 poolLocal,以及 pid。
    2. 然后直接取 l.private,赋值给 x,并置 l.private 为 nil。
    3. 判断 x 是否为空,若为空,则尝试从 l.shared 的头部 pop 一个对象出来,同时赋值给 x。
    4. 如果 x 仍然为空,则调用 getSlow 尝试从其他 P 的 shared 双端队列尾部“偷”一个对象出来。
    5. Pool 的相关操作做完了,调用 runtime_procUnpin() 解除非抢占。
    6. 最后如果还是没有取到缓存的对象,那就直接调用预先设置好的 New 函数,创建一个出来。

    我用一张流程图来展示整个过程:

    整体流程梳理完了,我们再来看一下其中的一些关键函数。

    pin

    先来看 Pool.pin()

    // src/sync/pool.go
    
    // 调用方必须在完成取值后调用 runtime_procUnpin() 来取消抢占。
    func (p *Pool) pin() (*poolLocal, int) {
    	pid := runtime_procPin()
    	s := atomic.LoadUintptr(&p.localSize) // load-acquire
    	l := p.local    // load-consume
    	// 因为可能存在动态的 P(运行时调整 P 的个数)
    	if uintptr(pid) < s {
    		return indexLocal(l, pid), pid
    	}
    	return p.pinSlow()
    }

    pin 的作用就是将当前 groutine 和 P 绑定在一起,禁止抢占。并且返回对应的 poolLocal 以及 P 的 id。

    如果 G 被抢占,则 G 的状态从 running 变成 runnable,会被放回 P 的 localq 或 globaq,等待下一次调度。下次再执行时,就不一定是和现在的 P 相结合了。因为之后会用到 pid,如果被抢占了,有可能接下来使用的 pid 与所绑定的 P 并非同一个。

    “绑定”的任务最终交给了 procPin

    // src/runtime/proc.go
    
    func procPin() int {
    	_g_ := getg()
    	mp := _g_.m
    
    	mp.locks++
    	return int(mp.p.ptr().id)
    }

    实现的代码很简洁:将当前 goroutine 绑定的 m 上的一个锁字段 locks 值加 1,即完成了“绑定”。关于 pin 的原理,可以参考《golang的对象池sync.pool源码解读》,文章详细分析了为什么执行 procPin 之后,不可抢占,且 GC 不会清扫 Pool 里的对象。

    我们再回到 p.pin(),原子操作取出 p.localSizep.local,如果当前 pid 小于 p.localSize,则直接取 poolLocal 数组中的 pid 索引处的元素。否则,说明 Pool 还没有创建 poolLocal,调用 p.pinSlow() 完成创建工作。

    func (p *Pool) pinSlow() (*poolLocal, int) {
    	// Retry under the mutex.
    	// Can not lock the mutex while pinned.
    	runtime_procUnpin()
    	allPoolsMu.Lock()
    	defer allPoolsMu.Unlock()
    	pid := runtime_procPin()
    	// poolCleanup won't be called while we are pinned.
    	// 没有使用原子操作,因为已经加了全局锁了
    	s := p.localSize
    	l := p.local
    	// 因为 pinSlow 中途可能已经被其他的线程调用,因此这时候需要再次对 pid 进行检查。 如果 pid 在 p.local 大小范围内,则不用创建 poolLocal 切片,直接返回。
    	if uintptr(pid) < s {
    		return indexLocal(l, pid), pid
    	}
    	if p.local == nil {
    		allPools = append(allPools, p)
    	}
    	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    	// 当前 P 的数量
    	size := runtime.GOMAXPROCS(0)
    	local := make([]poolLocal, size)
    	// 旧的 local 会被回收
    	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    	atomic.StoreUintptr(&p.localSize, uintptr(size))  // store-release
    	return &local[pid], pid
    }

    因为要上一把大锁 allPoolsMu,所以函数名带有 slow。我们知道,锁粒度越大,竞争越多,自然就越“slow”。不过要想上锁的话,得先解除“绑定”,锁上之后,再执行“绑定”。原因是锁越大,被阻塞的概率就越大,如果还占着 P,那就浪费资源。

    在解除绑定后,pinSlow 可能被其他的线程调用过了,p.local 可能会发生变化。因此这时候需要再次对 pid 进行检查。如果 pid 在 p.localSize 大小范围内,则不用再创建 poolLocal 切片,直接返回。

    之后,根据 P 的个数,使用 make 创建切片,包含 runtime.GOMAXPROCS(0) 个 poolLocal,并且使用原子操作设置 p.local 和 p.localSize。

    最后,返回 p.local 对应 pid 索引处的元素。

    关于这把大锁 allPoolsMu,曹大在《几个 Go 系统可能遇到的锁问题》里讲了一个例子。第三方库用了 sync.Pool,内部有一个结构体 fasttemplate.Template,包含 sync.Pool 字段。而 rd 在使用时,每个请求都会新建这样一个结构体。于是,处理每个请求时,都会尝试从一个空的 Pool 里取缓存的对象,最后 goroutine 都阻塞在了这把大锁上,因为都在尝试执行:allPools = append(allPools, p),从而造成性能问题。

    popHead

    回到 Get 函数,再来看另一个关键的函数:poolChain.popHead()

    func (c *poolChain) popHead() (interface{}, bool) {
    	d := c.head
    	for d != nil {
    		if val, ok := d.popHead(); ok {
    			return val, ok
    		}
    		// There may still be unconsumed elements in the
    		// previous dequeue, so try backing up.
    		d = loadPoolChainElt(&d.prev)
    	}
    	return nil, false
    }

    popHead 函数只会被 producer 调用。首先拿到头节点:c.head,如果头节点不为空的话,尝试调用头节点的 popHead 方法。注意这两个 popHead 方法实际上并不相同,一个是 poolChain 的,一个是 poolDequeue 的,有疑惑的,不妨回头再看一下 Pool 结构体的图。我们来看 poolDequeue.popHead()

    // /usr/local/go/src/sync/poolqueue.go
    
    func (d *poolDequeue) popHead() (interface{}, bool) {
    	var slot *eface
    	for {
    		ptrs := atomic.LoadUint64(&d.headTail)
    		head, tail := d.unpack(ptrs)
    		// 判断队列是否为空
    		if tail == head {
    			// Queue is empty.
    			return nil, false
    		}
    
    		// head 位置是队头的前一个位置,所以此处要先退一位。
    		// 在读出 slot 的 value 之前就把 head 值减 1,取消对这个 slot 的控制
    		head--
    		ptrs2 := d.pack(head, tail)
    		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
    			// We successfully took back slot.
    			slot = &d.vals[head&uint32(len(d.vals)-1)]
    			break
    		}
    	}
    
     // 取出 val
    	val := *(*interface{})(unsafe.Pointer(slot))
    	if val == dequeueNil(nil) {
    		val = nil
    	}
    	
    	// 重置 slot,typ 和 val 均为 nil
    	// 这里清空的方式与 popTail 不同,与 pushHead 没有竞争关系,所以不用太小心
    	*slot = eface{}
    	return val, true
    }

    此函数会删掉并且返回 queue 的头节点。但如果 queue 为空的话,返回 false。这里的 queue 存储的实际上就是 Pool 里缓存的对象。

    整个函数的核心是一个无限循环,这是 Go 中常用的无锁化编程形式。

    首先调用 unpack 函数分离出 head 和 tail 指针,如果 head 和 tail 相等,即首尾相等,那么这个队列就是空的,直接就返回 nil,false

    否则,将 head 指针后移一位,即 head 值减 1,然后调用 pack 打包 head 和 tail 指针。使用 atomic.CompareAndSwapUint64 比较 headTail 在这之间是否有变化,如果没变化,相当于获取到了这把锁,那就更新 headTail 的值。并且把 vals 相应索引处的元素赋值给 slot。

    因为 vals 长度实际是只能是 2 的 n 次幂,因此 len(d.vals)-1 实际上得到的值的低 n 位是全 1,它再与 head 相与,实际就是取 head 低 n 位的值。

    得到相应 slot 的元素后,经过类型转换并判断是否是 dequeueNil,如果是,说明没取到缓存的对象,返回 nil。

    // /usr/local/go/src/sync/poolqueue.go
    // 因为使用 nil 代表空的 slots,因此使用 dequeueNil 表示 interface{}(nil)
    type dequeueNil *struct{}

    最后,返回 val 之前,将 slot “归零”:*slot = eface{}

    回到 poolChain.popHead(),调用 poolDequeue.popHead() 拿到缓存的对象后,直接返回。否则,将 d 重新指向 d.prev,继续尝试获取缓存的对象。

    getSlow

    如果在 shared 里没有获取到缓存对象,则继续调用 Pool.getSlow(),尝试从其他 P 的 poolLocal 偷取:

    func (p *Pool) getSlow(pid int) interface{} {
    	// See the comment in pin regarding ordering of the loads.
    	size := atomic.LoadUintptr(&p.localSize) // load-acquire
    	locals := p.local   // load-consume
    	// Try to steal one element from other procs.
    	// 从其他 P 中窃取对象
    	for i := 0; i < int(size); i++ {
    		l := indexLocal(locals, (pid+i+1)%int(size))
    		if x, _ := l.shared.popTail(); x != nil {
    			return x
    		}
    	}
    
    	// 尝试从victim cache中取对象。这发生在尝试从其他 P 的 poolLocal 偷去失败后,
    	// 因为这样可以使 victim 中的对象更容易被回收。
    	size = atomic.LoadUintptr(&p.victimSize)
    	if uintptr(pid) >= size {
    		return nil
    	}
    	locals = p.victim
    	l := indexLocal(locals, pid)
    	if x := l.private; x != nil {
    		l.private = nil
    		return x
    	}
    	for i := 0; i < int(size); i++ {
    		l := indexLocal(locals, (pid+i)%int(size))
    		if x, _ := l.shared.popTail(); x != nil {
    			return x
    		}
    	}
    
    	// 清空 victim cache。下次就不用再从这里找了
    	atomic.StoreUintptr(&p.victimSize, 0)
    
    	return nil
    }

    从索引为 pid+1 的 poolLocal 处开始,尝试调用 shared.popTail() 获取缓存对象。如果没有拿到,则从 victim 里找,和 poolLocal 的逻辑类似。

    最后,实在没找到,就把 victimSize 置 0,防止后来的“人”再到 victim 里找。

    在 Get 函数的最后,经过这一番操作还是没找到缓存的对象,就调用 New 函数创建一个新的对象。

    popTail

    最后,还剩一个 popTail 函数:

    func (c *poolChain) popTail() (interface{}, bool) {
    	d := loadPoolChainElt(&c.tail)
    	if d == nil {
    		return nil, false
    	}
    
    	for {
    		d2 := loadPoolChainElt(&d.next)
    
    		if val, ok := d.popTail(); ok {
    			return val, ok
    		}
    
    		if d2 == nil {
    			// 双向链表只有一个尾节点,现在为空
    			return nil, false
    		}
    
    		// 双向链表的尾节点里的双端队列被“掏空”,所以继续看下一个节点。
    		// 并且由于尾节点已经被“掏空”,所以要甩掉它。这样,下次 popHead 就不会再看它有没有缓存对象了。
    		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
    			// 甩掉尾节点
    			storePoolChainElt(&d2.prev, nil)
    		}
    		d = d2
    	}
    }

    for 循环的一开始,就把 d.next 加载到了 d2。因为 d 可能会短暂为空,但如果 d2 在 pop 或者 pop fails 之前就不为空的话,说明 d 就会永久为空了。在这种情况下,可以安全地将 d 这个结点“甩掉”。

    最后,将 c.tail 更新为 d2,可以防止下次 popTail 的时候查看一个空的 dequeue;而将 d2.prev 设置为 nil,可以防止下次 popHead 时查看一个空的 dequeue

    我们再看一下核心的 poolDequeue.popTail

    // src/sync/poolqueue.go:147
    
    func (d *poolDequeue) popTail() (interface{}, bool) {
    	var slot *eface
    	for {
    		ptrs := atomic.LoadUint64(&d.headTail)
    		head, tail := d.unpack(ptrs)
    		// 判断队列是否空
    		if tail == head {
    			// Queue is empty.
    			return nil, false
    		}
    
    		// 先搞定 head 和 tail 指针位置。如果搞定,那么这个 slot 就归属我们了
    		ptrs2 := d.pack(head, tail+1)
    		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
    			// Success.
    			slot = &d.vals[tail&uint32(len(d.vals)-1)]
    			break
    		}
    	}
    
    	// We now own slot.
    	val := *(*interface{})(unsafe.Pointer(slot))
    	if val == dequeueNil(nil) {
    		val = nil
    	}
    
    	slot.val = nil
    	atomic.StorePointer(&slot.typ, nil)
    	// At this point pushHead owns the slot.
    
    	return val, true
    }

    popTail 从队列尾部移除一个元素,如果队列为空,返回 false。此函数可能同时被多个消费者调用。

    函数的核心是一个无限循环,又是一个无锁编程。先解出 head,tail 指针值,如果两者相等,说明队列为空。

    因为要从尾部移除一个元素,所以 tail 指针前进 1,然后使用原子操作设置 headTail。

    最后,将要移除的 slot 的 val 和 typ “归零”:

    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)

    Put

    // src/sync/pool.go
    
    // Put 将对象添加到 Pool 
    func (p *Pool) Put(x interface{}) {
    	if x == nil {
    		return
    	}
    	// ……
    	l, _ := p.pin()
    	if l.private == nil {
    		l.private = x
    		x = nil
    	}
    	if x != nil {
    		l.shared.pushHead(x)
    	}
    	runtime_procUnpin()
     //…… 
    }

    同样删掉了 race 相关的函数,看起来清爽多了。整个 Put 的逻辑也很清晰:

    • 先绑定 g 和 P,然后尝试将 x 赋值给 private 字段。
    • 如果失败,就调用 pushHead 方法尝试将其放入 shared 字段所维护的双端队列中。

    同样用流程图来展示整个过程:

    pushHead

    我们来看 pushHead 的源码,比较清晰:

    // src/sync/poolqueue.go
    
    func (c *poolChain) pushHead(val interface{}) {
    	d := c.head
    	if d == nil {
    		// poolDequeue 初始长度为8
    		const initSize = 8 // Must be a power of 2
    		d = new(poolChainElt)
    		d.vals = make([]eface, initSize)
    		c.head = d
    		storePoolChainElt(&c.tail, d)
    	}
    
    	if d.pushHead(val) {
    		return
    	}
    
     // 前一个 poolDequeue 长度的 2 倍
    	newSize := len(d.vals) * 2
    	if newSize >= dequeueLimit {
    		// Can't make it any bigger.
    		newSize = dequeueLimit
    	}
    
     // 首尾相连,构成链表
    	d2 := &poolChainElt{prev: d}
    	d2.vals = make([]eface, newSize)
    	c.head = d2
    	storePoolChainElt(&d.next, d2)
    	d2.pushHead(val)
    }