Added additional tests; removed obsolete implementation of the CompositeBarrier.
diff --git a/benchmark-disruptor/writer_reservation_multiple_readers_test.go b/benchmark-disruptor/writer_reservation_multiple_readers_test.go
new file mode 100644
index 0000000..2bd8775
--- /dev/null
+++ b/benchmark-disruptor/writer_reservation_multiple_readers_test.go
@@ -0,0 +1,62 @@
+package benchmarks
+
+import (
+ "testing"
+
+ "github.com/smartystreets/go-disruptor"
+)
+
+func BenchmarkWriterReserveOneMultipleReaders(b *testing.B) {
+ ringBuffer := [RingBufferSize]int64{}
+ written, read1, read2 := disruptor.NewCursor(), disruptor.NewCursor(), disruptor.NewCursor()
+ reader1 := disruptor.NewReader(read1, written, written, SampleConsumer{&ringBuffer})
+ reader2 := disruptor.NewReader(read2, written, written, SampleConsumer{&ringBuffer})
+ barrier := disruptor.NewCompositeBarrier(read1, read2)
+ writer := disruptor.NewWriter(written, barrier, RingBufferSize)
+
+ reader1.Start()
+ reader2.Start()
+
+ iterations := int64(b.N)
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ sequence := disruptor.InitialSequenceValue
+ for sequence < iterations {
+ sequence = writer.Reserve(ReserveOne)
+ ringBuffer[sequence&RingBufferMask] = sequence
+ writer.Commit(sequence, sequence)
+ }
+
+ reader1.Stop()
+ reader2.Stop()
+}
+func BenchmarkWriterReserveManyMultipleReaders(b *testing.B) {
+ ringBuffer := [RingBufferSize]int64{}
+ written, read1, read2 := disruptor.NewCursor(), disruptor.NewCursor(), disruptor.NewCursor()
+ reader1 := disruptor.NewReader(read1, written, written, SampleConsumer{&ringBuffer})
+ reader2 := disruptor.NewReader(read2, written, written, SampleConsumer{&ringBuffer})
+ barrier := disruptor.NewCompositeBarrier(read1, read2)
+ writer := disruptor.NewWriter(written, barrier, RingBufferSize)
+
+ reader1.Start()
+ reader2.Start()
+
+ iterations := int64(b.N)
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ sequence := disruptor.InitialSequenceValue
+ for sequence < iterations {
+ sequence = writer.Reserve(ReserveMany)
+
+ for i := sequence - ReserveManyDelta; i <= sequence; i++ {
+ ringBuffer[i&RingBufferMask] = i
+ }
+
+ writer.Commit(sequence, sequence)
+ }
+
+ reader1.Stop()
+ reader2.Stop()
+}
diff --git a/composite_barrier.go b/composite_barrier.go
index 80c12bd..ace1d19 100644
--- a/composite_barrier.go
+++ b/composite_barrier.go
@@ -2,10 +2,6 @@
type CompositeBarrier []*Cursor
-// type CompositeBarrier struct {
-// cursors []*Cursor
-// }
-
func NewCompositeBarrier(upstream ...*Cursor) CompositeBarrier {
if len(upstream) == 0 {
panic("At least one upstream cursor is required.")
@@ -13,13 +9,11 @@
cursors := make([]*Cursor, len(upstream))
copy(cursors, upstream)
- // return &CompositeBarrier{cursors}
return CompositeBarrier(cursors)
}
func (this CompositeBarrier) Read(noop int64) int64 {
minimum := MaxSequenceValue
- // for _, item := range this.cursors {
for _, item := range this {
sequence := item.Load()
if sequence < minimum {