Added composite barrier; added constants.
diff --git a/composite_barrier.go b/composite_barrier.go
new file mode 100644
index 0000000..30799d2
--- /dev/null
+++ b/composite_barrier.go
@@ -0,0 +1,28 @@
+package disruptor
+
+// performance TODO: type CompositeBarrier []*Cursor
+type CompositeBarrier struct {
+ cursors []*Cursor
+}
+
+func NewCompositeBarrier(upstream ...*Cursor) *CompositeBarrier {
+ if len(upstream) == 0 {
+ panic("At least one upstream cursor is required.")
+ }
+
+ cursors := make([]*Cursor, len(upstream))
+ copy(cursors, upstream)
+ return &CompositeBarrier{cursors}
+}
+
+func (this *CompositeBarrier) Read(noop int64) int64 {
+ minimum := MaxSequenceValue
+ for _, item := range this.cursors {
+ sequence := item.Load()
+ if sequence < minimum {
+ minimum = sequence
+ }
+ }
+
+ return minimum
+}
diff --git a/composite_barrier_test.go b/composite_barrier_test.go
new file mode 100644
index 0000000..9696429
--- /dev/null
+++ b/composite_barrier_test.go
@@ -0,0 +1,15 @@
+package disruptor
+
+import "testing"
+
+func BenchmarkCompositeBarrierRead(b *testing.B) {
+ barrier := NewCompositeBarrier(NewCursor(), NewCursor(), NewCursor(), NewCursor())
+
+ iterations := int64(b.N)
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ for i := int64(0); i < iterations; i++ {
+ barrier.Read(0)
+ }
+}
diff --git a/cursor.go b/cursor.go
index ac950f5..300796f 100644
--- a/cursor.go
+++ b/cursor.go
@@ -1,6 +1,7 @@
package disruptor
const (
+ MaxSequenceValue int64 = (1 << 63) - 1
InitialSequenceValue int64 = -1
cpuCacheLinePadding = 7
)
diff --git a/cursor_test.go b/cursor_test.go
index 970025a..9752c85 100644
--- a/cursor_test.go
+++ b/cursor_test.go
@@ -38,3 +38,15 @@
cursor.Read(i)
}
}
+
+func BenchmarkCursorAsBarrier(b *testing.B) {
+ var barrier Barrier = NewCursor()
+
+ iterations := int64(b.N)
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ for i := int64(0); i < iterations; i++ {
+ barrier.Read(0)
+ }
+}
diff --git a/writer_test.go b/writer_test.go
index 990f63d..30edd90 100644
--- a/writer_test.go
+++ b/writer_test.go
@@ -22,7 +22,7 @@
b.ReportAllocs()
b.ResetTimer()
- read.Store(1<<63 - 1)
+ read.Store(MaxSequenceValue)
for i := int64(0); i < iterations; i++ {
writer.Reserve(1)
}