blob: 0579616125043782a56dd755c6f6a88ff6a29dfe [file] [log] [blame]
package main
import (
"fmt"
"time"
"github.com/smartystreets/go-disruptor"
)
func consume0(reader *disruptor.SimpleReader) {
for {
reader.Receive()
}
}
func consume1(reader *disruptor.Reader) {
started := time.Now()
// fmt.Printf("\t\t\t\t\t[CONSUMER] Starting consumer...\n")
for {
sequence, remaining := reader.Receive()
if remaining >= 0 {
// fmt.Printf("\t\t\t\t\t[CONSUMER] Received messages starting at sequence %d, with %d messages remaining\n", sequence, remaining)
for remaining >= 0 {
if sequence%ReportingFrequency == 0 {
finished := time.Now()
fmt.Println(sequence, finished.Sub(started))
started = time.Now()
}
message := ringBuffer[sequence&RingMask]
// fmt.Printf("\t\t\t\t\t[CONSUMER] Consuming sequence %d. Message Payload: %d\n", sequence, message)
if sequence != message {
// alert := fmt.Sprintf("--------------\n\t\t\t\t\t[CONSUMER] ***Race Condition***::Sequence: %d, Message %d\n", sequence, message)
alert := fmt.Sprintf("***Race Condition***::Sequence: %d, Message %d\n", sequence, message)
fmt.Println(alert)
panic(alert)
}
//ringBuffer[sequence&RingMask] = sequence % 2
remaining--
sequence++
}
// fmt.Println("\t\t\t\t\t[CONSUMER] All messages consumed, committing up to sequence ", sequence-1)
reader.Commit(sequence - 1)
// } else {
// if remaining == disruptor.Gating {
// fmt.Println("\t\t\t\t\t[CONSUMER] Consumer gating at sequence", sequence)
// } else if remaining == disruptor.Idling {
// fmt.Println("\t\t\t\t\t[CONSUMER] Consumer idling at sequence", sequence)
// }
// time.Sleep(time.Millisecond * 10)
}
}
}
func consume2(reader *disruptor.Reader) {
for {
sequence, remaining := reader.Receive()
if remaining >= 0 {
for remaining >= 0 {
message := ringBuffer[sequence&RingMask]
if message != sequence%2 {
alert := fmt.Sprintf("Race Condition (Layer 2)::Sequence: %d, Message %d\n", sequence, message)
fmt.Print(alert)
panic(alert)
}
remaining--
sequence++
}
reader.Commit(sequence - 1)
} else {
}
}
}