Isolated and resolved ARM-related issue; removed debugging code.
diff --git a/barrier.go b/barrier.go
index adc049d..b5aa8a9 100644
--- a/barrier.go
+++ b/barrier.go
@@ -1,17 +1,10 @@
package main
-func (this Barrier) Load(consumer int) int64 {
+func (this Barrier) Load() int64 {
minimum := MaxSequenceValue
for i := 0; i < len(this); i++ {
cursor := this[i].Load()
-
- // if len(this) > 1 {
- // fmt.Printf("Producer:: Consumer %d Barrier: %d\n", i+1, cursor)
- // } else {
- // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Producer Barrier: %d\n", consumer, cursor)
- // }
-
if cursor < minimum {
minimum = cursor
}
diff --git a/main.go b/main.go
index 9d9faf5..f6538a4 100644
--- a/main.go
+++ b/main.go
@@ -17,44 +17,34 @@
consumerBarrier := NewBarrier(consumerSequence1) //, consumerSequence2)
sequencer := NewSingleProducerSequencer(producerSequence, RingSize, consumerBarrier)
- go consume(1, producerBarrier, producerSequence, consumerSequence1)
- // go consume(2, producerBarrier, producerSequence, consumerSequence2)
+ go consume(producerBarrier, producerSequence, consumerSequence1)
+ // go consume(producerBarrier, producerSequence, consumerSequence2)
- // started := time.Now()
+ started := time.Now()
for i := int64(0); i < MaxIterations; i++ {
- // fmt.Printf("Producer:: Attempting to claim next sequence.\n")
ticket := sequencer.Next(1)
- // fmt.Printf("Producer:: Claimed sequence: %d, Assigning slot...\n", ticket)
ringBuffer[ticket&RingMask] = ticket
- // fmt.Printf("Producer:: Claimed sequence: %d, Publishing...\n", ticket)
sequencer.Publish(ticket)
- // fmt.Printf("Producer:: Claimed sequence: %d, Published\n", ticket)
- // if ticket%Mod == 0 && ticket > 0 {
- // finished := time.Now()
- // elapsed := finished.Sub(started)
- // fmt.Println(ticket, elapsed)
- // started = time.Now()
- // }
+ if ticket%Mod == 0 && ticket > 0 {
+ finished := time.Now()
+ elapsed := finished.Sub(started)
+ fmt.Println(ticket, elapsed)
+ started = time.Now()
+ }
}
-
- time.Sleep(time.Nanosecond * 50)
- // fmt.Println("Graceful shutdown\n-------------------------------------------------\n")
}
-func consume(name int, barrier Barrier, source, sequence *Sequence) {
+func consume(barrier Barrier, source, sequence *Sequence) {
worker := NewWorker(barrier, TestHandler{}, source, sequence)
for {
- // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Attempting to process messages.\n", name)
- if worker.Process(name)+1 > MaxIterations {
- break
- }
+ worker.Process()
}
}
const MaxIterations = MaxSequenceValue
const Mod = 1000000 * 1 // 1 million * N
-const RingSize = 2
+const RingSize = 1024
const RingMask = RingSize - 1
var ringBuffer [RingSize]int64
@@ -63,14 +53,14 @@
func (this TestHandler) Consume(sequence, remaining int64) {
message := ringBuffer[sequence&RingMask]
- // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Sequence: %d, Message: %d\n", sequence, message)
if message != sequence {
- fmt.Printf("\t\t\t\t\t\t\t\t\tERROR Consumer:: Sequence: %d, Message: %d\n", sequence, message)
- panic(fmt.Sprintf("Consumer:: Sequence: %d, Message: %d\n", sequence, message))
+ text := fmt.Sprintf("[Consumer] ERROR Sequence: %d, Message: %d\n", sequence, message)
+ fmt.Printf(text)
+ panic(text)
}
if sequence%Mod == 0 && sequence > 0 {
- fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer:: Sequence: %d, Message: %d\n", sequence, message)
+ // fmt.Printf("[Consumer] Sequence: %d, Message: %d\n", sequence, message)
}
}
diff --git a/run-arm.sh b/run-arm.sh
index 4e6e254..6e61439 100755
--- a/run-arm.sh
+++ b/run-arm.sh
@@ -6,9 +6,8 @@
case "$1" in
copy)
GOOS=linux GOARCH=arm GOARM=7 go build
- adb push run.sh /data/local/tmp
adb push go-disruptor /data/local/tmp
;;
esac
-adb shell "cd /data/local/tmp; chmod 755 *; ./run.sh"
+adb shell "cd /data/local/tmp; chmod 755 *; ./go-disruptor"
diff --git a/run.sh b/run.sh
deleted file mode 100755
index 8d0372e..0000000
--- a/run.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/system/bin/sh
-
-chmod 755 ./go-disruptor
-while ./go-disruptor 2>/dev/null; do
- # sleep 1
-done
diff --git a/single_sequencer.go b/single_sequencer.go
index f0c8287..a7c8b51 100644
--- a/single_sequencer.go
+++ b/single_sequencer.go
@@ -4,22 +4,14 @@
previous, gate := this.previous, this.gate
next := previous + items
wrap := next - this.ringSize
- // fmt.Printf("Producer:: Last claim: %d, Next: %d, Wrap: %d, Gate:%d\n", previous, next, wrap, gate)
if wrap > gate || gate > previous {
barrier := this.barrier
- min := barrier.Load(1)
- // fmt.Printf("Producer:: (a) Wrap: %d, Current Gate, %d, Proposed Gate:%d\n", wrap, gate, min)
-
+ min := barrier.Load()
for wrap > min || min < 0 {
- // fmt.Printf("Producer:: (b) Wrap: %d, Current Gate, %d, Proposed Gate:%d\n", wrap, gate, min)
- min = barrier.Load(1)
- // if wrap <= min {
- // fmt.Printf("Producer:: Consumers have caught up to producer.\n")
- // }
+ min = barrier.Load()
}
- // fmt.Printf("Producer:: (c) Wrap: %d, Current Gate, %d, Proposed Gate:%d\n", wrap, gate, min)
this.gate = min
}
diff --git a/worker.go b/worker.go
index 1e4a51d..c7636bf 100644
--- a/worker.go
+++ b/worker.go
@@ -1,23 +1,15 @@
package main
-func (this Worker) Process(consumer int) int64 {
+func (this Worker) Process() int64 {
next := this.sequence.Load() + 1
- ready := this.barrier.Load(consumer)
+ ready := this.barrier.Load()
if next <= ready {
- // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: %d work items found (Next %d, Ready %d).\n", consumer, ready-next+1, ready, next)
-
- // if ready > 1000 {
- // fmt.Println("\t\t\t\t\t\t\t\t\tMalformed Next!", next, ready)
- // }
-
for next <= ready {
- // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Consuming sequence %d\n", consumer, next)
this.handler.Consume(next, ready-next)
next++
}
- // fmt.Printf("\t\t\t\t\t\t\t\t\tConsumer %d:: Completed through sequence %d\n", consumer, next-1)
next--
this.sequence.Store(next)
return next
diff --git a/worker_test.go b/worker_test.go
index ecd3b80..7131fe1 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -15,7 +15,7 @@
producerSequence.Store(1)
for i := int64(0); i < iterations; i++ {
- workerSequence[SequencePayloadIndex] = 0
+ workerSequence.Store(0)
worker.Process()
}
}