Simplified--still at 500M/s.
diff --git a/example/example_consumer.go b/example/example_consumer.go
index 7dac0d1..fd2947e 100644
--- a/example/example_consumer.go
+++ b/example/example_consumer.go
@@ -10,16 +10,15 @@
const Mod = 1000000 * 10 // 1 million * N
func consume(writerBarrier disruptor.Barrier, writerCursor, readerCursor *disruptor.Cursor) {
+ // runtime.LockOSThread()
+
reader := disruptor.NewReader(writerBarrier, writerCursor, readerCursor)
started := time.Now()
for {
sequence, remaining := reader.Receive()
- if remaining == disruptor.Idle {
- } else if remaining == disruptor.Gating {
- } else {
+ if remaining >= 0 {
for ; remaining >= 0; remaining-- {
- sequence++
if sequence%Mod == 0 {
finished := time.Now()
@@ -32,9 +31,13 @@
panic(fmt.Sprintf("Sequence: %d, Message %d\n", sequence, message))
}
+ sequence++
}
reader.Commit(sequence)
+
+ } else {
+
}
}
}
diff --git a/example/example_producer.go b/example/example_producer.go
index 41ea906..8cc153f 100644
--- a/example/example_producer.go
+++ b/example/example_producer.go
@@ -3,16 +3,16 @@
import "github.com/smartystreets/go-disruptor"
func publish(writer *disruptor.Writer) {
+ // runtime.LockOSThread()
+
for {
- _, upper := writer.Reserve(1)
- if upper == disruptor.Gating {
- continue
+ upper := writer.Reserve(3)
+ if upper != disruptor.Gating {
+
+ ringBuffer[(upper-2)&RingMask] = upper - 2
+ ringBuffer[(upper-1)&RingMask] = upper - 1
+ ringBuffer[upper&RingMask] = upper
+ writer.Commit(upper)
}
-
- // ringBuffer[(upper-2)&RingMask] = upper - 2
- // ringBuffer[(upper-1)&RingMask] = upper - 1
- ringBuffer[upper&RingMask] = upper
-
- writer.Commit(upper)
}
}
diff --git a/reader.go b/reader.go
index d8bf281..fefcfe9 100644
--- a/reader.go
+++ b/reader.go
@@ -20,15 +20,14 @@
}
func (this *Reader) Receive() (int64, int64) {
- current := this.readerCursor.Load()
- next := current + 1
+ next := this.readerCursor.Load() + 1
ready := this.upstreamBarrier()
if next <= ready {
- return current, ready - next
+ return next, ready - next
} else if next <= this.writerCursor.Load() {
- return current, Gating
+ return next, Gating
} else {
- return current, Idle
+ return next, Idle
}
}
diff --git a/writer.go b/writer.go
index f361508..aa10c66 100644
--- a/writer.go
+++ b/writer.go
@@ -26,20 +26,19 @@
return value > 0 && (value&(value-1)) == 0
}
-func (this *Writer) Reserve(items int64) (int64, int64) {
- current := this.previous + 1
+func (this *Writer) Reserve(items int64) int64 {
next := this.previous + items
wrap := next - this.ringSize
if wrap > this.gate {
min := this.readerBarrier()
if wrap > min {
- return current, Gating
+ return Gating
}
this.gate = min
}
this.previous = next
- return current, next
+ return next
}