Exemplar: Use generic interface for attachment values. (#1070)
diff --git a/metric/metricdata/exemplar.go b/metric/metricdata/exemplar.go
index f51e33f..cdbeef0 100644
--- a/metric/metricdata/exemplar.go
+++ b/metric/metricdata/exemplar.go
@@ -30,4 +30,4 @@
}
// Attachments is a map of extra values associated with a recorded data point.
-type Attachments map[string]string
+type Attachments map[string]interface{}
diff --git a/stats/internal/record.go b/stats/internal/record.go
index ed54552..36935e6 100644
--- a/stats/internal/record.go
+++ b/stats/internal/record.go
@@ -19,7 +19,7 @@
)
// DefaultRecorder will be called for each Record call.
-var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments map[string]string)
+var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments map[string]interface{})
// SubscriptionReporter reports when a view subscribed with a measure.
var SubscriptionReporter func(measure string)
diff --git a/stats/record.go b/stats/record.go
index a0f0ec4..d2af0a6 100644
--- a/stats/record.go
+++ b/stats/record.go
@@ -51,7 +51,7 @@
return
}
// TODO(songy23): fix attachments.
- recorder(tag.FromContext(ctx), ms, map[string]string{})
+ recorder(tag.FromContext(ctx), ms, map[string]interface{}{})
}
// RecordWithTags records one or multiple measurements at once.
diff --git a/stats/view/aggregation_data.go b/stats/view/aggregation_data.go
index bf8015f..8774a07 100644
--- a/stats/view/aggregation_data.go
+++ b/stats/view/aggregation_data.go
@@ -17,6 +17,7 @@
import (
"math"
+ "time"
"go.opencensus.io/metric/metricdata"
)
@@ -26,7 +27,7 @@
// Mosts users won't directly access aggregration data.
type AggregationData interface {
isAggregationData() bool
- addSample(v float64)
+ addSample(v float64, attachments map[string]interface{}, t time.Time)
clone() AggregationData
equal(other AggregationData) bool
}
@@ -43,7 +44,7 @@
func (a *CountData) isAggregationData() bool { return true }
-func (a *CountData) addSample(_ float64) {
+func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
a.Value = a.Value + 1
}
@@ -70,7 +71,7 @@
func (a *SumData) isAggregationData() bool { return true }
-func (a *SumData) addSample(v float64) {
+func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
a.Value += v
}
@@ -101,7 +102,7 @@
SumOfSquaredDev float64 // sum of the squared deviation from the mean
CountPerBucket []int64 // number of occurrences per bucket
// ExemplarsPerBucket is slice the same length as CountPerBucket containing
- // an metricdata for the associated bucket, or nil.
+ // an exemplar for the associated bucket, or nil.
ExemplarsPerBucket []*metricdata.Exemplar
bounds []float64 // histogram distribution of the values
}
@@ -130,7 +131,7 @@
func (a *DistributionData) isAggregationData() bool { return true }
// TODO(songy23): support exemplar attachments.
-func (a *DistributionData) addSample(v float64) {
+func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
if v < a.Min {
a.Min = v
}
@@ -138,7 +139,7 @@
a.Max = v
}
a.Count++
- a.addToBucket(v)
+ a.addToBucket(v, attachments, t)
if a.Count == 1 {
a.Mean = v
@@ -150,18 +151,35 @@
a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
}
-func (a *DistributionData) addToBucket(v float64) {
+func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
var count *int64
- for i, b := range a.bounds {
+ var i int
+ var b float64
+ for i, b = range a.bounds {
if v < b {
count = &a.CountPerBucket[i]
break
}
}
if count == nil { // Last bucket.
- count = &a.CountPerBucket[len(a.bounds)]
+ i = len(a.bounds)
+ count = &a.CountPerBucket[i]
}
*count++
+ if exemplar := getExemplar(v, attachments, t); exemplar != nil {
+ a.ExemplarsPerBucket[i] = exemplar
+ }
+}
+
+func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
+ if len(attachments) == 0 {
+ return nil
+ }
+ return &metricdata.Exemplar{
+ Value: v,
+ Timestamp: t,
+ Attachments: attachments,
+ }
}
func (a *DistributionData) clone() AggregationData {
@@ -199,7 +217,7 @@
return true
}
-func (l *LastValueData) addSample(v float64) {
+func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
l.Value = v
}
diff --git a/stats/view/aggregation_data_test.go b/stats/view/aggregation_data_test.go
index caa624c..a7e0567 100644
--- a/stats/view/aggregation_data_test.go
+++ b/stats/view/aggregation_data_test.go
@@ -18,6 +18,7 @@
import (
"reflect"
"testing"
+ "time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@@ -66,12 +67,15 @@
func TestDistributionData_addSample(t *testing.T) {
dd := newDistributionData([]float64{1, 2})
- dd.addSample(0.5)
+ attachments1 := map[string]interface{}{"key1": "value1"}
+ t1 := time.Now()
+ dd.addSample(0.5, attachments1, t1)
+ e1 := &metricdata.Exemplar{Value: 0.5, Timestamp: t1, Attachments: attachments1}
want := &DistributionData{
Count: 1,
CountPerBucket: []int64{1, 0, 0},
- ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil, nil},
+ ExemplarsPerBucket: []*metricdata.Exemplar{e1, nil, nil},
Max: 0.5,
Min: 0.5,
Mean: 0.5,
@@ -81,13 +85,16 @@
t.Fatalf("Unexpected DistributionData -got +want: %s", diff)
}
- dd.addSample(0.7)
+ attachments2 := map[string]interface{}{"key2": "value2"}
+ t2 := t1.Add(time.Microsecond)
+ dd.addSample(0.7, attachments2, t2)
- // Previous exemplar should be preserved, since it has more annotations.
+ // Previous exemplar should be overwritten.
+ e2 := &metricdata.Exemplar{Value: 0.7, Timestamp: t2, Attachments: attachments2}
want = &DistributionData{
Count: 2,
CountPerBucket: []int64{2, 0, 0},
- ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil, nil},
+ ExemplarsPerBucket: []*metricdata.Exemplar{e2, nil, nil},
Max: 0.7,
Min: 0.5,
Mean: 0.6,
@@ -97,16 +104,19 @@
t.Fatalf("Unexpected DistributionData -got +want: %s", diff)
}
- dd.addSample(0.2)
+ attachments3 := map[string]interface{}{"key3": "value3"}
+ t3 := t2.Add(time.Microsecond)
+ dd.addSample(1.2, attachments3, t3)
- // Exemplar should be replaced since it has a trace_id.
+ // e3 is at another bucket. e2 should still be there.
+ e3 := &metricdata.Exemplar{Value: 1.2, Timestamp: t3, Attachments: attachments3}
want = &DistributionData{
Count: 3,
- CountPerBucket: []int64{3, 0, 0},
- ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil, nil},
- Max: 0.7,
- Min: 0.2,
- Mean: 0.4666666666666667,
+ CountPerBucket: []int64{2, 1, 0},
+ ExemplarsPerBucket: []*metricdata.Exemplar{e2, e3, nil},
+ Max: 1.2,
+ Min: 0.5,
+ Mean: 0.7999999999999999,
SumOfSquaredDev: 0,
}
if diff := cmpDD(dd, want); diff != "" {
diff --git a/stats/view/collector.go b/stats/view/collector.go
index 250395d..8a6a2c0 100644
--- a/stats/view/collector.go
+++ b/stats/view/collector.go
@@ -17,6 +17,7 @@
import (
"sort"
+ "time"
"go.opencensus.io/internal/tagencoding"
"go.opencensus.io/tag"
@@ -31,13 +32,13 @@
a *Aggregation
}
-func (c *collector) addSample(s string, v float64) {
+func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) {
aggregator, ok := c.signatures[s]
if !ok {
aggregator = c.a.newData()
c.signatures[s] = aggregator
}
- aggregator.addSample(v)
+ aggregator.addSample(v, attachments, t)
}
// collectRows returns a snapshot of the collected Row values.
diff --git a/stats/view/view.go b/stats/view/view.go
index 633d346..95f01ad 100644
--- a/stats/view/view.go
+++ b/stats/view/view.go
@@ -150,12 +150,12 @@
return v.collector.collectedRows(v.view.TagKeys)
}
-func (v *viewInternal) addSample(m *tag.Map, val float64) {
+func (v *viewInternal) addSample(m *tag.Map, val float64, attachments map[string]interface{}, t time.Time) {
if !v.isSubscribed() {
return
}
sig := string(encodeWithKeys(m, v.view.TagKeys))
- v.collector.addSample(sig, val)
+ v.collector.addSample(sig, val, attachments, t)
}
// A Data is a set of rows about usage of the single measure associated
diff --git a/stats/view/view_test.go b/stats/view/view_test.go
index aef2352..7d2bed9 100644
--- a/stats/view/view_test.go
+++ b/stats/view/view_test.go
@@ -18,6 +18,7 @@
import (
"context"
"testing"
+ "time"
"github.com/google/go-cmp/cmp"
@@ -177,7 +178,7 @@
if err != nil {
t.Errorf("%v: New = %v", tc.label, err)
}
- view.addSample(tag.FromContext(ctx), r.f)
+ view.addSample(tag.FromContext(ctx), r.f, nil, time.Now())
}
gotRows := view.collectedRows()
@@ -293,7 +294,7 @@
if err != nil {
t.Errorf("%v: New = %v", tt.label, err)
}
- view.addSample(tag.FromContext(ctx), r.f)
+ view.addSample(tag.FromContext(ctx), r.f, nil, time.Now())
}
gotRows := view.collectedRows()
diff --git a/stats/view/worker.go b/stats/view/worker.go
index 0069e4b..d29dbae 100644
--- a/stats/view/worker.go
+++ b/stats/view/worker.go
@@ -102,7 +102,7 @@
return resp.rows, resp.err
}
-func record(tags *tag.Map, ms interface{}, attachments map[string]string) {
+func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
req := &recordReq{
tm: tags,
ms: ms.([]stats.Measurement),
diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go
index 33f2316..e27f294 100644
--- a/stats/view/worker_commands.go
+++ b/stats/view/worker_commands.go
@@ -148,7 +148,7 @@
type recordReq struct {
tm *tag.Map
ms []stats.Measurement
- attachments map[string]string
+ attachments map[string]interface{}
t time.Time
}
@@ -159,7 +159,7 @@
}
ref := w.getMeasureRef(m.Measure().Name())
for v := range ref.views {
- v.addSample(cmd.tm, m.Value())
+ v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now())
}
}
}