fix race condition in reading record and updating record. (#1104)
diff --git a/stats/view/worker.go b/stats/view/worker.go
index 37279b3..2f3c018 100644
--- a/stats/view/worker.go
+++ b/stats/view/worker.go
@@ -236,6 +236,8 @@
}
func (w *worker) reportUsage(now time.Time) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
for _, v := range w.views {
w.reportView(v, now)
}
diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go
index ba6203a..0267e17 100644
--- a/stats/view/worker_commands.go
+++ b/stats/view/worker_commands.go
@@ -121,6 +121,8 @@
}
func (cmd *retrieveDataReq) handleCommand(w *worker) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
vi, ok := w.views[cmd.v]
if !ok {
cmd.c <- &retrieveDataResp{
@@ -153,6 +155,8 @@
}
func (cmd *recordReq) handleCommand(w *worker) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
for _, m := range cmd.ms {
if (m == stats.Measurement{}) { // not registered
continue
diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go
index d430146..8d4546e 100644
--- a/stats/view/worker_test.go
+++ b/stats/view/worker_test.go
@@ -22,6 +22,8 @@
"testing"
"time"
+ "go.opencensus.io/metric/metricdata"
+ "go.opencensus.io/metric/metricexport"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
@@ -397,6 +399,91 @@
}
}
+func TestWorkerRace(t *testing.T) {
+ restart()
+ ctx := context.Background()
+
+ m1 := stats.Int64("measure", "desc", "unit")
+ view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
+ m2 := stats.Int64("measure2", "desc", "unit")
+ view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
+
+ // 1. This will export every microsecond.
+ SetReportingPeriod(time.Microsecond)
+
+ if err := Register(view1, view2); err != nil {
+ t.Fatalf("cannot register: %v", err)
+ }
+
+ e := &countExporter{}
+ RegisterExporter(e)
+
+ // Synchronize and make sure every goroutine has terminated before we exit
+ var waiter sync.WaitGroup
+ waiter.Add(3)
+ defer waiter.Wait()
+
+ doneCh := make(chan bool)
+ // 2. Record write routine at 700ns
+ go func() {
+ defer waiter.Done()
+ tick := time.NewTicker(700 * time.Nanosecond)
+ defer tick.Stop()
+
+ defer func() {
+ close(doneCh)
+ }()
+
+ for i := 0; i < 1e3; i++ {
+ stats.Record(ctx, m1.M(1))
+ stats.Record(ctx, m2.M(1))
+ stats.Record(ctx, m2.M(1))
+ <-tick.C
+ }
+ }()
+
+ // 2. Simulating RetrieveData 900ns
+ go func() {
+ defer waiter.Done()
+ tick := time.NewTicker(900 * time.Nanosecond)
+ defer tick.Stop()
+
+ for {
+ select {
+ case <-doneCh:
+ return
+ case <-tick.C:
+ RetrieveData(view1.Name)
+ }
+ }
+ }()
+
+ // 4. Export via Reader routine at 800ns
+ go func() {
+ defer waiter.Done()
+ tick := time.NewTicker(800 * time.Nanosecond)
+ defer tick.Stop()
+
+ reader := metricexport.Reader{}
+ for {
+ select {
+ case <-doneCh:
+ return
+ case <-tick.C:
+ // Perform some collection here
+ reader.ReadAndExport(&testExporter{})
+ }
+ }
+ }()
+}
+
+type testExporter struct {
+}
+
+func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
+ return nil
+}
+
type countExporter struct {
sync.Mutex
count int64