Interface-based called to consumer--1.2ns per operation.
diff --git a/consumer.go b/consumer.go
index 10446ec..4a779e0 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1,5 +1,5 @@
package disruptor
type Consumer interface {
- Consume(int64, int64) int64
+ Consume(int64, int64)
}
diff --git a/example/main.go b/example/main.go
index 15f8ad3..2daedd3 100644
--- a/example/main.go
+++ b/example/main.go
@@ -20,19 +20,15 @@
runtime.GOMAXPROCS(2)
written, read := disruptor.NewCursor(), disruptor.NewCursor()
-
- // reader := disruptor.NewReader(read, written, written, SampleConsumer{})
-
started := time.Now()
- // go reader.Start()
- go consume(written, read)
+ go consume(written, read, SampleConsumer{})
publish(written, read)
- // reader.Stop()
- // consume(written, read)
finished := time.Now()
fmt.Println(Iterations, finished.Sub(started))
+
+ time.Sleep(time.Millisecond * 10)
}
func publish(written, read *disruptor.Cursor) {
@@ -45,10 +41,6 @@
for wrap > gate {
gate = read.Sequence
- // if wrap > gate {
- // // time.Sleep(time.Nanosecond)
- // time.Sleep(time.Microsecond)
- // }
}
ringBuffer[next&BufferMask] = next
@@ -57,11 +49,7 @@
}
}
-// func consume(reader *disruptor.Reader) {
-func consume(written, read *disruptor.Cursor) {
- sleeps := 0
- consumer := SampleConsumer{}
-
+func consume(written, read *disruptor.Cursor, consumer disruptor.Consumer) {
previous := int64(-1)
gate := int64(-1)
for previous < Iterations {
@@ -69,52 +57,23 @@
gate = written.Sequence
if current <= gate {
-
- for current < gate {
- current += consumer.Consume(current, gate)
- }
- // for current <= gate {
- // if ringBuffer[current&BufferMask] > 0 {
- // }
-
- // current++
- // }
-
+ consumer.Consume(current, gate)
previous = gate
read.Sequence = gate
} else {
- sleeps++
- time.Sleep(time.Microsecond)
+ // TODO: wait strategy
}
+
+ time.Sleep(time.Nanosecond)
}
-
- fmt.Println("Consumer sleeps:", sleeps)
-
- // // for sequence, gate := int64(0), int64(0); sequence < Iterations; sequence++ {
-
- // // // for gate <= sequence {
- // // // gate = written.Sequence
- // // // if gate <= sequence {
- // // // time.Sleep(time.Microsecond)
- // // // }
- // // // }
-
- // // // if ringBuffer[sequence&BufferMask] > 0 {
- // // // }
-
- // // // read.Sequence = sequence
- // // }
}
-// // type Consumer interface {
-// // Consume(lower, upper int64)
-// // }
-
type SampleConsumer struct{}
-func (this SampleConsumer) Consume(current, gate int64) int64 {
- if ringBuffer[current&BufferMask] > 0 {
+func (this SampleConsumer) Consume(lower, upper int64) {
+ for lower <= upper {
+ if ringBuffer[lower&BufferMask] > 0 {
+ }
+ lower++
}
-
- return 1
}
diff --git a/reader.go b/reader.go
index e70ace1..b77badc 100644
--- a/reader.go
+++ b/reader.go
@@ -34,9 +34,9 @@
gate := this.upstream.Read(current)
if current <= gate {
- for current < gate {
- current += this.consumer.Consume(current, gate)
- }
+ // for current < gate {
+ // current += this.consumer.Consume(current, gate)
+ // }
this.read.Store(current)
current++
} else if gate = this.written.Load(); current <= gate {