blob: 82a7aef5fc1a2769a9e39cab4568710f1198fedf [file] [log] [blame]
package disruptor
// BatchReader pulls batches of sequences from an upstream Barrier and passes
// each batch to a Consumer. The work happens in the receive loop, which Start
// launches on its own goroutine.
type BatchReader struct {
	// started is set by Start, cleared by Stop, and polled by the receive loop
	// to decide whether it may exit once it has caught up.
	// NOTE(review): this is a plain bool with no synchronization visible in
	// this file; if Stop is called from a goroutine other than the receive
	// loop's, that is a data race — confirm whether it is intentional.
	started bool

	// read is where the receive loop loads its starting sequence from and
	// stores its progress to. written is consulted only when upstream reports
	// nothing new, to tell "work still pending" apart from "fully caught up".
	read, written *Cursor

	// upstream reports the highest sequence that may currently be consumed.
	upstream Barrier

	// consumer receives every inclusive (lower, upper) batch.
	consumer Consumer

	// waiter is stored but only referenced from commented-out code in the
	// receive loop at present.
	waiter Waiter
}
// NewBatchReader returns a BatchReader wired to the given cursors, upstream
// barrier, consumer and waiter. The reader is returned in the not-started
// state; nothing is consumed until Start is called.
func NewBatchReader(read, written *Cursor, upstream Barrier, consumer Consumer, waiter Waiter) *BatchReader {
	reader := new(BatchReader) // started keeps its zero value, false
	reader.read = read
	reader.written = written
	reader.upstream = upstream
	reader.consumer = consumer
	reader.waiter = waiter
	return reader
}
// Start flags the reader as started and launches the receive loop on a new
// goroutine, returning immediately.
//
// There is no guard against repeated calls: each call spawns another receive
// goroutine.
func (r *BatchReader) Start() {
	r.started = true
	go r.receive()
}
// Stop clears the started flag. The receive loop observes the flag only after
// it has nothing left to consume, so anything already available is still
// delivered before the loop exits.
//
// Stop does not block until the loop has finished, nor does it wake it up.
func (r *BatchReader) Stop() {
	r.started = false
}
// receive is the reader's main loop. Beginning just after the sequence held in
// the read cursor, it repeatedly asks upstream how far it may go, hands every
// available batch to the consumer, and publishes its progress through the read
// cursor. It returns only when there is nothing to consume, nothing further
// has been written, and the reader is no longer marked as started.
//
// Both idle cases currently fall straight through to the next iteration, so
// the loop spins without sleeping or blocking while it has no work.
func (r *BatchReader) receive() {
	current := r.read.Load()

	for {
		next := current + 1
		limit := r.upstream.LoadBarrier(next)

		switch {
		case next <= limit:
			// At least one sequence is available: deliver the whole inclusive
			// range in a single call, then record how far we got.
			r.consumer.Consume(next, limit)
			current = limit
			r.read.Store(current)

		case next <= r.written.Load():
			// Something has been written at or beyond next, but upstream has
			// not released it yet; retry.
			// TODO: use another method from the wait strategy? A
			// time.Sleep(time.Millisecond) used to be sketched here.

		case r.started:
			// Fully caught up and still running; retry.
			// TODO: r.waiter.Wait() is disabled for now.

		default:
			// Fully caught up and stopped: leave the loop.
			return
		}
	}
}