blob: 483c8a0e63f6eb86d136f2f093b8f1ce8d296309 [file] [log] [blame]
package main
// Next claims the next `items` sequence numbers and returns the highest
// sequence of the new claim (previous claim + items).
//
// When the new claim minus ringSize lies beyond the cached gate, Next
// busy-waits, polling last.Load(1) until the value it returns is
// non-negative and at least that wrap point. The value that ends the wait
// is stored as the new gate so later calls can skip polling while they
// stay at or below it.
//
// NOTE(review): last presumably reports the slowest downstream reader's
// position - confirm against the Barrier implementation.
func (s *SingleProducerSequencer) Next(items int64) int64 {
	upper := s.claimed + items
	wrapPoint := upper - s.ringSize

	if wrapPoint > s.gate {
		barrier := s.last

		// Spin: reload the barrier until it is non-negative and has reached
		// the wrap point. There is always at least one Load per slow path.
		var reached int64
		for {
			reached = barrier.Load(1)
			if reached >= 0 && reached >= wrapPoint {
				break
			}
		}

		s.gate = reached
	}

	s.claimed = upper
	return upper
}
// Publish records sequence in the first element of the sequencer's cursor.
//
// NOTE(review): this is a plain, non-atomic store; confirm how readers of
// the cursor synchronize with it.
func (s *SingleProducerSequencer) Publish(sequence int64) {
	(*s.cursor)[0] = sequence
}
// NewSingleProducerSequencer returns a sequencer that publishes to cursor,
// uses ringSize (widened to int64) as its wrap distance, and polls last
// when a claim would wrap. Both the claimed sequence and the gate start at
// InitialSequenceValue.
func NewSingleProducerSequencer(cursor *Sequence, ringSize int32, last Barrier) *SingleProducerSequencer {
	sequencer := new(SingleProducerSequencer)

	sequencer.claimed = InitialSequenceValue
	sequencer.gate = InitialSequenceValue
	sequencer.cursor = cursor
	sequencer.ringSize = int64(ringSize)
	sequencer.last = last

	return sequencer
}
// SingleProducerSequencer hands out sequence numbers to a producer via Next
// and records published sequences in a cursor via Publish.
type SingleProducerSequencer struct {
	// claimed is the highest sequence returned by the most recent Next call.
	claimed int64

	// gate caches the last value read from the barrier in Next; claims whose
	// wrap point does not exceed it proceed without polling.
	gate int64

	// cursor receives the published sequence in its first element.
	cursor *Sequence

	// ringSize is subtracted from a prospective claim to find its wrap point.
	ringSize int64

	// last is polled by Next while a claim's wrap point is ahead of it.
	last Barrier
}