blob: 52d10a2ed1742c5f84598497084d261d7ad03ed3 [file] [log] [blame]
package disruptor
// Gating is the status Reader.Process returns when the upstream barrier has
// not reached the reader's next sequence even though the writer cursor has.
const Gating = -2

// Idle is the status Reader.Process returns when neither the upstream barrier
// nor the writer cursor has reached the reader's next sequence.
const Idle = -3
// Reader hands sequences that an upstream barrier reports as ready to a
// Consumer callback and records its own progress in a cursor.
type Reader struct {
	// upstreamBarrier is called by Process to obtain the highest sequence
	// that may currently be consumed.
	upstreamBarrier Barrier

	// callback receives every sequence that Process consumes.
	callback Consumer

	// writerCursor is only read by Process, to tell Gating from Idle;
	// readerCursor holds the last sequence this reader has consumed.
	writerCursor, readerCursor *Cursor
}
// NewReader returns a Reader wired to the given upstream barrier, consumer
// callback, writer cursor and reader cursor. The arguments are stored as
// given; nothing is copied or validated.
func NewReader(upstreamBarrier Barrier, callback Consumer, writerCursor, readerCursor *Cursor) *Reader {
	reader := new(Reader)
	reader.upstreamBarrier = upstreamBarrier
	reader.callback = callback
	reader.writerCursor = writerCursor
	reader.readerCursor = readerCursor
	return reader
}
// Process makes one pass over whatever the upstream barrier currently reports
// as ready.
//
// It starts at the sequence after the reader cursor and asks the upstream
// barrier for the highest ready sequence. When at least one sequence is
// ready, each one is passed to the callback together with the number of
// ready sequences still remaining after it; the reader cursor is then
// advanced to the last consumed sequence, which is also the return value.
//
// When nothing is ready, Process returns Gating if the writer cursor has
// reached the next sequence and Idle otherwise. The reader cursor is left
// unchanged in both cases.
//
// IDEA: Read returns remaining and consumer calls Commit(seq) once they're done reading
func (r *Reader) Process() int64 {
	first := r.readerCursor.Load() + 1
	last := r.upstreamBarrier()

	if first > last {
		// Nothing to consume: the writer cursor decides between the two
		// statuses, and is consulted only on this path.
		if first > r.writerCursor.Load() {
			return Idle
		}
		return Gating
	}

	for seq := first; seq <= last; seq++ {
		r.callback.Consume(seq, last-seq)
	}

	// Publish progress once, after the whole batch has been handed over.
	r.readerCursor.Store(last)
	return last
}