blob: db0737c987c245e30eb12989ac3f7734782e92ef [file] [log] [blame]
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipe
import (
"math/rand"
"reflect"
"runtime"
"sync"
"testing"
)
func TestSimpleReadWrite(t *testing.T) {
// Check that a simple write can be properly read from the rx side.
tr := rand.New(rand.NewSource(99))
rr := rand.New(rand.NewSource(99))
b := make([]byte, 100)
var tx Tx
tx.Init(b)
wb := tx.Push(10)
if wb == nil {
t.Fatalf("Push failed on empty pipe")
}
for i := range wb {
wb[i] = byte(tr.Intn(256))
}
tx.Flush()
var rx Rx
rx.Init(b)
rb := rx.Pull()
if len(rb) != 10 {
t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10)
}
for i := range rb {
if v := byte(rr.Intn(256)); v != rb[i] {
t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v)
}
}
rx.Flush()
}
func TestEmptyRead(t *testing.T) {
// Check that pulling from an empty pipe fails.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on empty pipe")
}
}
func TestTooLargeWrite(t *testing.T) {
// Check that writes that are too large are properly rejected.
b := make([]byte, 96)
var tx Tx
tx.Init(b)
if wb := tx.Push(96); wb != nil {
t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe")
}
if wb := tx.Push(88); wb != nil {
t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe")
}
if wb := tx.Push(80); wb == nil {
t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
}
}
func TestFullWrite(t *testing.T) {
// Check that writes fail when the pipe is full.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(80); wb == nil {
t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
}
if wb := tx.Push(1); wb != nil {
t.Fatalf("Write succeeded on full pipe")
}
}
func TestFullAndFlushedWrite(t *testing.T) {
// Check that writes fail when the pipe is full and has already been
// flushed.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(80); wb == nil {
t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
}
tx.Flush()
if wb := tx.Push(1); wb != nil {
t.Fatalf("Write succeeded on full pipe")
}
}
func TestTxFlushTwice(t *testing.T) {
// Checks that a second consecutive tx flush is a no-op.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
// Make copy of original tx queue, flush it, then check that it didn't
// change.
orig := tx
tx.Flush()
if !reflect.DeepEqual(orig, tx) {
t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig)
}
}
func TestRxFlushTwice(t *testing.T) {
// Checks that a second consecutive rx flush is a no-op.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
rx.Flush()
// Make copy of original rx queue, flush it, then check that it didn't
// change.
orig := rx
rx.Flush()
if !reflect.DeepEqual(orig, rx) {
t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig)
}
}
func TestWrapInMiddleOfTransaction(t *testing.T) {
// Check that writes are not flushed when we need to wrap the buffer
// around.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
rx.Flush()
// At this point the ring buffer is empty, but the write is at offset
// 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
if wb := tx.Push(10); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on non-full pipe")
}
// We haven't flushed yet, so pull must return nil.
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on non-flushed pipe")
}
tx.Flush()
// The two buffers must be available now.
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
}
func TestWriteAbort(t *testing.T) {
// Check that a read fails on a pipe that has had data pushed to it but
// has aborted the push.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(10); wb == nil {
t.Fatalf("Write failed on empty pipe")
}
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on empty pipe")
}
tx.Abort()
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on empty pipe")
}
}
func TestWrappedWriteAbort(t *testing.T) {
// Check that writes are properly aborted even if the writes wrap
// around.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
rx.Flush()
// At this point the ring buffer is empty, but the write is at offset
// 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
if wb := tx.Push(10); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on non-full pipe")
}
// We haven't flushed yet, so pull must return nil.
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on non-flushed pipe")
}
tx.Abort()
// The pushes were aborted, so no data should be readable.
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on non-flushed pipe")
}
// Try the same transactions again, but flush this time.
if wb := tx.Push(10); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on non-full pipe")
}
tx.Flush()
// The two buffers must be available now.
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
}
func TestEmptyReadOnNonFlushedWrite(t *testing.T) {
// Check that a read fails on a pipe that has had data pushed to it
// but not yet flushed.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(10); wb == nil {
t.Fatalf("Write failed on empty pipe")
}
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on empty pipe")
}
tx.Flush()
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull on failed on non-empty pipe")
}
}
func TestPullAfterPullingEntirePipe(t *testing.T) {
// Check that Pull fails when the pipe is full, but all of it has
// already been pulled but not yet flushed.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
rx.Flush()
// At this point the ring buffer is empty, but the write is at offset
// 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3
// buffers that will fill the pipe.
if wb := tx.Push(10); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
if wb := tx.Push(20); wb == nil {
t.Fatalf("Push failed on non-full pipe")
}
if wb := tx.Push(24); wb == nil {
t.Fatalf("Push failed on non-full pipe")
}
tx.Flush()
// The three buffers must be available now.
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
// Fourth pull must fail.
if rb := rx.Pull(); rb != nil {
t.Fatalf("Pull succeeded on empty pipe")
}
}
func TestNoRoomToWrapOnPush(t *testing.T) {
// Check that Push fails when it tries to allocate room to add a wrap
// message.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
var rx Rx
rx.Init(b)
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
rx.Flush()
// At this point the ring buffer is empty, but the write is at offset
// 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20,
// which won't fit (64+20+8+padding = 96, which wouldn't leave room for
// the padding), so it wraps around.
if wb := tx.Push(20); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
// Buffer offset is at 28. Try to write 70, which would require a wrap
// slot which cannot be created now.
if wb := tx.Push(70); wb != nil {
t.Fatalf("Push succeeded on pipe with no room for wrap message")
}
}
func TestRxImplicitFlushOfWrapMessage(t *testing.T) {
// Check if the first read is that of a wrapping message, that it gets
// immediately flushed.
b := make([]byte, 100)
var tx Tx
tx.Init(b)
if wb := tx.Push(50); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
tx.Flush()
// This will cause a wrapping message to written.
if wb := tx.Push(60); wb != nil {
t.Fatalf("Push succeeded when there is no room in pipe")
}
var rx Rx
rx.Init(b)
// Read the first message.
if rb := rx.Pull(); rb == nil {
t.Fatalf("Pull failed on non-empty pipe")
}
rx.Flush()
// This should fail because of the wrapping message is taking up space.
if wb := tx.Push(60); wb != nil {
t.Fatalf("Push succeeded when there is no room in pipe")
}
// Try to read the next one. This should consume the wrapping message.
rx.Pull()
// This must now succeed.
if wb := tx.Push(60); wb == nil {
t.Fatalf("Push failed on empty pipe")
}
}
func TestConcurrentReaderWriter(t *testing.T) {
// Push a million buffers of random sizes and random contents. Check
// that buffers read match what was written.
tr := rand.New(rand.NewSource(99))
rr := rand.New(rand.NewSource(99))
b := make([]byte, 100)
var tx Tx
tx.Init(b)
var rx Rx
rx.Init(b)
const count = 1000000
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
runtime.Gosched()
for i := 0; i < count; i++ {
n := 1 + tr.Intn(80)
wb := tx.Push(uint64(n))
for wb == nil {
wb = tx.Push(uint64(n))
}
for j := range wb {
wb[j] = byte(tr.Intn(256))
}
tx.Flush()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
runtime.Gosched()
for i := 0; i < count; i++ {
n := 1 + rr.Intn(80)
rb := rx.Pull()
for rb == nil {
rb = rx.Pull()
}
if n != len(rb) {
t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n)
}
for j := range rb {
if v := byte(rr.Intn(256)); v != rb[j] {
t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v)
}
}
rx.Flush()
}
}()
wg.Wait()
}