blob: 8bf89c901458781e3cbd0ec1d2f97652a967c0eb [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package buffer provides a high-performance, lock-free implementation of a
// circular buffer used by the profiling code.
package buffer
import (
"errors"
"math/bits"
"runtime"
"sync"
"sync/atomic"
"unsafe"
)
// queue is a fixed-capacity ring of pointers. A Push claims a slot by
// incrementing acquired and reports completion by incrementing written; a
// drainer waits for the two counters to agree before it reads arr.
type queue struct {
	// arr has one entry per slot. A non-nil entry points at an interface{}
	// holding a pushed value.
	arr []unsafe.Pointer
	// size is the number of slots. It must be a power of two; once size
	// Pushes have claimed slots, later Pushes wrap around and overwrite older
	// values.
	size uint32
	// mask is always size - 1. Push ANDs with it instead of taking a modulo.
	mask uint32
	// acquired is incremented by each Push before it writes to arr. drainWait
	// compares it against written. Accessed atomically.
	acquired uint32
	// written is incremented by each Push once it is done with its slot
	// (whether or not it stored a value). Accessed atomically.
	written uint32
}

// newQueue returns an empty queue with room for size elements. size must be a
// power of two, since mask is derived as size - 1.
func newQueue(size uint32) *queue {
	q := new(queue)
	q.size = size
	q.mask = size - 1
	q.arr = make([]unsafe.Pointer, size)
	return q
}

// drainWait spins, yielding the processor between checks, until every Push
// that has claimed a slot in q has also finished with it.
func (q *queue) drainWait() {
	for {
		if atomic.LoadUint32(&q.acquired) == atomic.LoadUint32(&q.written) {
			return
		}
		runtime.Gosched()
	}
}
// queuePair owns two queues, q0 and q1, plus q, which points at whichever of
// the two is currently receiving Pushes. A drain flips q to the other queue
// so that Pushes never have to wait for the drainer.
type queuePair struct {
	q0 unsafe.Pointer
	q1 unsafe.Pointer
	q  unsafe.Pointer
}

// newQueuePair allocates both queues, each with size slots, and makes q0 the
// active one.
func newQueuePair(size uint32) *queuePair {
	first := unsafe.Pointer(newQueue(size))
	second := unsafe.Pointer(newQueue(size))
	return &queuePair{q0: first, q1: second, q: first}
}

// switchQueues makes the inactive queue active, so future Pushes go there
// without blocking, and returns the queue that was active before the switch.
//
// Callers serialize switchQueues among themselves (Drain holds drainMutex);
// the compare-and-swap is still required because Pushes read qp.q
// concurrently.
func (qp *queuePair) switchQueues() *queue {
	old, next := qp.q0, qp.q1
	if !atomic.CompareAndSwapPointer(&qp.q, old, next) {
		old, next = qp.q1, qp.q0
		atomic.CompareAndSwapPointer(&qp.q, old, next)
	}
	return (*queue)(old)
}
// floorCPUCount returns the largest power of two that does not exceed
// runtime.NumCPU(), or 1 if NumCPU reports 0.
//
// Push avoids modulo operations by requiring each queue's capacity to be a
// power of two and indexing with a bitwise AND mask. A CircularBuffer splits
// its total capacity N across several queuePairs (see below), so the number
// of pairs must divide N. The only divisors of a power of two are other powers
// of two, so the pair count must itself be a power of two.
//
// Experiments found that rounding the CPU count down, rather than up, gives
// the better result.
func floorCPUCount() uint32 {
	width := bits.Len32(uint32(runtime.NumCPU()))
	if width == 0 {
		return 1
	}
	return 1 << uint32(width-1)
}

// numCircularBufferPairs is the number of queuePairs a CircularBuffer uses by
// default. It is computed once, when the package is initialized.
var numCircularBufferPairs = floorCPUCount()
// CircularBuffer is a lock-free data structure that supports Push and Drain
// operations.
//
// Note that CircularBuffer is built for performance more than reliability.
// That is, some Push operations may fail without retries in some situations
// (such as during a Drain operation). Order of pushes is not maintained
// either; that is, if A was pushed before B, the Drain operation may return an
// array with B before A. These restrictions are acceptable within gRPC's
// profiling, but if your use-case does not permit these relaxed constraints
// or if performance is not a primary concern, you should probably use a
// lock-based data structure such as internal/buffer.UnboundedBuffer.
type CircularBuffer struct {
	// drainMutex serializes Drain calls. Push never acquires it.
	drainMutex sync.Mutex
	// qp holds the queuePairs that Pushes are spread across. Its length is a
	// power of two (numCircularBufferPairs, or 1 for small buffers).
	qp []*queuePair
	// qpn is a monotonically incrementing counter that's used to determine
	// which queuePair a Push operation should write to. This approach's
	// performance was found to be better than writing to a random queue.
	// Accessed atomically.
	qpn uint32
	// qpMask is len(qp) - 1. Push ANDs qpn with it to pick a queuePair.
	qpMask uint32
}
// errInvalidCircularBufferSize is returned by NewCircularBuffer when the
// requested size is not a non-zero power of two.
var errInvalidCircularBufferSize = errors.New("buffer size is not an exponent of two")

// NewCircularBuffer allocates a circular buffer that holds up to size elements
// and returns a reference to it. size must be a non-zero power of two (2^k),
// which saves the queues from expensive modulo operations. Any other value,
// including 0, yields errInvalidCircularBufferSize.
//
// The capacity is split evenly across numCircularBufferPairs queuePairs. If
// that would leave fewer than 8 slots per queue, a single queuePair holds
// everything instead.
func NewCircularBuffer(size uint32) (*CircularBuffer, error) {
	// 0 satisfies size&(size-1) == 0 but is not a power of two. Accepting it
	// would build zero-length queues whose mask is 0xFFFFFFFF, and the first
	// Push would index past the end of the empty slot array.
	if size == 0 || size&(size-1) != 0 {
		return nil, errInvalidCircularBufferSize
	}
	n := numCircularBufferPairs
	if size/n < 8 {
		// If each circular buffer is going to hold less than a very small number
		// of items (let's say 8), using multiple circular buffers is very likely
		// wasteful. Instead, fall back to one circular buffer holding everything.
		n = 1
	}
	// Both size and n are powers of two with n <= size, so perQueue is too.
	perQueue := size / n
	cb := &CircularBuffer{
		qp:     make([]*queuePair, n),
		qpMask: n - 1,
	}
	for i := range cb.qp {
		cb.qp[i] = newQueuePair(perQueue)
	}
	return cb, nil
}
// Push pushes an element into the circular buffer. Guaranteed to complete in
// a finite number of steps (also lock-free). Does not guarantee that push
// order will be retained. Does not guarantee that the operation will succeed
// if a Drain operation concurrently begins execution; in that case x is
// silently dropped.
func (cb *CircularBuffer) Push(x interface{}) {
	// Cycle through the queuePairs: qpn grows by one per Push and qpMask is
	// len(cb.qp) - 1.
	n := atomic.AddUint32(&cb.qpn, 1) & cb.qpMask
	// Snapshot the pair's currently active queue.
	qptr := atomic.LoadPointer(&cb.qp[n].q)
	q := (*queue)(qptr)
	// Claim a slot; acquired is this Push's zero-based ticket within q.
	acquired := atomic.AddUint32(&q.acquired, 1) - 1
	// If cb.qp[n].q still equals qptr, we have incremented acquired before any
	// queuePair was switched, and therefore before any drainWait completion.
	// Therefore, it is safe to proceed with the Push operation on this queue.
	// Otherwise, it means that a Drain operation has begun execution, but we
	// don't know how far along the process it is. If it is past the drainWait
	// check, it is not safe to proceed with the Push operation. We choose to
	// drop this sample entirely instead of retrying, as retrying may
	// potentially send the Push operation into a spin loop (we want to
	// guarantee completion of the Push operation within a finite time). Before
	// exiting, we increment written so that any existing drainWaits can
	// proceed.
	if atomic.LoadPointer(&cb.qp[n].q) != qptr {
		atomic.AddUint32(&q.written, 1)
		return
	}
	// At this point, we're definitely writing to the right queue. That is, one
	// of the following is true:
	// 1. No drainer is in execution on this queue.
	// 2. A drainer is in execution on this queue and it is waiting at the
	// acquired == written barrier.
	//
	// Let's say two Pushes A and B happen on the same queue. Say A and B are
	// q.size apart; i.e. they get the same index. That is,
	//
	// index_A = index_B
	// acquired_A + q.size = acquired_B
	//
	// We say "B has wrapped around A" when this happens. In this case, since A
	// occurred before B, B's Push should be the final value. However, we
	// accommodate A being the final value because wrap-arounds are extremely
	// rare and accounting for them requires an additional counter and a
	// significant performance penalty. Note that the below approach never leads
	// to any data corruption.
	index := acquired & q.mask
	// The slot stores a pointer to this call's copy of x; Drain dereferences it.
	atomic.StorePointer(&q.arr[index], unsafe.Pointer(&x))
	// Allows any drainWait checks to proceed.
	atomic.AddUint32(&q.written, 1)
}
// dereferenceAppend appends to result the interface{} value behind every
// non-nil pointer in arr over the half-open index range [from, to), in index
// order, and returns the extended slice. Nil slots are skipped, and an empty
// or inverted range leaves result untouched. append grows result as needed.
//
// The caller must have exclusive access to arr, because the slots are read
// without atomics.
func dereferenceAppend(result []interface{}, arr []unsafe.Pointer, from, to uint32) []interface{} {
	i := from
	for i < to {
		if slot := arr[i]; slot != nil {
			result = append(result, *(*interface{})(slot))
		}
		i++
	}
	return result
}
// Drain allocates and returns a slice of the values currently held in the
// circular buffer. Push order is not maintained; that is, if B was Pushed
// after A, Drain may return B at a lower index than A in the returned slice.
// If a queue wrapped around, only the values still held in its slots are
// returned.
//
// Concurrent Drain calls are serialized by drainMutex. Pushes are redirected
// to the other queue of each pair, so they never block on a Drain, although
// some Pushes that race with it may be dropped (see Push).
func (cb *CircularBuffer) Drain() []interface{} {
	cb.drainMutex.Lock()
	defer cb.drainMutex.Unlock()

	// Point every pair at its other queue, so new Pushes stop touching the
	// queues we are about to read.
	qs := make([]*queue, len(cb.qp))
	for i, qp := range cb.qp {
		qs[i] = qp.switchQueues()
	}

	// Wait, in parallel, for Pushes already in flight on the old queues.
	var wg sync.WaitGroup
	wg.Add(len(qs))
	for i := range qs {
		go func(q *queue) {
			defer wg.Done()
			q.drainWait()
		}(qs[i])
	}
	wg.Wait()

	// A queue holds at most size values even when more Pushes acquired slots,
	// because the later ones wrapped around onto earlier indices.
	counts := make([]uint32, len(qs))
	total := 0
	for i, q := range qs {
		count := atomic.LoadUint32(&q.acquired)
		if count > q.size {
			count = q.size
		}
		counts[i] = count
		total += int(count)
	}

	result := make([]interface{}, 0, total)
	for i, q := range qs {
		result = dereferenceAppend(result, q.arr, 0, counts[i])
		// Nil out the slots just read. Otherwise they keep the drained values
		// reachable, so they cannot be garbage collected. Worse, a later Push
		// that claims one of these slots and then bails out because of a
		// concurrent switch would leave the old pointer in place, and a future
		// Drain would return that value a second time.
		for j := uint32(0); j < counts[i]; j++ {
			atomic.StorePointer(&q.arr[j], nil)
		}
	}

	for _, q := range qs {
		atomic.StoreUint32(&q.acquired, 0)
		atomic.StoreUint32(&q.written, 0)
	}
	return result
}