// +build !appengine

/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package buffer

import (
	"fmt"
	"sync"
	"testing"
	"time"

	"google.golang.org/grpc/internal/grpctest"
)
type s struct {
	grpctest.Tester
}

func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}
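
// TestCircularBufferSerial exercises Push and Drain from a single goroutine
// and verifies that everything pushed is drained back out.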
func (s) TestCircularBufferSerial(t *testing.T) {
	var size, i uint32
	var result []interface{}

	size = 1 << 15
	cb, err := NewCircularBuffer(size)
	if err != nil {
		t.Fatalf("error allocating CircularBuffer: %v", err)
	}

	for i = 0; i < size/2; i++ {
		cb.Push(i)
	}

	result = cb.Drain()
	if uint32(len(result)) != size/2 {
		t.Fatalf("len(result) = %d; want %d", len(result), size/2)
	}

	// The returned result isn't necessarily sorted.
	seen := make(map[uint32]bool)
	for _, r := range result {
		seen[r.(uint32)] = true
	}

	for i = 0; i < uint32(len(result)); i++ {
		if !seen[i] {
			t.Fatalf("seen[%d] = false; want true", i)
		}
	}

	// Fill the buffer to capacity this time; a full drain should return
	// every pushed element.
	for i = 0; i < size; i++ {
		cb.Push(i)
	}

	result = cb.Drain()
	if uint32(len(result)) != size {
		t.Fatalf("len(result) = %d; want %d", len(result), size)
	}
}
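
// TestCircularBufferOverflow pushes many more elements than the buffer's
// capacity and verifies that only elements from the later pushes remain.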
func (s) TestCircularBufferOverflow(t *testing.T) {
	var size, i uint32
	var result []interface{}

	size = 1 << 10
	cb, err := NewCircularBuffer(size)
	if err != nil {
		t.Fatalf("error allocating CircularBuffer: %v", err)
	}

	for i = 0; i < 10*size; i++ {
		cb.Push(i)
	}

	result = cb.Drain()
	if uint32(len(result)) != size {
		t.Fatalf("len(result) = %d; want %d", len(result), size)
	}

	// Older elements should have been overwritten; everything drained must
	// come from the later pushes.
	for idx, x := range result {
		if x.(uint32) < size {
			t.Fatalf("result[%d] = %d; want it to be >= %d", idx, x, size)
		}
	}
}
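
// TestCircularBufferConcurrent pushes from many goroutines at once, draining
// once after all pushes complete and once while pushes are still in flight.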
func (s) TestCircularBufferConcurrent(t *testing.T) {
	for tn := 0; tn < 2; tn++ {
		var size uint32
		var result []interface{}

		size = 1 << 6
		cb, err := NewCircularBuffer(size)
		if err != nil {
			t.Fatalf("error allocating CircularBuffer: %v", err)
		}

		type item struct {
			R uint32
			N uint32
			T time.Time
		}
		var wg sync.WaitGroup
		for r := uint32(0); r < 1024; r++ {
			wg.Add(1)
			go func(r uint32) {
				for n := uint32(0); n < size; n++ {
					cb.Push(item{R: r, N: n, T: time.Now()})
				}
				wg.Done()
			}(r)
		}

		// Wait for all goroutines to finish in the first iteration only;
		// in the second, draining while pushes are still happening exercises
		// the drain-time locking for races.
		if tn == 0 {
			wg.Wait()
		}

		result = cb.Drain()
		// The buffer can't be expected to be full if the pushes aren't
		// necessarily done.
		if tn == 0 {
			if uint32(len(result)) != size {
				t.Fatalf("len(result) = %d; want %d", len(result), size)
			}
		}

		// There can be no expectation on the order of the data returned by
		// Drain because: (a) everything is happening concurrently, and (b) a
		// round-robin is used to write to different queues (and therefore
		// different cachelines) for less write contention.

		// Wait for all goroutines to complete before moving on to other
		// tests; stragglers could otherwise unfairly skew any benchmarks
		// that run after this.
		wg.Wait()
	}
}
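
// BenchmarkCircularBuffer measures Push performance across a range of buffer
// sizes and concurrently pushing goroutine counts.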
func BenchmarkCircularBuffer(b *testing.B) {
	x := 1 // Dummy payload; only its address is pushed.
	for size := 1 << 16; size <= 1<<20; size <<= 1 {
		for routines := 1; routines <= 1<<8; routines <<= 1 {
			b.Run(fmt.Sprintf("goroutines:%d/size:%d", routines, size), func(b *testing.B) {
				cb, err := NewCircularBuffer(uint32(size))
				if err != nil {
					b.Fatalf("error allocating CircularBuffer: %v", err)
				}

				// Split b.N pushes across the goroutines so that the total
				// amount of work stays approximately b.N.
				perRoutine := b.N / routines
				var wg sync.WaitGroup
				for r := 0; r < routines; r++ {
					wg.Add(1)
					go func() {
						for i := 0; i < perRoutine; i++ {
							cb.Push(&x)
						}
						wg.Done()
					}()
				}
				wg.Wait()
			})
		}
	}
}