Receive on Reader only returns the high-water mark.
diff --git a/example/example_consumer.go b/example/example_consumer.go
index 6cbe712..2732579 100644
--- a/example/example_consumer.go
+++ b/example/example_consumer.go
@@ -8,17 +8,21 @@
)
func consume0(reader *disruptor.SimpleReader) {
+ sequence := int64(0)
for {
- reader.Receive()
+ if upper := reader.Receive(sequence); upper > sequence {
+ sequence = upper
+ }
}
}
func consume1(reader *disruptor.Reader) {
started := time.Now()
+ sequence := int64(0)
for {
- lower, upper := reader.Receive()
- if lower <= upper {
- for sequence := lower; sequence <= upper; sequence++ {
+ upper := reader.Receive(sequence)
+ if sequence <= upper {
+ for ; sequence <= upper; sequence++ {
if sequence%ReportingFrequency == 0 {
finished := time.Now()
fmt.Println(sequence, finished.Sub(started))
@@ -32,20 +36,21 @@
panic(alert)
}
- ringBuffer[sequence&RingMask] = sequence % 2
+ // ringBuffer[sequence&RingMask] = sequence % 2
}
- reader.Commit(lower, upper)
+ reader.Commit(upper)
}
}
}
func consume2(reader *disruptor.Reader) {
+ sequence := int64(0)
for {
- lower, upper := reader.Receive()
+ upper := reader.Receive(sequence)
- if lower <= upper {
- for sequence := lower; sequence <= upper; sequence++ {
+ if sequence <= upper {
+ for ; sequence <= upper; sequence++ {
message := ringBuffer[sequence&RingMask]
if message != sequence%2 {
alert := fmt.Sprintf("Race Condition (Layer 2)::Sequence: %d, Message %d\n", sequence, message)
@@ -53,7 +58,7 @@
panic(alert)
}
}
- reader.Commit(lower, upper)
+ reader.Commit(upper)
}
}
}
diff --git a/example/example_consumer_handler.go b/example/example_consumer_handler.go
index 7823b58..abd1f80 100644
--- a/example/example_consumer_handler.go
+++ b/example/example_consumer_handler.go
@@ -22,10 +22,10 @@
this.started = time.Now()
}
- if sequence != ringBuffer[sequence&RingMask] {
- message := ringBuffer[sequence&RingMask]
- alert := fmt.Sprintf("Race Condition::Sequence: %d, Message %d\n", sequence, message)
- fmt.Print(alert)
- panic(alert)
- }
+ // if sequence != ringBuffer[sequence&RingMask] {
+ // message := ringBuffer[sequence&RingMask]
+ // alert := fmt.Sprintf("Race Condition::Sequence: %d, Message %d\n", sequence, message)
+ // fmt.Print(alert)
+ // panic(alert)
+ // }
}
diff --git a/example/example_producer.go b/example/example_producer.go
index ac85259..1599fdb 100644
--- a/example/example_producer.go
+++ b/example/example_producer.go
@@ -2,13 +2,15 @@
import "github.com/smartystreets/go-disruptor"
-func publish(writer *disruptor.SharedWriter) {
+func publish(writer *disruptor.Writer) {
for {
if lower, upper := writer.Reserve(ItemsToPublish); upper != disruptor.Gating {
- for sequence := lower; sequence <= upper; sequence++ {
- ringBuffer[sequence&RingMask] = sequence
- }
+
+ ringBuffer[lower&RingMask] = lower
+ // for sequence := lower; sequence <= upper; sequence++ {
+ // ringBuffer[sequence&RingMask] = sequence
+ // }
writer.Commit(lower, upper)
}
}
diff --git a/example/main.go b/example/main.go
index 6a991af..b976aad 100644
--- a/example/main.go
+++ b/example/main.go
@@ -8,9 +8,9 @@
const (
MaxConsumersPerGroup = 1
- MaxConsumerGroups = 2
- MaxProducers = 2
- ItemsToPublish = 2
+ MaxConsumerGroups = 1
+ MaxProducers = 1
+ ItemsToPublish = 1
ReportingFrequency = 1000000 * 10 // 1 million * N
RingSize = 1024 * 16
RingMask = RingSize - 1
@@ -22,26 +22,29 @@
runtime.GOMAXPROCS(MaxConsumerGroups*MaxConsumersPerGroup + MaxProducers)
written := disruptor.NewCursor()
- shared := disruptor.NewSharedWriterBarrier(written, RingSize)
- upstream := startConsumerGroups(shared, written)
- writer := disruptor.NewSharedWriter(shared, upstream)
- // writer := disruptor.NewWriter(written, upstream, RingSize)
- // startExclusiveProducer(writer)
- startSharedProducers(writer)
+ upstream := startConsumerGroups(written, written)
+ writer := disruptor.NewWriter(written, upstream, RingSize)
+ startExclusiveProducer(writer)
+
+ // written := disruptor.NewCursor()
+ // shared := disruptor.NewSharedWriterBarrier(written, RingSize)
+ // upstream := startConsumerGroups(shared, written)
+ // writer := disruptor.NewSharedWriter(shared, upstream)
+ // startSharedProducers(writer)
}
-// func startExclusiveProducer(writer *disruptor.Writer) {
-//publish(writer)
-// }
-
-func startSharedProducers(writer *disruptor.SharedWriter) {
- for i := 0; i < MaxProducers-1; i++ {
- go publish(writer)
- }
-
+func startExclusiveProducer(writer *disruptor.Writer) {
publish(writer)
}
+// func startSharedProducers(writer *disruptor.SharedWriter) {
+// for i := 0; i < MaxProducers-1; i++ {
+// go publish(writer)
+// }
+
+// publish(writer)
+// }
+
func startConsumerGroups(upstream disruptor.Barrier, written *disruptor.Cursor) disruptor.Barrier {
for i := 0; i < MaxConsumerGroups; i++ {
upstream = startConsumerGroup(i, upstream, written)
@@ -58,19 +61,19 @@
reader := disruptor.NewReader(read, written, upstream)
// constant time regardless of the number of items
- // go consume0(disruptor.NewSimpleReader(reader, NewExampleConsumerHandler()))
+ go consume0(disruptor.NewSimpleReader(reader, NewExampleConsumerHandler()))
// TODO: wildly sporadic latency for single-item publish, e.g. 2 seconds, 65 ms, etc.
// faster for 2-3+ items per publish
// go consume1(reader)
- if group == 0 {
- go consume1(reader)
- } else if group == 1 {
- go consume2(reader)
- } else {
- panic("only two consumer groups currently supported.")
- }
+ // if group == 0 {
+ // go consume1(reader)
+ // } else if group == 1 {
+ // go consume2(reader)
+ // } else {
+ // panic("only two consumer groups currently supported.")
+ // }
}
return disruptor.NewCompositeBarrier(cursors...)
diff --git a/reader.go b/reader.go
index e60ff31..8a723aa 100644
--- a/reader.go
+++ b/reader.go
@@ -14,15 +14,14 @@
}
}
-func (this *Reader) Receive() (int64, int64) {
- lower := this.read.Load() + 1
+func (this *Reader) Receive(lower int64) int64 {
upper := this.upstream.LoadBarrier(lower)
if lower <= upper {
- return lower, upper
+ return upper
} else if gate := this.written.Load(); lower <= gate {
- return InitialSequenceValue, Gating
+ return Gating
} else {
- return InitialSequenceValue, Idling
+ return Idling
}
}
diff --git a/reader_386.go b/reader_386.go
index 833c272..491bb61 100644
--- a/reader_386.go
+++ b/reader_386.go
@@ -1,5 +1,5 @@
package disruptor
-func (this *Reader) Commit(lower, upper int64) {
+func (this *Reader) Commit(upper int64) {
this.read.Store(upper)
}
diff --git a/reader_amd64.go b/reader_amd64.go
index 9ef8bf3..36236b2 100644
--- a/reader_amd64.go
+++ b/reader_amd64.go
@@ -1,5 +1,5 @@
package disruptor
-func (this *Reader) Commit(lower, upper int64) {
- this.read.sequence = upper
+func (this *Reader) Commit(sequence int64) {
+ this.read.sequence = sequence
}
diff --git a/reader_arm.go b/reader_arm.go
index 833c272..f1df97c 100644
--- a/reader_arm.go
+++ b/reader_arm.go
@@ -1,5 +1,5 @@
package disruptor
-func (this *Reader) Commit(lower, upper int64) {
- this.read.Store(upper)
+func (this *Reader) Commit(sequence int64) {
+ this.read.Store(sequence)
}
diff --git a/simple_reader.go b/simple_reader.go
index fe781b7..2a619c0 100644
--- a/simple_reader.go
+++ b/simple_reader.go
@@ -9,17 +9,16 @@
return &SimpleReader{reader: reader, callback: callback}
}
-func (this *SimpleReader) Receive() (int64, int64) {
- lower, upper := this.reader.Receive()
+func (this *SimpleReader) Receive(lower int64) int64 {
+ upper := this.reader.Receive(lower)
if lower <= upper {
for sequence := lower; sequence <= upper; sequence++ {
this.callback.Consume(sequence, upper-sequence)
}
- this.reader.Commit(lower, upper)
- return lower, upper
- } else {
- return InitialSequenceValue, upper // Idling, Gating
+ this.reader.Commit(upper)
}
+
+ return upper
}