blob: da0938f3c55e96e826ee3bc5874bf5eceb6722ec [file] [log] [blame]
package disruptor
import (
"io"
"sync/atomic"
)
type DefaultReader struct {
state int64
current *Cursor // this reader has processed up to this sequence
written *Cursor // the ring buffer has been written up to this sequence
upstream Barrier // all of the readers have advanced up to this sequence
waiter WaitStrategy
consumer Consumer
}
func NewReader(current, written *Cursor, upstream Barrier, waiter WaitStrategy, consumer Consumer) *DefaultReader {
return &DefaultReader{
state: stateRunning,
current: current,
written: written,
upstream: upstream,
waiter: waiter,
consumer: consumer,
}
}
func (this *DefaultReader) Read() {
var gateCount, idleCount, lower, upper int64
var current = this.current.Load()
for {
lower = current + 1
upper = this.upstream.Load()
if lower <= upper {
this.consumer.Consume(lower, upper)
this.current.Store(upper)
current = upper
} else if upper = this.written.Load(); lower <= upper {
gateCount++
idleCount = 0
this.waiter.Gate(gateCount)
} else if atomic.LoadInt64(&this.state) == stateRunning {
idleCount++
gateCount = 0
this.waiter.Idle(idleCount)
} else {
break
}
}
if closer, ok := this.consumer.(io.Closer); ok {
_ = closer.Close()
}
}
func (this *DefaultReader) Close() error {
atomic.StoreInt64(&this.state, stateClosed)
return nil
}
const (
stateRunning = iota
stateClosed
)