Creating a specialized barrier to facilitate shared writes.
diff --git a/shared_writer.go b/shared_writer.go
index ec15c7f..16ebb56 100644
--- a/shared_writer.go
+++ b/shared_writer.go
@@ -1,49 +1,28 @@
package disruptor
-import (
- "math"
- "sync/atomic"
-)
+import "sync/atomic"
type SharedWriter struct {
capacity int64
- gate int64 // TODO: this will most likely need to be a cursor
+ gate int64 // TODO: determine if this should be a *Cursor
shift uint8
committed []int32
upstream Barrier
reservation *Cursor
}
-func NewSharedWriter(reservation *Cursor, capacity int64, upstream Barrier) *SharedWriter {
- assertPowerOfTwo(capacity)
-
- shift := uint8(math.Log2(float64(capacity)))
- buffer := initializeCommittedBuffer(capacity)
-
+func NewSharedWriter(shared *SharedWriterBarrier, upstream Barrier) *SharedWriter {
return &SharedWriter{
- capacity: capacity,
+ capacity: shared.capacity,
gate: InitialSequenceValue,
- shift: shift,
- committed: buffer,
+ shift: shared.shift,
+ committed: shared.committed,
upstream: upstream,
- reservation: reservation,
+ reservation: shared.reservation,
}
}
-func initializeCommittedBuffer(capacity int64) []int32 {
- buffer := make([]int32, capacity)
- for i := range buffer {
- buffer[i] = int32(InitialSequenceValue)
- }
- return buffer
-}
func (this *SharedWriter) Reserve(count int64) (int64, int64) {
- if count <= 0 {
- panic("Reservation must be a positive number.")
- } else if count > this.capacity {
- panic("Reservation cannot exceed the capacity.")
- }
-
for {
previous := this.reservation.Load()
next := previous + count
@@ -69,15 +48,3 @@
this.committed[lower&mask] = int32(lower >> this.shift)
}
}
-
-func (this *SharedWriter) Load() int64 {
- sequence := this.reservation.Load()
-
- for mask := this.capacity - 1; sequence >= 0; sequence-- {
- if this.committed[sequence&mask] == int32(sequence>>this.shift) {
- return sequence
- }
- }
-
- return sequence
-}
diff --git a/shared_writer_barrier.go b/shared_writer_barrier.go
new file mode 100644
index 0000000..b9499d9
--- /dev/null
+++ b/shared_writer_barrier.go
@@ -0,0 +1,40 @@
+package disruptor
+
+import "math"
+
+type SharedWriterBarrier struct {
+ committed []int32
+ capacity int64
+ mask int64
+ shift uint8
+ reservation *Cursor
+}
+
+func NewSharedWriterBarrier(reservation *Cursor, capacity int64) *SharedWriterBarrier {
+ assertPowerOfTwo(capacity)
+
+ return &SharedWriterBarrier{
+ committed: prepareCommitBuffer(capacity),
+ capacity: capacity,
+ mask: capacity - 1,
+ shift: uint8(math.Log2(float64(capacity))),
+ reservation: reservation,
+ }
+}
+func prepareCommitBuffer(capacity int64) []int32 {
+ buffer := make([]int32, capacity)
+ for i := range buffer {
+ buffer[i] = int32(InitialSequenceValue)
+ }
+ return buffer
+}
+
+func (this *SharedWriterBarrier) Load() int64 {
+ for sequence := this.reservation.Load(); sequence >= 0; sequence-- {
+ if this.committed[sequence&this.mask] == int32(sequence>>this.shift) {
+ return sequence
+ }
+ }
+
+ return InitialSequenceValue
+}