blob: 7a38a57d93010b766de21057b5662d75de91a7b8 [file] [log] [blame]
package disruptor
// Cursors should be a party of the same backing array to keep them as close together as possible:
// https://news.ycombinator.com/item?id=7800825
type (
Wireup struct {
capacity int64
groups [][]Consumer
cursors []*Cursor // backing array keeps cursors (with padding) in contiguous memory
}
)
func Configure(capacity int64) Wireup {
return Wireup{
capacity: capacity,
groups: [][]Consumer{},
cursors: []*Cursor{NewCursor()},
}
}
func (this Wireup) WithConsumerGroup(consumers ...Consumer) Wireup {
if len(consumers) == 0 {
return this
}
target := make([]Consumer, len(consumers))
copy(target, consumers)
for i := 0; i < len(consumers); i++ {
this.cursors = append(this.cursors, NewCursor())
}
this.groups = append(this.groups, target)
return this
}
func (this Wireup) Build() Disruptor {
allReaders := []*Reader{}
written := this.cursors[0]
barriers := []Barrier{written}
cursorIndex := 1 // 0 index is reserved for the writer Cursor
for groupIndex, group := range this.groups {
upstream := barriers[groupIndex]
groupReaders, barrier := this.buildReaders(groupIndex, cursorIndex, written, upstream)
for _, item := range groupReaders {
allReaders = append(allReaders, item)
}
barriers = append(barriers, barrier)
cursorIndex += len(group)
}
writerBarrier := barriers[len(barriers)-1]
writer := NewWriter(written, writerBarrier, this.capacity)
return Disruptor{writer: writer, readers: allReaders}
}
func (this Wireup) buildReaders(consumerIndex, cursorIndex int, written *Cursor, upstream Barrier) ([]*Reader, Barrier) {
barrierCursors := []*Cursor{}
readers := []*Reader{}
for _, consumer := range this.groups[consumerIndex] {
cursor := this.cursors[cursorIndex]
barrierCursors = append(barrierCursors, cursor)
reader := NewReader(cursor, written, upstream, consumer)
readers = append(readers, reader)
cursorIndex++
}
if len(this.groups[consumerIndex]) == 1 {
return readers, barrierCursors[0]
} else {
return readers, NewCompositeBarrier(barrierCursors...)
}
}