Source File
poolqueue.go
Package
sync
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
	"sync/atomic"
	"unsafe"
)

// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}

type eface struct {
	typ, val unsafe.Pointer
}

const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4

// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}

func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}
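// packUnpackExample is an illustrative sketch added to this listing;
// it is not part of the original file. It shows the headTail
// encoding: the head index lives in the high 32 bits and the tail
// index in the low 32 bits, which is why pushHead can advance head
// with a single atomic.AddUint64(&d.headTail, 1<<dequeueBits) and
// why overflow of head is harmless.
func packUnpackExample() {
	var d poolDequeue
	ptrs := d.pack(3, 1)         // 3<<32 | 1
	head, tail := d.unpack(ptrs) // head == 3, tail == 1
	_, _ = head, tail
}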
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*interface{})(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}

// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}
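// dequeueUsageExample is an illustrative sketch added to this
// listing; it is not part of the original file. The single producer
// owns pushHead and popHead; any number of consumers may call
// popTail concurrently, which is how sync.Pool lets one P steal
// cached objects from another P's local queue.
func dequeueUsageExample() {
	d := &poolDequeue{vals: make([]eface, 8)} // size must be a power of 2
	d.pushHead("x")      // producer side
	v, ok := d.popTail() // consumer side; safe from other goroutines
	_, _ = v, ok
}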
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt
}

func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}

func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
	return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}

func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

	if d.pushHead(val) {
		return
	}

	// The current dequeue is full. Allocate a new one of twice
	// the size.
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

func (c *poolChain) popHead() (interface{}, bool) {
	d := c.head
	for d != nil {
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		// There may still be unconsumed elements in the
		// previous dequeue, so try backing up.
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
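// chainGrowthExample is an illustrative sketch added to this
// listing; it is not part of the original file. The first push
// allocates an 8-slot dequeue; the ninth push finds it full and
// links a 16-slot dequeue at the head. popTail drains the oldest
// dequeue first, so pushHead followed by popTail observes elements
// in FIFO order across the whole chain.
func chainGrowthExample() {
	var c poolChain
	for i := 0; i < 9; i++ {
		c.pushHead(i)
	}
	v, ok := c.popTail() // v == 0: the oldest element comes out first
	_, _ = v, ok
}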