Race condition corrected.
diff --git a/barrier.go b/barrier.go
index 968f2c6..9f1a480 100644
--- a/barrier.go
+++ b/barrier.go
@@ -1,7 +1,7 @@
package disruptor
type Barrier interface {
- Load() int64
+ LoadBarrier(int64) int64
}
func NewBarrier(upstream ...*Cursor) Barrier {
diff --git a/composite_barrier.go b/composite_barrier.go
index ef863f6..dbdd543 100644
--- a/composite_barrier.go
+++ b/composite_barrier.go
@@ -10,9 +10,8 @@
return &CompositeBarrier{cursors}
}
-func (this *CompositeBarrier) Load() int64 {
+func (this *CompositeBarrier) LoadBarrier(current int64) int64 {
minimum := MaxSequenceValue
-
for _, item := range this.cursors {
sequence := item.Load()
if sequence < minimum {
diff --git a/cursor_386.go b/cursor_386.go
index 60db919..bd75fcf 100644
--- a/cursor_386.go
+++ b/cursor_386.go
@@ -8,3 +8,6 @@
func (this *Cursor) Load() int64 {
return atomic.LoadInt64(&this.value)
}
+func (this *Cursor) LoadBarrier(current int64) int64 {
+ return atomic.LoadInt64(&this.value)
+}
diff --git a/cursor_amd64.go b/cursor_amd64.go
index 0a40b3d..bc47f36 100644
--- a/cursor_amd64.go
+++ b/cursor_amd64.go
@@ -6,3 +6,6 @@
func (this *Cursor) Load() int64 {
return this.value
}
+func (this *Cursor) LoadBarrier(current int64) int64 {
+ return this.value
+}
diff --git a/cursor_arm.go b/cursor_arm.go
index 60db919..bd75fcf 100644
--- a/cursor_arm.go
+++ b/cursor_arm.go
@@ -8,3 +8,6 @@
func (this *Cursor) Load() int64 {
return atomic.LoadInt64(&this.value)
}
+func (this *Cursor) LoadBarrier(current int64) int64 {
+ return atomic.LoadInt64(&this.value)
+}
diff --git a/example/example_consumer.go b/example/example_consumer.go
index 4c9b55f..0579616 100644
--- a/example/example_consumer.go
+++ b/example/example_consumer.go
@@ -51,7 +51,7 @@
// } else if remaining == disruptor.Idling {
// fmt.Println("\t\t\t\t\t[CONSUMER] Consumer idling at sequence", sequence)
// }
- // //time.Sleep(time.Millisecond * 10)
+ // time.Sleep(time.Millisecond * 10)
}
}
}
diff --git a/example/example_producer.go b/example/example_producer.go
index 3607547..472bbab 100644
--- a/example/example_producer.go
+++ b/example/example_producer.go
@@ -1,20 +1,27 @@
package main
-import "github.com/smartystreets/go-disruptor"
+import (
+ "fmt"
+ "time"
+
+ "github.com/smartystreets/go-disruptor"
+)
func publish(id int, writer *disruptor.SharedWriter) {
- // fmt.Printf("[PRODUCER %d] Starting producer...\n", id)
+ fmt.Printf("[PRODUCER %d] Starting producer...\n", id)
for {
+ // TODO: return lower, upper instead? or some kind of struct "Reservation"
+ // upon which commit can be invoked?
if sequence := writer.Reserve(id, ItemsToPublish); sequence != disruptor.Gating {
// fmt.Printf("[PRODUCER %d] Writing %d to slot %d\n", id, sequence, sequence)
ringBuffer[sequence&RingMask] = sequence
- // fmt.Printf("[PRODUCER %d] Committing sequence %d\n", id, sequence)
- writer.Commit(sequence)
- // } else {
- // // fmt.Println("[PRODUCER] Gating")
- // //time.Sleep(time.Millisecond)
+ // fmt.Printf("[PRODUCER %d] Committing from sequence %d\n", id, sequence)
+ writer.Commit(sequence, sequence+ItemsToPublish-1)
+ } else {
+ // // fmt.Printf("[PRODUCER %d] Gating\n", id)
+ time.Sleep(time.Millisecond * 10)
}
}
}
diff --git a/example/main.go b/example/main.go
index 7af8dea..bcbfb0a 100644
--- a/example/main.go
+++ b/example/main.go
@@ -11,8 +11,8 @@
MaxConsumerGroups = 1
MaxProducers = 2
ItemsToPublish = 1
- ReportingFrequency = 10000 //1000000 * 10 // 1 million * N
- RingSize = 2
+ ReportingFrequency = 1000000 * 10 // 1 million * N
+ RingSize = 1024 * 16
RingMask = RingSize - 1
)
diff --git a/reader.go b/reader.go
index 006bfa4..1ea5ec6 100644
--- a/reader.go
+++ b/reader.go
@@ -23,7 +23,7 @@
// instead of reading the cursor...
func (this *Reader) Receive() (int64, int64) {
next := this.readerCursor.Load() + 1
- ready := this.upstreamBarrier.Load()
+ ready := this.upstreamBarrier.LoadBarrier(next)
// fmt.Printf("\t\t\t\t\t[READER] Next: %d, Ready: %d\n", next, ready)
if next <= ready {
diff --git a/shared_writer.go b/shared_writer.go
index 0ffb9dc..0f909e6 100644
--- a/shared_writer.go
+++ b/shared_writer.go
@@ -34,7 +34,7 @@
if wrap > this.gate {
// fmt.Printf("[WRITER %d] Previous gate: %d\n", id, this.gate)
- min := this.upstream.Load()
+ min := this.upstream.LoadBarrier(0)
if wrap > min {
// fmt.Printf("[WRITER %d] New gate (waiting for consumers): %d\n", id, min)
return Gating
@@ -54,6 +54,10 @@
}
}
-func (this *SharedWriter) Commit(sequence int64) {
- this.committed[sequence&this.mask] = int32(sequence >> this.shift)
+func (this *SharedWriter) Commit(lower, upper int64) {
+ // fmt.Printf("[PRODUCER] Committing Reservation. Lower: %d, Upper: %d\n", lower, upper)
+
+ for shift, mask := this.shift, this.mask; lower <= upper; lower++ {
+ this.committed[lower&mask] = int32(lower >> shift)
+ }
}
diff --git a/shared_writer_barrier.go b/shared_writer_barrier.go
index d77910a..fb9c280 100644
--- a/shared_writer_barrier.go
+++ b/shared_writer_barrier.go
@@ -29,15 +29,19 @@
return buffer
}
-func (this *SharedWriterBarrier) Load() int64 {
-
- for sequence := this.reservation.Load(); sequence >= 0; sequence-- {
- if this.committed[sequence&this.mask] == int32(sequence>>this.shift) {
- // fmt.Printf("\t\t\t\t\t[SHARED-WRITER-BARRIER] Barrier Sequence: %d\n", sequence)
- return sequence
+func (this *SharedWriterBarrier) LoadBarrier(lower int64) int64 {
+ shift, mask := this.shift, this.mask
+ upper := this.reservation.Load()
+ // fmt.Printf("\t\t\t\t\t[BARRIER] Next (lower): %d, Reservation (upper): %d\n", lower, upper)
+ // fmt.Println("\t\t\t\t\t[BARRIER] Committed:", this.committed)
+ for ; lower <= upper; lower++ {
+ // fmt.Printf("\t\t\t\t\t[BARRIER] Inside Loop. Index: %d, Value: %d\n", sequence&mask, sequence>>shift)
+ if this.committed[lower&mask] != int32(lower>>shift) {
+ // fmt.Println("\t\t\t\t\t[BARRIER] Upstream Barrier:", sequence-1, this.committed)
+ return lower - 1
}
}
- // fmt.Printf("\t\t\t\t\t[SHARED-WRITER-BARRIER] Barrier Sequence: -1\n")
- return InitialSequenceValue
+ // fmt.Println("\t\t\t\t\t[BARRIER] Upstream Barrier (default):", lower)
+ return upper
}
diff --git a/writer.go b/writer.go
index b70e232..5c6e499 100644
--- a/writer.go
+++ b/writer.go
@@ -32,7 +32,7 @@
wrap := next - this.capacity
if wrap > this.gate {
- min := this.readerBarrier.Load()
+ min := this.readerBarrier.LoadBarrier(0)
if wrap > min {
return Gating
}