Completed simple wireup (without wait strategies) for single-writer Disruptor.
diff --git a/disruptor.go b/disruptor.go
index d659f4d..3816af0 100644
--- a/disruptor.go
+++ b/disruptor.go
@@ -5,10 +5,6 @@
readers []*Reader
}
-func NewDisruptor(builder Builder) Disruptor {
- return Disruptor{}
-}
-
func (this Disruptor) Writer() *Writer {
return this.writer
}
diff --git a/shared_disruptor.go b/shared_disruptor.go
deleted file mode 100644
index fc3b501..0000000
--- a/shared_disruptor.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package disruptor
-
-type SharedDisruptor struct {
- writer *Writer
- readers []*Reader
-}
-
-func NewSharedDisruptor(builder Builder) SharedDisruptor {
- return SharedDisruptor{}
-}
-
-func (this SharedDisruptor) Writer() *Writer {
- return this.writer
-}
-
-func (this SharedDisruptor) Start() {
- for _, item := range this.readers {
- item.Start()
- }
-}
-
-func (this SharedDisruptor) Stop() {
- for _, item := range this.readers {
- item.Stop()
- }
-}
diff --git a/wireup.go b/wireup.go
index 71dfc96..57f3f81 100644
--- a/wireup.go
+++ b/wireup.go
@@ -37,9 +37,40 @@
}
func (this Wireup) Build() Disruptor {
- return NewDisruptor(this)
-}
+ allReaders := []*Reader{}
+ written := this.cursors[0]
+ barriers := []Barrier{written}
+ cursorIndex := 1 // 0 index is reserved for the writer Cursor
-func (this Wireup) BuildShared() SharedDisruptor {
- return NewSharedDisruptor(this)
+ for groupIndex, group := range this.groups {
+ upstream := barriers[groupIndex]
+ groupReaders, barrier := this.buildReaders(groupIndex, cursorIndex, written, upstream)
+ for _, item := range groupReaders {
+ allReaders = append(allReaders, item)
+ }
+ barriers = append(barriers, barrier)
+ cursorIndex += len(group)
+ }
+
+ writerBarrier := barriers[len(barriers)-1]
+ writer := NewWriter(written, writerBarrier, this.capacity)
+ return Disruptor{writer: writer, readers: allReaders}
+}
+func (this Wireup) buildReaders(consumerIndex, cursorIndex int, written *Cursor, upstream Barrier) ([]*Reader, Barrier) {
+ barrierCursors := []*Cursor{}
+ readers := []*Reader{}
+
+ for _, consumer := range this.groups[consumerIndex] {
+ cursor := this.cursors[cursorIndex]
+ barrierCursors = append(barrierCursors, cursor)
+ reader := NewReader(cursor, written, upstream, consumer)
+ readers = append(readers, reader)
+ cursorIndex++
+ }
+
+ if len(this.groups[consumerIndex]) == 1 {
+ return readers, barrierCursors[0]
+ } else {
+ return readers, NewCompositeBarrier(barrierCursors...)
+ }
}