blob: 8a09d0f007697016d6c89fbf9b3a0f68ab0283f8 [file] [log] [blame]
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package metricexport
import (
"context"
"fmt"
"sync"
"time"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/trace"
)
var (
defaultSampler = trace.ProbabilitySampler(0.0001)
errReportingIntervalTooLow = fmt.Errorf("reporting interval less than %d", minimumReportingDuration)
errAlreadyStarted = fmt.Errorf("already started")
errIntervalReaderNil = fmt.Errorf("interval reader is nil")
errExporterNil = fmt.Errorf("exporter is nil")
errReaderNil = fmt.Errorf("reader is nil")
)
const (
defaultReportingDuration = 60 * time.Second
minimumReportingDuration = 1 * time.Second
defaultSpanName = "ExportMetrics"
)
// ReaderOptions contains options pertaining to metrics reader.
type ReaderOptions struct {
// SpanName is the name used for span created to export metrics.
SpanName string
}
// Reader reads metrics from all producers registered
// with producer manager and exports those metrics using provided
// exporter.
type Reader struct {
sampler trace.Sampler
spanName string
}
// IntervalReader periodically reads metrics from all producers registered
// with producer manager and exports those metrics using provided
// exporter. Call Reader.Stop() to stop the reader.
type IntervalReader struct {
// ReportingInterval it the time duration between two consecutive
// metrics reporting. defaultReportingDuration is used if it is not set.
// It cannot be set lower than minimumReportingDuration.
ReportingInterval time.Duration
exporter Exporter
timer *time.Ticker
quit, done chan bool
mu sync.RWMutex
reader *Reader
}
// ReaderOption apply changes to ReaderOptions.
type ReaderOption func(*ReaderOptions)
// WithSpanName makes new reader to use given span name when exporting metrics.
func WithSpanName(spanName string) ReaderOption {
return func(o *ReaderOptions) {
o.SpanName = spanName
}
}
// NewReader returns a reader configured with specified options.
func NewReader(o ...ReaderOption) *Reader {
var opts ReaderOptions
for _, op := range o {
op(&opts)
}
reader := &Reader{defaultSampler, defaultSpanName}
if opts.SpanName != "" {
reader.spanName = opts.SpanName
}
return reader
}
// NewIntervalReader creates a reader. Once started it periodically
// reads metrics from all producers and exports them using provided exporter.
func NewIntervalReader(reader *Reader, exporter Exporter) (*IntervalReader, error) {
if exporter == nil {
return nil, errExporterNil
}
if reader == nil {
return nil, errReaderNil
}
r := &IntervalReader{
exporter: exporter,
reader: reader,
}
return r, nil
}
// Start starts the IntervalReader which periodically reads metrics from all
// producers registered with global producer manager. If the reporting interval
// is not set prior to calling this function then default reporting interval
// is used.
func (ir *IntervalReader) Start() error {
if ir == nil {
return errIntervalReaderNil
}
ir.mu.Lock()
defer ir.mu.Unlock()
var reportingInterval = defaultReportingDuration
if ir.ReportingInterval != 0 {
if ir.ReportingInterval < minimumReportingDuration {
return errReportingIntervalTooLow
}
reportingInterval = ir.ReportingInterval
}
if ir.quit != nil {
return errAlreadyStarted
}
ir.timer = time.NewTicker(reportingInterval)
ir.quit = make(chan bool)
ir.done = make(chan bool)
go ir.startInternal()
return nil
}
func (ir *IntervalReader) startInternal() {
for {
select {
case <-ir.timer.C:
ir.reader.ReadAndExport(ir.exporter)
case <-ir.quit:
ir.timer.Stop()
ir.done <- true
return
}
}
}
// Stop stops the reader from reading and exporting metrics.
// Additional call to Stop are no-ops.
func (ir *IntervalReader) Stop() {
if ir == nil {
return
}
ir.mu.Lock()
defer ir.mu.Unlock()
if ir.quit == nil {
return
}
ir.quit <- true
<-ir.done
close(ir.quit)
close(ir.done)
ir.quit = nil
}
// Flush flushes the metrics if IntervalReader is stopped, otherwise no-op.
func (ir *IntervalReader) Flush() {
ir.mu.Lock()
defer ir.mu.Unlock()
// No-op if IntervalReader is not stopped
if ir.quit != nil {
return
}
ir.reader.ReadAndExport(ir.exporter)
}
// ReadAndExport reads metrics from all producer registered with
// producer manager and then exports them using provided exporter.
func (r *Reader) ReadAndExport(exporter Exporter) {
ctx, span := trace.StartSpan(context.Background(), r.spanName, trace.WithSampler(r.sampler))
defer span.End()
producers := metricproducer.GlobalManager().GetAll()
data := []*metricdata.Metric{}
for _, producer := range producers {
data = append(data, producer.Read()...)
}
// TODO: [rghetia] add metrics for errors.
exporter.ExportMetrics(ctx, data)
}