| // Copyright 2017, OpenCensus Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| package view |
| |
| import ( |
| "context" |
| "errors" |
| "sort" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| "go.opencensus.io/resource" |
| |
| "go.opencensus.io/metric/metricdata" |
| "go.opencensus.io/metric/metricexport" |
| "go.opencensus.io/stats" |
| "go.opencensus.io/tag" |
| ) |
| |
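| // Test_Worker_ViewRegistration verifies that views can be registered through |
| // the default worker and that registering a second view that reuses an |
| // existing view's name fails. |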
| func Test_Worker_ViewRegistration(t *testing.T) { |
| someError := errors.New("some error") |
| |
| sc1 := make(chan *Data) |
| |
| type registration struct { |
| c chan *Data |
| vID string |
| err error |
| } |
| type testCase struct { |
| label string |
| registrations []registration |
| } |
| tcs := []testCase{ |
| { |
| "register v1ID", |
| []registration{ |
| { |
| sc1, |
| "v1ID", |
| nil, |
| }, |
| }, |
| }, |
| { |
| "register v1ID+v2ID", |
| []registration{ |
| { |
| sc1, |
| "v1ID", |
| nil, |
| }, |
| }, |
| }, |
| { |
| "register to v1ID; ??? to v1ID and view with same ID", |
| []registration{ |
| { |
| sc1, |
| "v1ID", |
| nil, |
| }, |
| { |
| sc1, |
| "v1SameNameID", |
| someError, |
| }, |
| }, |
| }, |
| } |
| |
| mf1 := stats.Float64("MF1/Test_Worker_ViewRegistration", "desc MF1", "unit") |
| mf2 := stats.Float64("MF2/Test_Worker_ViewRegistration", "desc MF2", "unit") |
| |
| for _, tc := range tcs { |
| t.Run(tc.label, func(t *testing.T) { |
| restart() |
| |
| views := map[string]*View{ |
| "v1ID": { |
| Name: "VF1", |
| Measure: mf1, |
| Aggregation: Count(), |
| }, |
| "v1SameNameID": { |
| Name: "VF1", |
| Description: "desc duplicate name VF1", |
| Measure: mf1, |
| Aggregation: Sum(), |
| }, |
| "v2ID": { |
| Name: "VF2", |
| Measure: mf2, |
| Aggregation: Count(), |
| }, |
| "vNilID": nil, |
| } |
| |
| for _, r := range tc.registrations { |
| v := views[r.vID] |
| err := Register(v) |
| if (err != nil) != (r.err != nil) { |
| t.Errorf("%v: Register() = %v, want %v", tc.label, err, r.err) |
| } |
| } |
| }) |
| } |
| } |
| |
| func Test_Worker_MultiExport(t *testing.T) { |
| restart() |
| |
| // This test reports the same data for the default worker and a secondary |
| // worker, and ensures that the stats are kept independently. |
| extraResource := resource.Resource{ |
| Type: "additional", |
| Labels: map[string]string{"key1": "value1", "key2": "value2"}, |
| } |
| worker2 := NewMeter().(*worker) |
| worker2.Start() |
| worker2.SetResource(&extraResource) |
| |
| m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit") |
| key := tag.MustNewKey("key") |
| count := &View{"VF1", "description", []tag.Key{key}, m, Count()} |
| sum := &View{"VF2", "description", []tag.Key{}, m, Sum()} |
| |
| Register(count, sum) |
| worker2.Register(count) // Don't compute the sum for worker2, to verify independence of computation. |
| data := []struct { |
| w Meter |
| tags string // Tag values |
| value float64 |
| }{{ |
| tags: "a", |
| value: 2.0, |
| }, { |
| tags: "b", |
| value: 3.0, |
| }, { |
| tags: "a", value: 2.5, |
| }, { |
| w: worker2, tags: "b", value: 1.0, |
| }, |
| } |
| |
| for _, d := range data { |
| ctx, err := tag.New(context.Background(), tag.Upsert(key, d.tags)) |
| if err != nil { |
| t.Fatalf("%s: failed to add tag %q: %v", d.w, key.Name(), err) |
| } |
| if d.w != nil { |
| d.w.Record(tag.FromContext(ctx), []stats.Measurement{m.M(d.value)}, nil) |
| } else { |
| stats.Record(ctx, m.M(d.value)) |
| } |
| } |
| |
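| // makeKey builds a lookup key from a resource's encoded labels and a view |
| // name, so expected rows can be matched per (resource, view) pair. |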
| makeKey := func(r *resource.Resource, view string) string { |
| if r == nil { |
| r = &resource.Resource{} |
| } |
| return resource.EncodeLabels(r.Labels) + "/" + view |
| } |
| |
| // Keys are Resource.Labels encoded as a string, followed by "/" and the view name. |
| wantPartialData := map[string][]*Row{ |
| makeKey(nil, count.Name): { |
| {[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}}, |
| {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, |
| }, |
| makeKey(nil, sum.Name): { |
| {nil, &SumData{Value: 7.5}}, |
| }, |
| makeKey(&extraResource, count.Name): { |
| {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, |
| }, |
| } |
| |
| te := &testExporter{} |
| metricexport.NewReader().ReadAndExport(te) |
| for _, m := range te.metrics { |
| key := makeKey(m.Resource, m.Descriptor.Name) |
| want, ok := wantPartialData[key] |
| if !ok { |
| t.Errorf("Unexpected data for %q: %v", key, m) |
| continue |
| } |
| gotTs := m.TimeSeries |
| sort.Sort(byLabel(gotTs)) |
| |
| for i, ts := range gotTs { |
| for j, label := range ts.LabelValues { |
| if want[i].Tags[j].Value != label.Value { |
| t.Errorf("Mismatched tag values (want %q, got %q) for %v in %q", want[i].Tags[j].Value, label.Value, ts, key) |
| } |
| } |
| switch wantValue := want[i].Data.(type) { |
| case *CountData: |
| got := ts.Points[0].Value.(int64) |
| if wantValue.Value != got { |
| t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue.Value, got, ts, key) |
| } |
| case *SumData: |
| got := ts.Points[0].Value.(float64) |
| if wantValue.Value != got { |
| t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue.Value, got, ts, key) |
| } |
| default: |
| t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key) |
| } |
| } |
| } |
| |
| // Verify that worker2 has not been computing sum: |
| got, err := worker2.RetrieveData(sum.Name) |
| if err == nil { |
| t.Errorf("%s: expected no data because it was not registered, got %#v", sum.Name, got) |
| } |
| |
| Unregister(count, sum) |
| worker2.Unregister(count) |
| worker2.Stop() |
| } |
| |
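| // Test_Worker_RecordFloat64 records float64 measurements against varying sets |
| // of registered views and verifies the retrieved rows; retrieving data for an |
| // unregistered view must return an error. |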
| func Test_Worker_RecordFloat64(t *testing.T) { |
| restart() |
| |
| someError := errors.New("some error") |
| m := stats.Float64("Test_Worker_RecordFloat64/MF1", "desc MF1", "unit") |
| |
| k1 := tag.MustNewKey("k1") |
| k2 := tag.MustNewKey("k2") |
| ctx, err := tag.New(context.Background(), |
| tag.Insert(k1, "v1"), |
| tag.Insert(k2, "v2"), |
| ) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| v1 := &View{"VF1", "desc VF1", []tag.Key{k1, k2}, m, Count()} |
| v2 := &View{"VF2", "desc VF2", []tag.Key{k1, k2}, m, Count()} |
| |
| type want struct { |
| v *View |
| rows []*Row |
| err error |
| } |
| type testCase struct { |
| label string |
| registrations []*View |
| records []float64 |
| wants []want |
| } |
| |
| tcs := []testCase{ |
| { |
| label: "0", |
| registrations: []*View{}, |
| records: []float64{1, 1}, |
| wants: []want{{v1, nil, someError}, {v2, nil, someError}}, |
| }, |
| { |
| label: "1", |
| registrations: []*View{v1}, |
| records: []float64{1, 1}, |
| wants: []want{ |
| { |
| v1, |
| []*Row{ |
| { |
| []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}}, |
| &CountData{Value: 2}, |
| }, |
| }, |
| nil, |
| }, |
| {v2, nil, someError}, |
| }, |
| }, |
| { |
| label: "2", |
| registrations: []*View{v1, v2}, |
| records: []float64{1, 1}, |
| wants: []want{ |
| { |
| v1, |
| []*Row{ |
| { |
| []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}}, |
| &CountData{Value: 2}, |
| }, |
| }, |
| nil, |
| }, |
| { |
| v2, |
| []*Row{ |
| { |
| []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}}, |
| &CountData{Value: 2}, |
| }, |
| }, |
| nil, |
| }, |
| }, |
| }, |
| } |
| |
| for _, tc := range tcs { |
| for _, v := range tc.registrations { |
| if err := Register(v); err != nil { |
| t.Fatalf("%v: Register(%v) = %v; want no errors", tc.label, v.Name, err) |
| } |
| } |
| |
| for _, value := range tc.records { |
| stats.Record(ctx, m.M(value)) |
| } |
| |
| for _, w := range tc.wants { |
| gotRows, err := RetrieveData(w.v.Name) |
| for i := range gotRows { |
| switch data := gotRows[i].Data.(type) { |
| case *CountData: |
| data.Start = time.Time{} |
| case *SumData: |
| data.Start = time.Time{} |
| case *DistributionData: |
| data.Start = time.Time{} |
| } |
| } |
| if (err != nil) != (w.err != nil) { |
| t.Fatalf("%s: RetrieveData(%v) = %v; want error = %v", tc.label, w.v.Name, err, w.err) |
| } |
| if diff := cmp.Diff(gotRows, w.rows); diff != "" { |
| t.Errorf("%v: unexpected row (got-, want+): %s", tc.label, diff) |
| break |
| } |
| } |
| |
| // Cleaning up. |
| Unregister(tc.registrations...) |
| } |
| } |
| |
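| // TestReportUsage checks that, with a short reporting period, the exported |
| // count for a cumulative view never exceeds the number of recorded |
| // measurements. |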
| func TestReportUsage(t *testing.T) { |
| ctx := context.Background() |
| |
| m := stats.Int64("measure", "desc", "unit") |
| |
| tests := []struct { |
| name string |
| view *View |
| wantMaxCount int64 |
| }{ |
| { |
| name: "cum", |
| view: &View{Name: "cum1", Measure: m, Aggregation: Count()}, |
| wantMaxCount: 8, |
| }, |
| { |
| name: "cum2", |
| view: &View{Name: "cum1", Measure: m, Aggregation: Count()}, |
| wantMaxCount: 8, |
| }, |
| } |
| |
| for _, tt := range tests { |
| restart() |
| SetReportingPeriod(25 * time.Millisecond) |
| |
| if err := Register(tt.view); err != nil { |
| t.Fatalf("%v: cannot register: %v", tt.name, err) |
| } |
| |
| e := &countExporter{} |
| RegisterExporter(e) |
| |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| |
| time.Sleep(50 * time.Millisecond) |
| |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| |
| time.Sleep(50 * time.Millisecond) |
| |
| e.Lock() |
| count := e.count |
| e.Unlock() |
| if got, want := count, tt.wantMaxCount; got > want { |
| t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want) |
| } |
| } |
| |
| } |
| |
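| // Test_SetReportingPeriodReqNeverBlocks verifies that handling a |
| // setReportingPeriodReq always signals the reply channel, whatever the |
| // requested duration, so callers of SetReportingPeriod never block. |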
| func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) { |
| t.Parallel() |
| |
| worker := NewMeter().(*worker) |
| durations := []time.Duration{-1, 0, 10, 100 * time.Millisecond} |
| for i, duration := range durations { |
| ackChan := make(chan bool, 1) |
| cmd := &setReportingPeriodReq{c: ackChan, d: duration} |
| cmd.handleCommand(worker) |
| |
| select { |
| case <-ackChan: |
| case <-time.After(500 * time.Millisecond): // Arbitrarily using 500ms as the timeout duration. |
| t.Errorf("#%d: duration %v blocks", i, duration) |
| } |
| } |
| } |
| |
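| // TestWorkerStarttime checks that every batch of cumulative view data |
| // exported for the same view reports the same start time. |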
| func TestWorkerStarttime(t *testing.T) { |
| restart() |
| |
| ctx := context.Background() |
| m := stats.Int64("measure/TestWorkerStarttime", "desc", "unit") |
| v := &View{ |
| Name: "testview", |
| Measure: m, |
| Aggregation: Count(), |
| } |
| |
| SetReportingPeriod(25 * time.Millisecond) |
| if err := Register(v); err != nil { |
| t.Fatalf("cannot register to %v: %v", v.Name, err) |
| } |
| |
| e := &vdExporter{} |
| RegisterExporter(e) |
| defer UnregisterExporter(e) |
| |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| |
| time.Sleep(50 * time.Millisecond) |
| |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| stats.Record(ctx, m.M(1)) |
| |
| time.Sleep(50 * time.Millisecond) |
| |
| e.Lock() |
| if len(e.vds) == 0 { |
| t.Fatal("Got no view data; want at least one") |
| } |
| |
| var start time.Time |
| for _, vd := range e.vds { |
| if start.IsZero() { |
| start = vd.Start |
| } |
| if !vd.Start.Equal(start) { |
| t.Errorf("Cumulative view data start time = %v; want %v", vd.Start, start) |
| } |
| } |
| e.Unlock() |
| } |
| |
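| // TestUnregisterReportsUsage verifies that Unregister flushes the pending |
| // data of the unregistered views to the registered exporters. |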
| func TestUnregisterReportsUsage(t *testing.T) { |
| restart() |
| ctx := context.Background() |
| |
| m1 := stats.Int64("measure", "desc", "unit") |
| view1 := &View{Name: "count", Measure: m1, Aggregation: Count()} |
| m2 := stats.Int64("measure2", "desc", "unit") |
| view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()} |
| |
| SetReportingPeriod(time.Hour) |
| |
| if err := Register(view1, view2); err != nil { |
| t.Fatalf("cannot register: %v", err) |
| } |
| |
| e := &countExporter{} |
| RegisterExporter(e) |
| |
| stats.Record(ctx, m1.M(1)) |
| stats.Record(ctx, m2.M(1)) |
| stats.Record(ctx, m2.M(1)) |
| |
| Unregister(view2) |
| |
| // Unregister should flush only view2's data, so expect a count of 2. |
| want := int64(2) |
| |
| e.Lock() |
| got := e.totalCount |
| e.Unlock() |
| if got != want { |
| t.Errorf("got count data = %v; want %v", got, want) |
| } |
| } |
| |
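| // TestWorkerRace runs concurrent Record, RetrieveData, and metric-export |
| // goroutines against the same worker to surface data races under -race. |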
| func TestWorkerRace(t *testing.T) { |
| restart() |
| ctx := context.Background() |
| |
| m1 := stats.Int64("measure", "desc", "unit") |
| view1 := &View{Name: "count", Measure: m1, Aggregation: Count()} |
| m2 := stats.Int64("measure2", "desc", "unit") |
| view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()} |
| |
| // 1. This will export every microsecond. |
| SetReportingPeriod(time.Microsecond) |
| |
| if err := Register(view1, view2); err != nil { |
| t.Fatalf("cannot register: %v", err) |
| } |
| |
| e := &countExporter{} |
| RegisterExporter(e) |
| |
| // Synchronize and make sure every goroutine has terminated before we exit |
| var waiter sync.WaitGroup |
| waiter.Add(3) |
| defer waiter.Wait() |
| |
| doneCh := make(chan bool) |
| // 2. Record-write goroutine, ticking every 700ns. |
| go func() { |
| defer waiter.Done() |
| tick := time.NewTicker(700 * time.Nanosecond) |
| defer tick.Stop() |
| |
| defer func() { |
| close(doneCh) |
| }() |
| |
| for i := 0; i < 1e3; i++ { |
| stats.Record(ctx, m1.M(1)) |
| stats.Record(ctx, m2.M(1)) |
| stats.Record(ctx, m2.M(1)) |
| <-tick.C |
| } |
| }() |
| |
| // 3. RetrieveData goroutine, ticking every 900ns. |
| go func() { |
| defer waiter.Done() |
| tick := time.NewTicker(900 * time.Nanosecond) |
| defer tick.Stop() |
| |
| for { |
| select { |
| case <-doneCh: |
| return |
| case <-tick.C: |
| RetrieveData(view1.Name) |
| } |
| } |
| }() |
| |
| // 4. Export via a metricexport.Reader goroutine, ticking every 800ns. |
| go func() { |
| defer waiter.Done() |
| tick := time.NewTicker(800 * time.Nanosecond) |
| defer tick.Stop() |
| |
| reader := metricexport.Reader{} |
| for { |
| select { |
| case <-doneCh: |
| return |
| case <-tick.C: |
| // Read and export the current metrics. |
| reader.ReadAndExport(&testExporter{}) |
| } |
| } |
| }() |
| } |
| |
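| // testExporter captures the most recent batch of exported metrics. |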
| type testExporter struct { |
| metrics []*metricdata.Metric |
| } |
| |
| func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { |
| te.metrics = metrics |
| return nil |
| } |
| |
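| // countExporter records the latest count value and a running total of all |
| // counts exported from the first row of each exported view. |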
| type countExporter struct { |
| sync.Mutex |
| count int64 |
| totalCount int64 |
| } |
| |
| func (e *countExporter) ExportView(vd *Data) { |
| if len(vd.Rows) == 0 { |
| return |
| } |
| d := vd.Rows[0].Data.(*CountData) |
| |
| e.Lock() |
| defer e.Unlock() |
| e.count = d.Value |
| e.totalCount += d.Value |
| } |
| |
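| // vdExporter accumulates every exported view data batch for later inspection. |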
| type vdExporter struct { |
| sync.Mutex |
| vds []*Data |
| } |
| |
| func (e *vdExporter) ExportView(vd *Data) { |
| e.Lock() |
| defer e.Unlock() |
| |
| e.vds = append(e.vds, vd) |
| } |
| |
| // restart stops the current default worker and starts a new one. |
| func restart() { |
| defaultWorker.Stop() |
| defaultWorker = NewMeter().(*worker) |
| go defaultWorker.start() |
| } |
| |
| // byLabel implements sort.Interface for []*metricdata.TimeSeries, ordering by label values. |
| type byLabel []*metricdata.TimeSeries |
| |
| func (ts byLabel) Len() int { return len(ts) } |
| func (ts byLabel) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } |
| func (ts byLabel) Less(i, j int) bool { |
| if len(ts[i].LabelValues) != len(ts[j].LabelValues) { |
| return len(ts[i].LabelValues) < len(ts[j].LabelValues) |
| } |
| for k := range ts[i].LabelValues { |
| if ts[i].LabelValues[k].Value != ts[j].LabelValues[k].Value { |
| return ts[i].LabelValues[k].Value < ts[j].LabelValues[k].Value |
| } |
| } |
| return false |
| } |