| // Copyright 2018 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pipe |
| |
| import ( |
| "math/rand" |
| "reflect" |
| "runtime" |
| "sync" |
| "testing" |
| ) |
| |
| func TestSimpleReadWrite(t *testing.T) { |
| // Check that a simple write can be properly read from the rx side. |
| tr := rand.New(rand.NewSource(99)) |
| rr := rand.New(rand.NewSource(99)) |
| |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| wb := tx.Push(10) |
| if wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| for i := range wb { |
| wb[i] = byte(tr.Intn(256)) |
| } |
| tx.Flush() |
| |
| var rx Rx |
| rx.Init(b) |
| rb := rx.Pull() |
| if len(rb) != 10 { |
| t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10) |
| } |
| |
| for i := range rb { |
| if v := byte(rr.Intn(256)); v != rb[i] { |
| t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v) |
| } |
| } |
| rx.Flush() |
| } |
| |
| func TestEmptyRead(t *testing.T) { |
| // Check that pulling from an empty pipe fails. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on empty pipe") |
| } |
| } |
| |
| func TestTooLargeWrite(t *testing.T) { |
| // Check that writes that are too large are properly rejected. |
| b := make([]byte, 96) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(96); wb != nil { |
| t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe") |
| } |
| |
| if wb := tx.Push(88); wb != nil { |
| t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe") |
| } |
| |
| if wb := tx.Push(80); wb == nil { |
| t.Fatalf("Write of 80 bytes failed on 96-byte pipe") |
| } |
| } |
| |
| func TestFullWrite(t *testing.T) { |
| // Check that writes fail when the pipe is full. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(80); wb == nil { |
| t.Fatalf("Write of 80 bytes failed on 96-byte pipe") |
| } |
| |
| if wb := tx.Push(1); wb != nil { |
| t.Fatalf("Write succeeded on full pipe") |
| } |
| } |
| |
| func TestFullAndFlushedWrite(t *testing.T) { |
| // Check that writes fail when the pipe is full and has already been |
| // flushed. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(80); wb == nil { |
| t.Fatalf("Write of 80 bytes failed on 96-byte pipe") |
| } |
| |
| tx.Flush() |
| |
| if wb := tx.Push(1); wb != nil { |
| t.Fatalf("Write succeeded on full pipe") |
| } |
| } |
| |
| func TestTxFlushTwice(t *testing.T) { |
| // Checks that a second consecutive tx flush is a no-op. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| tx.Flush() |
| |
| // Make copy of original tx queue, flush it, then check that it didn't |
| // change. |
| orig := tx |
| tx.Flush() |
| |
| if !reflect.DeepEqual(orig, tx) { |
| t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig) |
| } |
| } |
| |
| func TestRxFlushTwice(t *testing.T) { |
| // Checks that a second consecutive rx flush is a no-op. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| tx.Flush() |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| rx.Flush() |
| |
| // Make copy of original rx queue, flush it, then check that it didn't |
| // change. |
| orig := rx |
| rx.Flush() |
| |
| if !reflect.DeepEqual(orig, rx) { |
| t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig) |
| } |
| } |
| |
| func TestWrapInMiddleOfTransaction(t *testing.T) { |
| // Check that writes are not flushed when we need to wrap the buffer |
| // around. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| tx.Flush() |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| rx.Flush() |
| |
| // At this point the ring buffer is empty, but the write is at offset |
| // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). |
| if wb := tx.Push(10); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on non-full pipe") |
| } |
| |
| // We haven't flushed yet, so pull must return nil. |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on non-flushed pipe") |
| } |
| |
| tx.Flush() |
| |
| // The two buffers must be available now. |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| } |
| |
| func TestWriteAbort(t *testing.T) { |
| // Check that a read fails on a pipe that has had data pushed to it but |
| // has aborted the push. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(10); wb == nil { |
| t.Fatalf("Write failed on empty pipe") |
| } |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on empty pipe") |
| } |
| |
| tx.Abort() |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on empty pipe") |
| } |
| } |
| |
| func TestWrappedWriteAbort(t *testing.T) { |
| // Check that writes are properly aborted even if the writes wrap |
| // around. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| tx.Flush() |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| rx.Flush() |
| |
| // At this point the ring buffer is empty, but the write is at offset |
| // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). |
| if wb := tx.Push(10); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on non-full pipe") |
| } |
| |
| // We haven't flushed yet, so pull must return nil. |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on non-flushed pipe") |
| } |
| |
| tx.Abort() |
| |
| // The pushes were aborted, so no data should be readable. |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on non-flushed pipe") |
| } |
| |
| // Try the same transactions again, but flush this time. |
| if wb := tx.Push(10); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on non-full pipe") |
| } |
| |
| tx.Flush() |
| |
| // The two buffers must be available now. |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| } |
| |
| func TestEmptyReadOnNonFlushedWrite(t *testing.T) { |
| // Check that a read fails on a pipe that has had data pushed to it |
| // but not yet flushed. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(10); wb == nil { |
| t.Fatalf("Write failed on empty pipe") |
| } |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on empty pipe") |
| } |
| |
| tx.Flush() |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull on failed on non-empty pipe") |
| } |
| } |
| |
| func TestPullAfterPullingEntirePipe(t *testing.T) { |
| // Check that Pull fails when the pipe is full, but all of it has |
| // already been pulled but not yet flushed. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| tx.Flush() |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| rx.Flush() |
| |
| // At this point the ring buffer is empty, but the write is at offset |
| // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3 |
| // buffers that will fill the pipe. |
| if wb := tx.Push(10); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| |
| if wb := tx.Push(20); wb == nil { |
| t.Fatalf("Push failed on non-full pipe") |
| } |
| |
| if wb := tx.Push(24); wb == nil { |
| t.Fatalf("Push failed on non-full pipe") |
| } |
| |
| tx.Flush() |
| |
| // The three buffers must be available now. |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| |
| // Fourth pull must fail. |
| if rb := rx.Pull(); rb != nil { |
| t.Fatalf("Pull succeeded on empty pipe") |
| } |
| } |
| |
| func TestNoRoomToWrapOnPush(t *testing.T) { |
| // Check that Push fails when it tries to allocate room to add a wrap |
| // message. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| tx.Flush() |
| |
| var rx Rx |
| rx.Init(b) |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| rx.Flush() |
| |
| // At this point the ring buffer is empty, but the write is at offset |
| // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20, |
| // which won't fit (64+20+8+padding = 96, which wouldn't leave room for |
| // the padding), so it wraps around. |
| if wb := tx.Push(20); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| |
| tx.Flush() |
| |
| // Buffer offset is at 28. Try to write 70, which would require a wrap |
| // slot which cannot be created now. |
| if wb := tx.Push(70); wb != nil { |
| t.Fatalf("Push succeeded on pipe with no room for wrap message") |
| } |
| } |
| |
| func TestRxImplicitFlushOfWrapMessage(t *testing.T) { |
| // Check if the first read is that of a wrapping message, that it gets |
| // immediately flushed. |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| if wb := tx.Push(50); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| tx.Flush() |
| |
| // This will cause a wrapping message to written. |
| if wb := tx.Push(60); wb != nil { |
| t.Fatalf("Push succeeded when there is no room in pipe") |
| } |
| |
| var rx Rx |
| rx.Init(b) |
| |
| // Read the first message. |
| if rb := rx.Pull(); rb == nil { |
| t.Fatalf("Pull failed on non-empty pipe") |
| } |
| rx.Flush() |
| |
| // This should fail because of the wrapping message is taking up space. |
| if wb := tx.Push(60); wb != nil { |
| t.Fatalf("Push succeeded when there is no room in pipe") |
| } |
| |
| // Try to read the next one. This should consume the wrapping message. |
| rx.Pull() |
| |
| // This must now succeed. |
| if wb := tx.Push(60); wb == nil { |
| t.Fatalf("Push failed on empty pipe") |
| } |
| } |
| |
| func TestConcurrentReaderWriter(t *testing.T) { |
| // Push a million buffers of random sizes and random contents. Check |
| // that buffers read match what was written. |
| tr := rand.New(rand.NewSource(99)) |
| rr := rand.New(rand.NewSource(99)) |
| |
| b := make([]byte, 100) |
| var tx Tx |
| tx.Init(b) |
| |
| var rx Rx |
| rx.Init(b) |
| |
| const count = 1000000 |
| var wg sync.WaitGroup |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| runtime.Gosched() |
| for i := 0; i < count; i++ { |
| n := 1 + tr.Intn(80) |
| wb := tx.Push(uint64(n)) |
| for wb == nil { |
| wb = tx.Push(uint64(n)) |
| } |
| |
| for j := range wb { |
| wb[j] = byte(tr.Intn(256)) |
| } |
| |
| tx.Flush() |
| } |
| }() |
| |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| runtime.Gosched() |
| for i := 0; i < count; i++ { |
| n := 1 + rr.Intn(80) |
| rb := rx.Pull() |
| for rb == nil { |
| rb = rx.Pull() |
| } |
| |
| if n != len(rb) { |
| t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n) |
| } |
| |
| for j := range rb { |
| if v := byte(rr.Intn(256)); v != rb[j] { |
| t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v) |
| } |
| } |
| |
| rx.Flush() |
| } |
| }() |
| |
| wg.Wait() |
| } |