blob: 6c15d37ef94ab0d166ad25a7d1571d66944dca35 [file] [log] [blame]
// Copyright 2017, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package view
import (
"context"
"errors"
"sync"
"testing"
"time"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
func Test_Worker_ViewRegistration(t *testing.T) {
someError := errors.New("some error")
sc1 := make(chan *Data)
type registration struct {
c chan *Data
vID string
err error
}
type testCase struct {
label string
registrations []registration
}
tcs := []testCase{
{
"register v1ID",
[]registration{
{
sc1,
"v1ID",
nil,
},
},
},
{
"register v1ID+v2ID",
[]registration{
{
sc1,
"v1ID",
nil,
},
},
},
{
"register to v1ID; ??? to v1ID and view with same ID",
[]registration{
{
sc1,
"v1ID",
nil,
},
{
sc1,
"v1SameNameID",
someError,
},
},
},
}
mf1 := stats.Float64("MF1/Test_Worker_ViewSubscription", "desc MF1", "unit")
mf2 := stats.Float64("MF2/Test_Worker_ViewSubscription", "desc MF2", "unit")
for _, tc := range tcs {
t.Run(tc.label, func(t *testing.T) {
restart()
views := map[string]*View{
"v1ID": {
Name: "VF1",
Measure: mf1,
Aggregation: Count(),
},
"v1SameNameID": {
Name: "VF1",
Description: "desc duplicate name VF1",
Measure: mf1,
Aggregation: Sum(),
},
"v2ID": {
Name: "VF2",
Measure: mf2,
Aggregation: Count(),
},
"vNilID": nil,
}
for _, r := range tc.registrations {
v := views[r.vID]
err := Register(v)
if (err != nil) != (r.err != nil) {
t.Errorf("%v: Register() = %v, want %v", tc.label, err, r.err)
}
}
})
}
}
func Test_Worker_RecordFloat64(t *testing.T) {
restart()
someError := errors.New("some error")
m := stats.Float64("Test_Worker_RecordFloat64/MF1", "desc MF1", "unit")
k1 := tag.MustNewKey("k1")
k2 := tag.MustNewKey("k2")
ctx, err := tag.New(context.Background(),
tag.Insert(k1, "v1"),
tag.Insert(k2, "v2"),
)
if err != nil {
t.Fatal(err)
}
v1 := &View{"VF1", "desc VF1", []tag.Key{k1, k2}, m, Count()}
v2 := &View{"VF2", "desc VF2", []tag.Key{k1, k2}, m, Count()}
type want struct {
v *View
rows []*Row
err error
}
type testCase struct {
label string
registrations []*View
records []float64
wants []want
}
tcs := []testCase{
{
label: "0",
registrations: []*View{},
records: []float64{1, 1},
wants: []want{{v1, nil, someError}, {v2, nil, someError}},
},
{
label: "1",
registrations: []*View{v1},
records: []float64{1, 1},
wants: []want{
{
v1,
[]*Row{
{
[]tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
&CountData{Value: 2},
},
},
nil,
},
{v2, nil, someError},
},
},
{
label: "2",
registrations: []*View{v1, v2},
records: []float64{1, 1},
wants: []want{
{
v1,
[]*Row{
{
[]tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
&CountData{Value: 2},
},
},
nil,
},
{
v2,
[]*Row{
{
[]tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
&CountData{Value: 2},
},
},
nil,
},
},
},
}
for _, tc := range tcs {
for _, v := range tc.registrations {
if err := Register(v); err != nil {
t.Fatalf("%v: Register(%v) = %v; want no errors", tc.label, v.Name, err)
}
}
for _, value := range tc.records {
stats.Record(ctx, m.M(value))
}
for _, w := range tc.wants {
gotRows, err := RetrieveData(w.v.Name)
if (err != nil) != (w.err != nil) {
t.Fatalf("%s: RetrieveData(%v) = %v; want error = %v", tc.label, w.v.Name, err, w.err)
}
for _, got := range gotRows {
if !containsRow(w.rows, got) {
t.Errorf("%s: got row %#v; want none", tc.label, got)
break
}
}
for _, want := range w.rows {
if !containsRow(gotRows, want) {
t.Errorf("%s: got none; want %#v'", tc.label, want)
break
}
}
}
// Cleaning up.
Unregister(tc.registrations...)
}
}
func TestReportUsage(t *testing.T) {
ctx := context.Background()
m := stats.Int64("measure", "desc", "unit")
tests := []struct {
name string
view *View
wantMaxCount int64
}{
{
name: "cum",
view: &View{Name: "cum1", Measure: m, Aggregation: Count()},
wantMaxCount: 8,
},
{
name: "cum2",
view: &View{Name: "cum1", Measure: m, Aggregation: Count()},
wantMaxCount: 8,
},
}
for _, tt := range tests {
restart()
SetReportingPeriod(25 * time.Millisecond)
if err := Register(tt.view); err != nil {
t.Fatalf("%v: cannot register: %v", tt.name, err)
}
e := &countExporter{}
RegisterExporter(e)
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
time.Sleep(50 * time.Millisecond)
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
time.Sleep(50 * time.Millisecond)
e.Lock()
count := e.count
e.Unlock()
if got, want := count, tt.wantMaxCount; got > want {
t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
}
}
}
func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) {
t.Parallel()
worker := newWorker()
durations := []time.Duration{-1, 0, 10, 100 * time.Millisecond}
for i, duration := range durations {
ackChan := make(chan bool, 1)
cmd := &setReportingPeriodReq{c: ackChan, d: duration}
cmd.handleCommand(worker)
select {
case <-ackChan:
case <-time.After(500 * time.Millisecond): // Arbitrarily using 500ms as the timeout duration.
t.Errorf("#%d: duration %v blocks", i, duration)
}
}
}
func TestWorkerStarttime(t *testing.T) {
restart()
ctx := context.Background()
m := stats.Int64("measure/TestWorkerStarttime", "desc", "unit")
v := &View{
Name: "testview",
Measure: m,
Aggregation: Count(),
}
SetReportingPeriod(25 * time.Millisecond)
if err := Register(v); err != nil {
t.Fatalf("cannot register to %v: %v", v.Name, err)
}
e := &vdExporter{}
RegisterExporter(e)
defer UnregisterExporter(e)
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
time.Sleep(50 * time.Millisecond)
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
time.Sleep(50 * time.Millisecond)
e.Lock()
if len(e.vds) == 0 {
t.Fatal("Got no view data; want at least one")
}
var start time.Time
for _, vd := range e.vds {
if start.IsZero() {
start = vd.Start
}
if !vd.Start.Equal(start) {
t.Errorf("Cumulative view data start time = %v; want %v", vd.Start, start)
}
}
e.Unlock()
}
func TestUnregisterReportsUsage(t *testing.T) {
restart()
ctx := context.Background()
m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
SetReportingPeriod(time.Hour)
if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}
e := &countExporter{}
RegisterExporter(e)
stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(1))
stats.Record(ctx, m2.M(1))
Unregister(view2)
// Unregister should only flush view2, so expect the count of 2.
want := int64(2)
e.Lock()
got := e.totalCount
e.Unlock()
if got != want {
t.Errorf("got count data = %v; want %v", got, want)
}
}
func TestWorkerRace(t *testing.T) {
restart()
ctx := context.Background()
m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
// 1. This will export every microsecond.
SetReportingPeriod(time.Microsecond)
if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}
e := &countExporter{}
RegisterExporter(e)
// Synchronize and make sure every goroutine has terminated before we exit
var waiter sync.WaitGroup
waiter.Add(3)
defer waiter.Wait()
doneCh := make(chan bool)
// 2. Record write routine at 700ns
go func() {
defer waiter.Done()
tick := time.NewTicker(700 * time.Nanosecond)
defer tick.Stop()
defer func() {
close(doneCh)
}()
for i := 0; i < 1e3; i++ {
stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(1))
stats.Record(ctx, m2.M(1))
<-tick.C
}
}()
// 2. Simulating RetrieveData 900ns
go func() {
defer waiter.Done()
tick := time.NewTicker(900 * time.Nanosecond)
defer tick.Stop()
for {
select {
case <-doneCh:
return
case <-tick.C:
RetrieveData(view1.Name)
}
}
}()
// 4. Export via Reader routine at 800ns
go func() {
defer waiter.Done()
tick := time.NewTicker(800 * time.Nanosecond)
defer tick.Stop()
reader := metricexport.Reader{}
for {
select {
case <-doneCh:
return
case <-tick.C:
// Perform some collection here
reader.ReadAndExport(&testExporter{})
}
}
}()
}
type testExporter struct {
}
func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
return nil
}
type countExporter struct {
sync.Mutex
count int64
totalCount int64
}
func (e *countExporter) ExportView(vd *Data) {
if len(vd.Rows) == 0 {
return
}
d := vd.Rows[0].Data.(*CountData)
e.Lock()
defer e.Unlock()
e.count = d.Value
e.totalCount += d.Value
}
type vdExporter struct {
sync.Mutex
vds []*Data
}
func (e *vdExporter) ExportView(vd *Data) {
e.Lock()
defer e.Unlock()
e.vds = append(e.vds, vd)
}
// restart stops the current processors and creates a new one.
func restart() {
defaultWorker.stop()
defaultWorker = newWorker()
go defaultWorker.start()
}