Added wireup for SharedDisruptor; updated benchmarks; updated README with benchmark numbers.
diff --git a/README.md b/README.md
index 1dfce31..9a688da 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@
Disruptor: Writer, Reserve Many | 1.1 ns/op
Disruptor: Writer, Await One | 3.5 ns/op
Disruptor: Writer, Await Many | 1.0 ns/op
-Disruptor: SharedWriter, Reserve One | 14.5 ns/op
+Disruptor: SharedWriter, Reserve One | 13.6 ns/op
Disruptor: SharedWriter, Reserve Many | 2.5 ns/op
Disruptor: SharedWriter, Reserve One, Contended Write | nn.n ns/op
Disruptor: SharedWriter, Reserve Many, Contended Write | nn.n ns/op
diff --git a/benchmark-disruptor/shared_writer_test.go b/benchmark-disruptor/shared_writer_test.go
index fcd7122..2232478 100644
--- a/benchmark-disruptor/shared_writer_test.go
+++ b/benchmark-disruptor/shared_writer_test.go
@@ -8,35 +8,41 @@
func BenchmarkSharedWriterReserveOne(b *testing.B) {
ringBuffer := [RingBufferSize]int64{}
- written, read := disruptor.NewCursor(), disruptor.NewCursor()
- shared := disruptor.NewSharedWriterBarrier(written, RingBufferSize)
- reader := disruptor.NewReader(read, written, shared, SampleConsumer{&ringBuffer})
- writer := disruptor.NewSharedWriter(shared, read)
- reader.Start()
+ controller := disruptor.
+ Configure(RingBufferSize).
+ WithConsumerGroup(SampleConsumer{&ringBuffer}).
+ BuildShared()
+ controller.Start()
+ defer controller.Stop()
+ writer := controller.Writer()
iterations := int64(b.N)
+ sequence := disruptor.InitialSequenceValue
+
b.ReportAllocs()
b.ResetTimer()
- sequence := disruptor.InitialSequenceValue
for sequence < iterations {
sequence = writer.Reserve(ReserveOne)
ringBuffer[sequence&RingBufferMask] = sequence
writer.Commit(sequence, sequence)
}
- reader.Stop()
+ b.StopTimer()
}
func BenchmarkSharedWriterReserveMany(b *testing.B) {
ringBuffer := [RingBufferSize]int64{}
- written, read := disruptor.NewCursor(), disruptor.NewCursor()
- shared := disruptor.NewSharedWriterBarrier(written, RingBufferSize)
- reader := disruptor.NewReader(read, written, shared, SampleConsumer{&ringBuffer})
- writer := disruptor.NewSharedWriter(shared, read)
- reader.Start()
+ controller := disruptor.
+ Configure(RingBufferSize).
+ WithConsumerGroup(SampleConsumer{&ringBuffer}).
+ BuildShared()
+ controller.Start()
+ defer controller.Stop()
+ writer := controller.Writer()
iterations := int64(b.N)
+
b.ReportAllocs()
b.ResetTimer()
@@ -52,5 +58,5 @@
previous = current
}
- reader.Stop()
+ b.StopTimer()
}
diff --git a/shared_disruptor.go b/shared_disruptor.go
new file mode 100644
index 0000000..7d7b63b
--- /dev/null
+++ b/shared_disruptor.go
@@ -0,0 +1,22 @@
+package disruptor
+
+type SharedDisruptor struct {
+ writer *SharedWriter
+ readers []*Reader
+}
+
+func (this SharedDisruptor) Writer() *SharedWriter {
+ return this.writer
+}
+
+func (this SharedDisruptor) Start() {
+ for _, item := range this.readers {
+ item.Start()
+ }
+}
+
+func (this SharedDisruptor) Stop() {
+ for _, item := range this.readers {
+ item.Stop()
+ }
+}
diff --git a/wireup.go b/wireup.go
index b3a6f76..4c31308 100644
--- a/wireup.go
+++ b/wireup.go
@@ -52,6 +52,27 @@
writer := NewWriter(written, upstream, this.capacity)
return Disruptor{writer: writer, readers: allReaders}
}
+
+func (this Wireup) BuildShared() SharedDisruptor {
+ allReaders := []*Reader{}
+ written := this.cursors[0]
+ writerBarrier := NewSharedWriterBarrier(written, this.capacity)
+ var upstream Barrier = writerBarrier
+ cursorIndex := 1 // 0 index is reserved for the writer Cursor
+
+ for groupIndex, group := range this.groups {
+ groupReaders, groupBarrier := this.buildReaders(groupIndex, cursorIndex, written, upstream)
+ for _, item := range groupReaders {
+ allReaders = append(allReaders, item)
+ }
+ upstream = groupBarrier
+ cursorIndex += len(group)
+ }
+
+ writer := NewSharedWriter(writerBarrier, upstream)
+ return SharedDisruptor{writer: writer, readers: allReaders}
+}
+
func (this Wireup) buildReaders(consumerIndex, cursorIndex int, written *Cursor, upstream Barrier) ([]*Reader, Barrier) {
barrierCursors := []*Cursor{}
readers := []*Reader{}