test/debug code.
diff --git a/reader.go b/reader.go
index 4ffda19..969fb6b 100644
--- a/reader.go
+++ b/reader.go
@@ -1,6 +1,9 @@
package disruptor
-import "time"
+import (
+ "runtime"
+ "time"
+)
type Reader struct {
read *Cursor
@@ -41,10 +44,12 @@
this.read.Store(upper)
previous = upper
} else if upper = this.written.Load(); lower <= upper {
+ time.Sleep(time.Microsecond)
// Gating--TODO: wait strategy (provide gating count to wait strategy for phased backoff)
gating++
idling = 0
} else if this.ready {
+ time.Sleep(time.Millisecond)
// Idling--TODO: wait strategy (provide idling count to wait strategy for phased backoff)
idling++
gating = 0
@@ -54,6 +59,6 @@
// sleeping increases the batch size which reduces number of writes required to store the sequence
// reducing the number of writes allows the CPU to optimize the pipeline without prediction failures
- time.Sleep(time.Microsecond)
+ runtime.Gosched() // LockSupport.parkNanos(1L); http://bit.ly/1xiDINZ
}
}
diff --git a/shared_writer.go b/shared_writer.go
index 7d5b3bd..ce284d3 100644
--- a/shared_writer.go
+++ b/shared_writer.go
@@ -1,6 +1,9 @@
package disruptor
-import "sync/atomic"
+import (
+ "runtime"
+ "sync/atomic"
+)
type SharedWriter struct {
written *Cursor
@@ -29,7 +32,10 @@
previous := this.written.Load()
upper := previous + count
- for upper-this.capacity > this.gate.Load() {
+ for spin := int64(0); upper-this.capacity > this.gate.Load(); spin++ {
+ if spin&SpinMask == 0 {
+ runtime.Gosched() // LockSupport.parkNanos(1L); http://bit.ly/1xiDINZ
+ }
this.gate.Store(this.upstream.Read(0))
}
@@ -40,7 +46,11 @@
}
func (this *SharedWriter) Commit(lower, upper int64) {
- if lower == upper {
+ if lower > upper {
+ panic("Attempting to commit a sequence where the lower reservation is greater than the higher reservation.")
+ } else if (upper - lower) > this.capacity {
+ panic("Attempting to commit a reservation larger than the size of the ring buffer. (upper-lower > this.capacity)")
+ } else if lower == upper {
this.committed[upper&this.mask] = int32(upper >> this.shift)
} else {
// working down the array rather than up keeps all items in the commit together