Working multiple consumers--boneheaded issue during construction of new barrier where it used the same sequence value multiple times.
diff --git a/barrier.go b/barrier.go
index d36e3d0..adc049d 100644
--- a/barrier.go
+++ b/barrier.go
@@ -1,10 +1,17 @@
package main
-func (this Barrier) Load() int64 {
+func (this Barrier) Load(consumer int) int64 {
minimum := MaxSequenceValue
for i := 0; i < len(this); i++ {
cursor := this[i].Load()
+
+ // if len(this) > 1 {
+ // fmt.Printf("Producer:: Consumer %d Barrier: %d\n", i+1, cursor)
+ // } else {
+ // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Producer Barrier: %d\n", consumer, cursor)
+ // }
+
if cursor < minimum {
minimum = cursor
}
@@ -16,7 +23,7 @@
func NewBarrier(upstream ...*Sequence) Barrier {
this := Barrier{}
for i := 0; i < len(upstream); i++ {
- this = append(this, upstream[0])
+ this = append(this, upstream[i])
}
return this
}
diff --git a/main.go b/main.go
index be0d566..56fa80a 100644
--- a/main.go
+++ b/main.go
@@ -3,7 +3,6 @@
import (
"fmt"
"runtime"
- "time"
)
func main() {
@@ -18,33 +17,40 @@
consumerBarrier := NewBarrier(consumerSequence1, consumerSequence2)
sequencer := NewSingleProducerSequencer(producerSequence, RingSize, consumerBarrier)
- go consume(producerBarrier, producerSequence, consumerSequence1)
- go consume(producerBarrier, producerSequence, consumerSequence2)
+ go consume(1, producerBarrier, producerSequence, consumerSequence1)
+ go consume(2, producerBarrier, producerSequence, consumerSequence2)
+ // time.Sleep(time.Millisecond * 10)
- started := time.Now()
+ // started := time.Now()
for i := int64(0); i < MaxSequenceValue; i++ {
+ // fmt.Printf("Producer:: Attempting to claim next sequence.\n")
ticket := sequencer.Next(1)
ringBuffer[ticket&RingMask] = ticket
+ //time.Sleep(time.Nanosecond * 500)
+ //runtime.Gosched()
+ // fmt.Printf("Producer:: Claimed sequence: %d, Publishing...\n", ticket)
sequencer.Publish(ticket)
- if ticket%Mod == 0 {
- finished := time.Now()
- elapsed := finished.Sub(started)
- fmt.Println(ticket, elapsed)
- started = time.Now()
- }
+ // fmt.Printf("Producer:: Claimed sequence: %d, Published\n", ticket)
+ // if ticket%Mod == 0 {
+ // finished := time.Now()
+ // elapsed := finished.Sub(started)
+ // fmt.Println(ticket, elapsed)
+ // started = time.Now()
+ // }
}
}
-func consume(barrier Barrier, source, sequence *Sequence) {
- worker := NewWorker(barrier, TestHandler{}, source, sequence)
+func consume(name int, barrier Barrier, source, sequence *Sequence) {
+ worker := NewWorker(barrier, TestHandler{name}, source, sequence)
for {
- worker.Process()
+ // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Attempting to process messages.\n", name)
+ worker.Process(name)
}
}
-const Mod = 1000000 * 10 // 1 million * 10
-const RingSize = 1024
+const Mod = 1000000 * 100 // 1 million * 10
+const RingSize = 1024 * 256
const RingMask = RingSize - 1
// RingMask = RingSize - 1 // slightly faster than a mod operation
@@ -56,13 +62,15 @@
var ringBuffer [RingSize]int64
-type TestHandler struct{}
+type TestHandler struct{ name int }
func (this TestHandler) Consume(sequence, remaining int64) {
message := ringBuffer[sequence&RingMask]
+ //fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message)
if message != sequence {
- panic(fmt.Sprintf("Sequence: %d, Message: %d", sequence, message))
+ fmt.Printf("\t\t\t\t\t\t\t\t\tERROR Consumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message)
+ panic(fmt.Sprintf("Consumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message))
} else if sequence%Mod == 0 {
- // fmt.Println("Current Sequence:", sequence)
+ fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message)
}
}
diff --git a/sequence.go b/sequence.go
index 2a789c9..06e5e45 100644
--- a/sequence.go
+++ b/sequence.go
@@ -1,9 +1,12 @@
package main
+import "sync/atomic"
+
type Sequence [FillCPUCacheLine]int64
func (this *Sequence) Store(value int64) {
- (*this)[0] = value
+ atomic.StoreInt64(&(*this)[0], value)
+ // (*this)[0] = value
}
func (this *Sequence) Load() int64 {
return (*this)[0]
diff --git a/single_sequencer.go b/single_sequencer.go
index 541beb4..483c8a0 100644
--- a/single_sequencer.go
+++ b/single_sequencer.go
@@ -1,19 +1,31 @@
package main
-func (this *SingleProducerSequencer) Next(slots int64) int64 {
- current, cachedGate := this.current, this.cachedGate
- next := current + slots
+func (this *SingleProducerSequencer) Next(items int64) int64 {
+ claimed, gate := this.claimed, this.gate
+ next := claimed + items
wrap := next - this.ringSize
+ // fmt.Printf("Producer:: Last claim: %d, Next: %d, Wrap: %d, Gate:%d\n", claimed, next, wrap, gate)
- if wrap > cachedGate /*|| cachedGate > current*/ {
- min, last := int64(0), this.last
- for wrap > min {
- min = last.Load()
+ if wrap > gate {
+ last := this.last
+ min := last.Load(1)
+ // fmt.Printf("Producer:: (a) Wrap: %d, Current Gate, %d, Proposed Gate:%d\n", wrap, gate, min)
+
+ for wrap > min || min < 0 {
+ min = last.Load(1)
+ // fmt.Printf("Producer:: (b) Wrap: %d, Current Gate, %d, Proposed Gate:%d\n", wrap, gate, min)
+
+ // if wrap <= min {
+ // fmt.Printf("Producer:: Consumers have caught up to producer.\n")
+ // }
+
}
- this.cachedGate = min
+
+ // fmt.Printf("Producer:: (c) Wrap: %d, Current Gate, %d, Proposed Gate:%d\n", wrap, gate, min)
+ this.gate = min
}
- this.current = next
+ this.claimed = next
return next
}
@@ -23,18 +35,18 @@
func NewSingleProducerSequencer(cursor *Sequence, ringSize int32, last Barrier) *SingleProducerSequencer {
return &SingleProducerSequencer{
- current: InitialSequenceValue,
- cachedGate: InitialSequenceValue,
- cursor: cursor,
- ringSize: int64(ringSize),
- last: last,
+ claimed: InitialSequenceValue,
+ gate: InitialSequenceValue,
+ cursor: cursor,
+ ringSize: int64(ringSize),
+ last: last,
}
}
type SingleProducerSequencer struct {
- current int64
- cachedGate int64
- cursor *Sequence
- ringSize int64
- last Barrier
+ claimed int64
+ gate int64
+ cursor *Sequence
+ ringSize int64
+ last Barrier
}
diff --git a/worker.go b/worker.go
index 19cc782..73c0336 100644
--- a/worker.go
+++ b/worker.go
@@ -1,15 +1,20 @@
package main
-func (this Worker) Process() uint8 {
+func (this Worker) Process(consumer int) uint8 {
next := this.sequence.Load() + 1
- available := this.barrier.Load()
+ available := this.barrier.Load(consumer)
if next <= available {
+ // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: %d work items found.\n", consumer, available-next+1)
+
for next <= available {
+ // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Consuming sequence %d\n", consumer, next)
+
this.handler.Consume(next, available-next)
next++
}
+ // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Completed through sequence %d\n", consumer, next-1)
this.sequence.Store(next - 1)
return Processing
} else if next <= this.source.Load() {
@@ -20,6 +25,7 @@
}
func NewWorker(barrier Barrier, handler Consumer, source, sequence *Sequence) Worker {
+ // TODO: make this a pointer and test performance...
return Worker{
barrier: barrier,
handler: handler,