blob: 61c915b7f34c2b1e69f630c36af20ad3138c2578 [file] [log] [blame]
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package metricexport
import (
"context"
"sync"
"testing"
"time"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
)
var (
ir1 *IntervalReader
ir2 *IntervalReader
reader1 = NewReader(WithSpanName("test-export-span"))
exporter1 = &metricExporter{}
exporter2 = &metricExporter{}
gaugeEntry *metric.Int64GaugeEntry
duration1 = 1000 * time.Millisecond
duration2 = 2000 * time.Millisecond
)
type metricExporter struct {
sync.Mutex
metrics []*metricdata.Metric
}
func (e *metricExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
e.Lock()
defer e.Unlock()
e.metrics = append(e.metrics, metrics...)
return nil
}
func init() {
r := metric.NewRegistry()
metricproducer.GlobalManager().AddProducer(r)
g, _ := r.AddInt64Gauge("active_request",
metric.WithDescription("Number of active requests, per method."),
metric.WithUnit(metricdata.UnitDimensionless),
metric.WithLabelKeys("method"))
gaugeEntry, _ = g.GetEntry(metricdata.NewLabelValue("foo"))
}
func TestNewReaderWitDefaultOptions(t *testing.T) {
r := NewReader()
if r.spanName != defaultSpanName {
t.Errorf("span name: got %v, want %v\n", r.spanName, defaultSpanName)
}
}
func TestNewReaderWitSpanName(t *testing.T) {
spanName := "test-span"
r := NewReader(WithSpanName(spanName))
if r.spanName != spanName {
t.Errorf("span name: got %+v, want %v\n", r.spanName, spanName)
}
}
func TestNewReader(t *testing.T) {
r := NewReader()
gaugeEntry.Add(1)
r.ReadAndExport(exporter1)
checkExportedCount(exporter1, 1, t)
checkExportedMetricDesc(exporter1, "active_request", t)
resetExporter(exporter1)
}
func TestNewIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
gaugeEntry.Add(1)
time.Sleep(1500 * time.Millisecond)
checkExportedCount(exporter1, 1, t)
checkExportedMetricDesc(exporter1, "active_request", t)
ir1.Stop()
resetExporter(exporter1)
}
func TestManualReadForIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
gaugeEntry.Set(1)
reader1.ReadAndExport(exporter1)
gaugeEntry.Set(4)
time.Sleep(1500 * time.Millisecond)
checkExportedCount(exporter1, 2, t)
checkExportedValues(exporter1, []int64{1, 4}, t) // one for manual read other for time based.
checkExportedMetricDesc(exporter1, "active_request", t)
ir1.Stop()
resetExporter(exporter1)
}
func TestFlushNoOpForIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
gaugeEntry.Set(1)
// since IR is not stopped, flush does nothing
ir1.Flush()
// expect no data points
checkExportedCount(exporter1, 0, t)
checkExportedMetricDesc(exporter1, "active_request", t)
ir1.Stop()
resetExporter(exporter1)
}
func TestFlushAllowMultipleForIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
gaugeEntry.Set(1)
ir1.Stop()
ir1.Flush()
// metric is still coming in
gaugeEntry.Add(1)
// one more flush after IR stopped
ir1.Flush()
// expect 2 data point, one from each flush
checkExportedCount(exporter1, 2, t)
checkExportedValues(exporter1, []int64{1, 2}, t)
checkExportedMetricDesc(exporter1, "active_request", t)
resetExporter(exporter1)
}
func TestFlushRestartForIntervalReader(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
gaugeEntry.Set(1)
ir1.Stop()
ir1.Flush()
// restart the IR
err := ir1.Start()
if err != nil {
t.Fatalf("error starting reader %v\n", err)
}
gaugeEntry.Add(1)
ir1.Stop()
ir1.Flush()
// expect 2 data point, one from each flush
checkExportedCount(exporter1, 2, t)
checkExportedValues(exporter1, []int64{1, 2}, t)
checkExportedMetricDesc(exporter1, "active_request", t)
resetExporter(exporter1)
}
func TestProducerWithIntervalReaderStop(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
ir1.Stop()
gaugeEntry.Add(1)
time.Sleep(1500 * time.Millisecond)
checkExportedCount(exporter1, 0, t)
checkExportedMetricDesc(exporter1, "active_request", t)
resetExporter(exporter1)
}
func TestProducerWithMultipleIntervalReaders(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
ir2 = createAndStart(exporter2, duration2, t)
gaugeEntry.Add(1)
time.Sleep(2500 * time.Millisecond)
checkExportedCount(exporter1, 2, t)
checkExportedMetricDesc(exporter1, "active_request", t)
checkExportedCount(exporter2, 1, t)
checkExportedMetricDesc(exporter2, "active_request", t)
ir1.Stop()
ir2.Stop()
resetExporter(exporter1)
resetExporter(exporter1)
}
func TestIntervalReaderMultipleStop(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
stop := make(chan bool, 1)
go func() {
ir1.Stop()
ir1.Stop()
stop <- true
}()
select {
case _ = <-stop:
case <-time.After(1 * time.Second):
t.Fatalf("ir1 stop got blocked")
}
}
func TestIntervalReaderMultipleStart(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
err := ir1.Start()
if err == nil {
t.Fatalf("expected error but got nil\n")
}
gaugeEntry.Add(1)
time.Sleep(1500 * time.Millisecond)
checkExportedCount(exporter1, 1, t)
checkExportedMetricDesc(exporter1, "active_request", t)
ir1.Stop()
resetExporter(exporter1)
}
func TestNewIntervalReaderWithNilReader(t *testing.T) {
_, err := NewIntervalReader(nil, exporter1)
if err == nil {
t.Fatalf("expected error but got nil\n")
}
}
func TestNewIntervalReaderWithNilExporter(t *testing.T) {
_, err := NewIntervalReader(reader1, nil)
if err == nil {
t.Fatalf("expected error but got nil\n")
}
}
func TestNewIntervalReaderStartWithInvalidInterval(t *testing.T) {
ir, err := NewIntervalReader(reader1, exporter1)
ir.ReportingInterval = 500 * time.Millisecond
err = ir.Start()
if err == nil {
t.Fatalf("expected error but got nil\n")
}
}
func checkExportedCount(exporter *metricExporter, wantCount int, t *testing.T) {
exporter.Lock()
defer exporter.Unlock()
gotCount := len(exporter.metrics)
if gotCount != wantCount {
t.Fatalf("exported metric count: got %d, want %d\n", gotCount, wantCount)
}
}
func checkExportedValues(exporter *metricExporter, wantValues []int64, t *testing.T) {
exporter.Lock()
defer exporter.Unlock()
gotCount := len(exporter.metrics)
wantCount := len(wantValues)
if gotCount != wantCount {
t.Errorf("exported metric count: got %d, want %d\n", gotCount, wantCount)
return
}
for i, wantValue := range wantValues {
var gotValue int64
switch v := exporter.metrics[i].TimeSeries[0].Points[0].Value.(type) {
case int64:
gotValue = v
default:
t.Errorf("expected float64 value but found other %T", exporter.metrics[i].TimeSeries[0].Points[0].Value)
}
if gotValue != wantValue {
t.Errorf("values idx %d, got: %v, want %v", i, gotValue, wantValue)
}
}
}
func checkExportedMetricDesc(exporter *metricExporter, wantMdName string, t *testing.T) {
exporter.Lock()
defer exporter.Unlock()
for _, metric := range exporter.metrics {
gotMdName := metric.Descriptor.Name
if gotMdName != wantMdName {
t.Errorf("got %s, want %s\n", gotMdName, wantMdName)
}
}
exporter.metrics = nil
}
func resetExporter(exporter *metricExporter) {
exporter.Lock()
defer exporter.Unlock()
exporter.metrics = nil
}
// createAndStart stops the current processors and creates a new one.
func createAndStart(exporter *metricExporter, d time.Duration, t *testing.T) *IntervalReader {
ir, _ := NewIntervalReader(reader1, exporter)
ir.ReportingInterval = d
err := ir.Start()
if err != nil {
t.Fatalf("error creating reader %v\n", err)
}
return ir
}