Testing multiple writers again.
diff --git a/example/example_producer.go b/example/example_producer.go
index aaf8c51..ac85259 100644
--- a/example/example_producer.go
+++ b/example/example_producer.go
@@ -2,7 +2,7 @@
import "github.com/smartystreets/go-disruptor"
-func publish(writer *disruptor.Writer) {
+func publish(writer *disruptor.SharedWriter) {
for {
if lower, upper := writer.Reserve(ItemsToPublish); upper != disruptor.Gating {
diff --git a/example/main.go b/example/main.go
index b976aad..be6eec3 100644
--- a/example/main.go
+++ b/example/main.go
@@ -8,9 +8,9 @@
const (
MaxConsumersPerGroup = 1
- MaxConsumerGroups = 1
+ MaxConsumerGroups = 2
MaxProducers = 1
- ItemsToPublish = 1
+ ItemsToPublish = 4
ReportingFrequency = 1000000 * 10 // 1 million * N
RingSize = 1024 * 16
RingMask = RingSize - 1
@@ -21,30 +21,30 @@
func main() {
runtime.GOMAXPROCS(MaxConsumerGroups*MaxConsumersPerGroup + MaxProducers)
- written := disruptor.NewCursor()
- upstream := startConsumerGroups(written, written)
- writer := disruptor.NewWriter(written, upstream, RingSize)
- startExclusiveProducer(writer)
-
// written := disruptor.NewCursor()
- // shared := disruptor.NewSharedWriterBarrier(written, RingSize)
- // upstream := startConsumerGroups(shared, written)
- // writer := disruptor.NewSharedWriter(shared, upstream)
- // startSharedProducers(writer)
+ // upstream := startConsumerGroups(written, written)
+ // writer := disruptor.NewWriter(written, upstream, RingSize)
+ // startExclusiveProducer(writer)
+
+ written := disruptor.NewCursor()
+ shared := disruptor.NewSharedWriterBarrier(written, RingSize)
+ upstream := startConsumerGroups(shared, written)
+ writer := disruptor.NewSharedWriter(shared, upstream)
+ startSharedProducers(writer)
}
-func startExclusiveProducer(writer *disruptor.Writer) {
- publish(writer)
-}
-
-// func startSharedProducers(writer *disruptor.SharedWriter) {
-// for i := 0; i < MaxProducers-1; i++ {
-// go publish(writer)
-// }
-
+// func startExclusiveProducer(writer *disruptor.Writer) {
// publish(writer)
// }
+func startSharedProducers(writer *disruptor.SharedWriter) {
+ for i := 0; i < MaxProducers-1; i++ {
+ go publish(writer)
+ }
+
+ publish(writer)
+}
+
func startConsumerGroups(upstream disruptor.Barrier, written *disruptor.Cursor) disruptor.Barrier {
for i := 0; i < MaxConsumerGroups; i++ {
upstream = startConsumerGroup(i, upstream, written)
@@ -61,19 +61,19 @@
reader := disruptor.NewReader(read, written, upstream)
// constant time regardless of the number of items
- go consume0(disruptor.NewSimpleReader(reader, NewExampleConsumerHandler()))
+ // go consume0(disruptor.NewSimpleReader(reader, NewExampleConsumerHandler()))
// TODO: wildly sporadic latency for single-item publish, e.g. 2 seconds, 65 ms, etc.
// faster for 2-3+ items per publish
// go consume1(reader)
- // if group == 0 {
- // go consume1(reader)
- // } else if group == 1 {
- // go consume2(reader)
- // } else {
- // panic("only two consumer groups currently supported.")
- // }
+ if group == 0 {
+ go consume1(reader)
+ } else if group == 1 {
+ go consume2(reader)
+ } else {
+ panic("only two consumer groups currently supported.")
+ }
}
return disruptor.NewCompositeBarrier(cursors...)