Single producer/consumer (for right now); back to pre-debugging performance levels.
diff --git a/main.go b/main.go
index 706d4f8..9be0721 100644
--- a/main.go
+++ b/main.go
@@ -18,15 +18,15 @@
// consumerBarrier := NewBarrier(consumerSequence1, consumerSequence2)
sequencer := NewSingleProducerSequencer(producerSequence, RingSize, consumerBarrier)
- go consume(1, producerBarrier, producerSequence, consumerSequence1)
+ go consume(producerBarrier, producerSequence, consumerSequence1)
// go consume(2, producerBarrier, producerSequence, consumerSequence2)
started := time.Now()
for i := int64(0); i < MaxSequenceValue; i++ {
ticket := sequencer.Next(1)
- //ringBuffer[ticket&RingMask] = ticket
+ ringBuffer[ticket&RingMask] = ticket
sequencer.Publish(ticket)
- if ticket%Mod == 0 {
+ if ticket%Mod == 0 && ticket > 0 {
finished := time.Now()
elapsed := finished.Sub(started)
fmt.Println(ticket, elapsed)
@@ -35,8 +35,8 @@
}
}
-func consume(name int, barrier Barrier, source, sequence *Sequence) {
- worker := NewWorker(barrier, TestHandler{name}, source, sequence)
+func consume(barrier Barrier, source, sequence *Sequence) {
+ worker := NewWorker(barrier, TestHandler{}, source, sequence)
for {
worker.Process()
@@ -44,19 +44,19 @@
}
const Mod = 1000000 * 100 // 1 million * 100
-const RingSize = 1024 * 256
+const RingSize = 1024 * 16
const RingMask = RingSize - 1
var ringBuffer [RingSize]int64
-type TestHandler struct{ name int }
+type TestHandler struct{}
func (this TestHandler) Consume(sequence, remaining int64) {
- // message := ringBuffer[sequence&RingMask]
- // if message != sequence {
- // //fmt.Printf("\t\t\t\t\t\t\t\t\tERROR Consumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message)
- // //panic(fmt.Sprintf("Consumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message))
- // } else if sequence%Mod == 0 {
- // //fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message)
- // }
+ message := ringBuffer[sequence&RingMask]
+ if message != sequence {
+ //fmt.Printf("\t\t\t\t\t\t\t\t\tERROR Consumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message)
+ panic(fmt.Sprintf("Consumer:: Sequence: %d, Message: %d\n", sequence, message))
+ } else if sequence%Mod == 0 {
+ //fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Sequence: %d, Message: %d\n", this.name, sequence, message)
+ }
}