Moved code into reader with same performance--1.2ns per operation.
diff --git a/example/main.go b/example/main.go
index 0a24aa6..9237fbb 100644
--- a/example/main.go
+++ b/example/main.go
@@ -20,11 +20,12 @@
runtime.GOMAXPROCS(2)
written, read := disruptor.NewCursor(), disruptor.NewCursor()
+ reader := disruptor.NewReader(read, written, written, SampleConsumer{})
+
started := time.Now()
-
- go consume(written, read, SampleConsumer{})
+ reader.Start()
publish(written, read)
-
+ reader.Stop()
finished := time.Now()
fmt.Println(Iterations, finished.Sub(started))
@@ -49,55 +50,6 @@
}
}
-func consume(written, read *disruptor.Cursor, consumer disruptor.Consumer) {
- previous := int64(-1)
- upstream := disruptor.Barrier(written)
- idling, gating := 0, 0
-
- for {
- lower := previous + 1
- upper := upstream.Read(lower)
-
- if lower <= upper {
- consumer.Consume(lower, upper)
- read.Sequence = upper
- previous = upper
- } else if upper = written.Load(); lower <= upper {
- // Gating--TODO: wait strategy (provide gating count to wait strategy for phased backoff)
- gating++
- idling = 0
- } else if previous < Iterations {
- // Idling--TODO: wait strategy (provide idling count to wait strategy for phased backoff)
- idling++
- gating = 0
- } else {
- break
- }
-
- time.Sleep(time.Nanosecond)
- }
-
- // for previous < Iterations {
- // lower := previous + 1
- // upper := upstream.Read(lower)
-
- // if lower <= upper {
- // consumer.Consume(lower, upper)
- // read.Sequence = upper
- // previous = upper
- // } else if upper = written.Sequence; lower <= upper {
- // // TODO: gating strategy
- // } else {
- // // TODO: idling strategy
- // idling++
- // }
-
- // time.Sleep(time.Nanosecond)
- // }
-
- fmt.Println("Consumer idling/gating", idling, gating)
-}
-
type SampleConsumer struct{}
func (this SampleConsumer) Consume(lower, upper int64) {
diff --git a/reader.go b/reader.go
index b77badc..e5ee251 100644
--- a/reader.go
+++ b/reader.go
@@ -29,28 +29,29 @@
}
func (this *Reader) receive() {
- current := this.read.Sequence + 1
- for {
- gate := this.upstream.Read(current)
+ previous := this.read.Sequence
+ idling, gating := 0, 0
- if current <= gate {
- // for current < gate {
- // current += this.consumer.Consume(current, gate)
- // }
- this.read.Store(current)
- current++
- } else if gate = this.written.Load(); current <= gate {
+ for {
+ lower := previous + 1
+ upper := this.upstream.Read(lower)
+
+ if lower <= upper {
+ this.consumer.Consume(lower, upper)
+ this.read.Sequence = upper
+ previous = upper
+ } else if upper = this.written.Load(); lower <= upper {
// Gating--TODO: wait strategy (provide gating count to wait strategy for phased backoff)
- // gating++
- // idling = 0
- time.Sleep(time.Microsecond)
+ gating++
+ idling = 0
} else if this.ready {
// Idling--TODO: wait strategy (provide idling count to wait strategy for phased backoff)
- // idling++
- // gating = 0
- time.Sleep(time.Microsecond)
+ idling++
+ gating = 0
} else {
break
}
+
+ time.Sleep(time.Nanosecond)
}
}