Experimenting with simpler reader interface.
diff --git a/batch_reader.go b/batch_reader.go
new file mode 100644
index 0000000..82a7aef
--- /dev/null
+++ b/batch_reader.go
@@ -0,0 +1,50 @@
+package disruptor
+
+type BatchReader struct {
+ started bool
+ read *Cursor
+ written *Cursor
+ upstream Barrier
+ consumer Consumer
+ waiter Waiter
+}
+
+func NewBatchReader(read, written *Cursor, upstream Barrier, consumer Consumer, waiter Waiter) *BatchReader {
+ return &BatchReader{
+ started: false,
+ read: read,
+ written: written,
+ upstream: upstream,
+ consumer: consumer,
+ waiter: waiter,
+ }
+}
+
+func (this *BatchReader) Start() {
+ this.started = true
+ go this.receive()
+}
+func (this *BatchReader) Stop() {
+ this.started = false
+}
+
+func (this *BatchReader) receive() {
+ sequence := this.read.Load()
+
+ for {
+ lower := sequence + 1
+ upper := this.upstream.LoadBarrier(lower)
+
+ if lower <= upper {
+ this.consumer.Consume(lower, upper)
+ sequence = upper
+ this.read.Store(sequence)
+ } else if gate := this.written.Load(); lower <= gate {
+ // time.Sleep(time.Millisecond) // TODO: use another method from the wait strategy?
+ } else if this.started {
+ // this.waiter.Wait()
+ } else {
+ break
+ }
+ }
+}
diff --git a/consumer.go b/consumer.go
index 6e6998b..c1971c8 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1,5 +1,5 @@
package disruptor
type Consumer interface {
- Consume(sequence, remaining int64)
+ Consume(lower, upper int64)
}
diff --git a/example/example_consumer.go b/example/example_consumer.go
index f6eac20..2c4057e 100644
--- a/example/example_consumer.go
+++ b/example/example_consumer.go
@@ -7,11 +7,11 @@
"github.com/smartystreets/go-disruptor"
)
-func consume0(reader *disruptor.SimpleReader) {
- for {
- reader.Receive()
- }
-}
+// func consume0(reader *disruptor.SimpleReader) {
+// for {
+// reader.Receive()
+// }
+// }
func consume1(reader *disruptor.Reader) {
started := time.Now()
diff --git a/example/example_consumer_handler.go b/example/example_consumer_handler.go
index 7823b58..cedf859 100644
--- a/example/example_consumer_handler.go
+++ b/example/example_consumer_handler.go
@@ -15,17 +15,21 @@
return &ExampleConsumerHandler{started: time.Now()}
}
-func (this *ExampleConsumerHandler) Consume(sequence, remaining int64) {
+func (this *ExampleConsumerHandler) Consume(lower, upper int64) {
+ sequence := lower
+ //for sequence := lower; sequence <= upper; sequence++ {
+
if sequence%ReportingFrequency == 0 {
finished := time.Now()
fmt.Println(sequence, finished.Sub(this.started))
this.started = time.Now()
}
- if sequence != ringBuffer[sequence&RingMask] {
- message := ringBuffer[sequence&RingMask]
- alert := fmt.Sprintf("Race Condition::Sequence: %d, Message %d\n", sequence, message)
- fmt.Print(alert)
- panic(alert)
- }
+ // if sequence != ringBuffer[sequence&RingMask] {
+ // message := ringBuffer[sequence&RingMask]
+ // alert := fmt.Sprintf("Race Condition::Sequence: %d, Message %d\n", sequence, message)
+ // fmt.Print(alert)
+ // panic(alert)
+ // }
+ //}
}
diff --git a/example/example_producer.go b/example/example_producer.go
index ac85259..5ca9939 100644
--- a/example/example_producer.go
+++ b/example/example_producer.go
@@ -2,13 +2,14 @@
import "github.com/smartystreets/go-disruptor"
-func publish(writer *disruptor.SharedWriter) {
+func publish(writer *disruptor.Writer) {
for {
if lower, upper := writer.Reserve(ItemsToPublish); upper != disruptor.Gating {
- for sequence := lower; sequence <= upper; sequence++ {
- ringBuffer[sequence&RingMask] = sequence
- }
+ // for sequence := lower; sequence <= upper; sequence++ {
+ // ringBuffer[sequence&RingMask] = sequence
+ // }
+ // ringBuffer[lower&RingMask] = lower
writer.Commit(lower, upper)
}
}
diff --git a/example/main.go b/example/main.go
index 5428882..2b58e25 100644
--- a/example/main.go
+++ b/example/main.go
@@ -8,10 +8,10 @@
const (
MaxConsumersPerGroup = 1
- MaxConsumerGroups = 2
- MaxProducers = 2
- ItemsToPublish = 4
- ReportingFrequency = 1000000 * 10 // 1 million * N
+ MaxConsumerGroups = 1
+ MaxProducers = 1
+ ItemsToPublish = 1
+ ReportingFrequency = 1000000 * 100 // 1 million * N
RingSize = 1024 * 16
RingMask = RingSize - 1
)
@@ -21,30 +21,30 @@
func main() {
runtime.GOMAXPROCS(MaxConsumerGroups*MaxConsumersPerGroup + MaxProducers)
- // written := disruptor.NewCursor()
- // upstream := startConsumerGroups(written, written)
- // writer := disruptor.NewWriter(written, upstream, RingSize)
- // startExclusiveProducer(writer)
-
written := disruptor.NewCursor()
- shared := disruptor.NewSharedWriterBarrier(written, RingSize)
- upstream := startConsumerGroups(shared, written)
- writer := disruptor.NewSharedWriter(shared, upstream)
- startSharedProducers(writer)
+ upstream := startConsumerGroups(written, written)
+ writer := disruptor.NewWriter(written, upstream, RingSize)
+ startExclusiveProducer(writer)
+
+ // written := disruptor.NewCursor()
+ // shared := disruptor.NewSharedWriterBarrier(written, RingSize)
+ // upstream := startConsumerGroups(shared, written)
+ // writer := disruptor.NewSharedWriter(shared, upstream)
+ // startSharedProducers(writer)
}
-// func startExclusiveProducer(writer *disruptor.Writer) {
-// publish(writer)
-// }
-
-func startSharedProducers(writer *disruptor.SharedWriter) {
- for i := 0; i < MaxProducers-1; i++ {
- go publish(writer)
- }
-
+func startExclusiveProducer(writer *disruptor.Writer) {
publish(writer)
}
+// func startSharedProducers(writer *disruptor.SharedWriter) {
+// for i := 0; i < MaxProducers-1; i++ {
+// go publish(writer)
+// }
+
+// publish(writer)
+// }
+
func startConsumerGroups(upstream disruptor.Barrier, written *disruptor.Cursor) disruptor.Barrier {
for i := 0; i < MaxConsumerGroups; i++ {
upstream = startConsumerGroup(i, upstream, written)
@@ -58,7 +58,12 @@
for i := 0; i < MaxConsumersPerGroup; i++ {
read := disruptor.NewCursor()
cursors = append(cursors, read)
- reader := disruptor.NewReader(read, written, upstream)
+ // reader := disruptor.NewReader(read, written, upstream)
+
+ consumer := NewExampleConsumerHandler()
+ waiter := SleepWaiter{}
+
+ disruptor.NewBatchReader(read, written, upstream, consumer, waiter).Start()
// constant time regardless of the number of items
// go consume0(disruptor.NewSimpleReader(reader, NewExampleConsumerHandler()))
@@ -67,13 +72,13 @@
// faster for 2-3+ items per publish
// go consume1(reader)
- if group == 0 {
- go consume1(reader)
- } else if group == 1 {
- go consume2(reader)
- } else {
- panic("only two consumer groups currently supported.")
- }
+ // if group == 0 {
+ // go consume1(reader)
+ // } else if group == 1 {
+ // go consume2(reader)
+ // } else {
+ // panic("only two consumer groups currently supported.")
+ // }
}
return disruptor.NewCompositeBarrier(cursors...)
diff --git a/example/sleep_waiter.go b/example/sleep_waiter.go
new file mode 100644
index 0000000..941fd36
--- /dev/null
+++ b/example/sleep_waiter.go
@@ -0,0 +1,6 @@
+package main
+
+type SleepWaiter struct{}
+
+func (this SleepWaiter) Wait() {
+}
diff --git a/simple_reader.go b/simple_reader.go
deleted file mode 100644
index 8fc5b20..0000000
--- a/simple_reader.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package disruptor
-
-type SimpleReader struct {
- reader *Reader
- callback Consumer
-}
-
-func NewSimpleReader(reader *Reader, callback Consumer) *SimpleReader {
- return &SimpleReader{reader: reader, callback: callback}
-}
-
-func (this *SimpleReader) Receive() (int64, int64) {
- lower, upper := this.reader.Receive()
-
- if lower <= upper {
- for sequence := lower; sequence <= upper; sequence++ {
- this.callback.Consume(sequence, upper-sequence)
- }
-
- this.reader.Commit(upper)
- }
-
- return lower, upper
-}
diff --git a/waiter.go b/waiter.go
new file mode 100644
index 0000000..f284021
--- /dev/null
+++ b/waiter.go
@@ -0,0 +1,5 @@
+package disruptor
+
+type Waiter interface {
+ Wait()
+}