| // Copyright 2019, OpenCensus Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| package metricexport |
| |
| import ( |
| "context" |
| "fmt" |
| "sync" |
| "time" |
| |
| "go.opencensus.io/metric/metricdata" |
| "go.opencensus.io/metric/metricproducer" |
| "go.opencensus.io/trace" |
| ) |
| |
| var ( |
| defaultSampler = trace.ProbabilitySampler(0.0001) |
| errReportingIntervalTooLow = fmt.Errorf("reporting interval less than %d", minimumReportingDuration) |
| errAlreadyStarted = fmt.Errorf("already started") |
| errIntervalReaderNil = fmt.Errorf("interval reader is nil") |
| errExporterNil = fmt.Errorf("exporter is nil") |
| errReaderNil = fmt.Errorf("reader is nil") |
| ) |
| |
| const ( |
| defaultReportingDuration = 60 * time.Second |
| minimumReportingDuration = 1 * time.Second |
| defaultSpanName = "ExportMetrics" |
| ) |
| |
| // ReaderOptions contains options pertaining to metrics reader. |
| type ReaderOptions struct { |
| // SpanName is the name used for span created to export metrics. |
| SpanName string |
| } |
| |
| // Reader reads metrics from all producers registered |
| // with producer manager and exports those metrics using provided |
| // exporter. |
| type Reader struct { |
| sampler trace.Sampler |
| |
| spanName string |
| } |
| |
| // IntervalReader periodically reads metrics from all producers registered |
| // with producer manager and exports those metrics using provided |
| // exporter. Call Reader.Stop() to stop the reader. |
| type IntervalReader struct { |
| // ReportingInterval it the time duration between two consecutive |
| // metrics reporting. defaultReportingDuration is used if it is not set. |
| // It cannot be set lower than minimumReportingDuration. |
| ReportingInterval time.Duration |
| |
| exporter Exporter |
| timer *time.Ticker |
| quit, done chan bool |
| mu sync.RWMutex |
| reader *Reader |
| } |
| |
| // ReaderOption apply changes to ReaderOptions. |
| type ReaderOption func(*ReaderOptions) |
| |
| // WithSpanName makes new reader to use given span name when exporting metrics. |
| func WithSpanName(spanName string) ReaderOption { |
| return func(o *ReaderOptions) { |
| o.SpanName = spanName |
| } |
| } |
| |
| // NewReader returns a reader configured with specified options. |
| func NewReader(o ...ReaderOption) *Reader { |
| var opts ReaderOptions |
| for _, op := range o { |
| op(&opts) |
| } |
| reader := &Reader{defaultSampler, defaultSpanName} |
| if opts.SpanName != "" { |
| reader.spanName = opts.SpanName |
| } |
| return reader |
| } |
| |
| // NewIntervalReader creates a reader. Once started it periodically |
| // reads metrics from all producers and exports them using provided exporter. |
| func NewIntervalReader(reader *Reader, exporter Exporter) (*IntervalReader, error) { |
| if exporter == nil { |
| return nil, errExporterNil |
| } |
| if reader == nil { |
| return nil, errReaderNil |
| } |
| |
| r := &IntervalReader{ |
| exporter: exporter, |
| reader: reader, |
| } |
| return r, nil |
| } |
| |
| // Start starts the IntervalReader which periodically reads metrics from all |
| // producers registered with global producer manager. If the reporting interval |
| // is not set prior to calling this function then default reporting interval |
| // is used. |
| func (ir *IntervalReader) Start() error { |
| if ir == nil { |
| return errIntervalReaderNil |
| } |
| ir.mu.Lock() |
| defer ir.mu.Unlock() |
| var reportingInterval = defaultReportingDuration |
| if ir.ReportingInterval != 0 { |
| if ir.ReportingInterval < minimumReportingDuration { |
| return errReportingIntervalTooLow |
| } |
| reportingInterval = ir.ReportingInterval |
| } |
| |
| if ir.quit != nil { |
| return errAlreadyStarted |
| } |
| ir.timer = time.NewTicker(reportingInterval) |
| ir.quit = make(chan bool) |
| ir.done = make(chan bool) |
| |
| go ir.startInternal() |
| return nil |
| } |
| |
| func (ir *IntervalReader) startInternal() { |
| for { |
| select { |
| case <-ir.timer.C: |
| ir.reader.ReadAndExport(ir.exporter) |
| case <-ir.quit: |
| ir.timer.Stop() |
| ir.done <- true |
| return |
| } |
| } |
| } |
| |
| // Stop stops the reader from reading and exporting metrics. |
| // Additional call to Stop are no-ops. |
| func (ir *IntervalReader) Stop() { |
| if ir == nil { |
| return |
| } |
| ir.mu.Lock() |
| defer ir.mu.Unlock() |
| if ir.quit == nil { |
| return |
| } |
| ir.quit <- true |
| <-ir.done |
| close(ir.quit) |
| close(ir.done) |
| ir.quit = nil |
| } |
| |
| // Flush flushes the metrics if IntervalReader is stopped, otherwise no-op. |
| func (ir *IntervalReader) Flush() { |
| ir.mu.Lock() |
| defer ir.mu.Unlock() |
| |
| // No-op if IntervalReader is not stopped |
| if ir.quit != nil { |
| return |
| } |
| |
| ir.reader.ReadAndExport(ir.exporter) |
| } |
| |
| // ReadAndExport reads metrics from all producer registered with |
| // producer manager and then exports them using provided exporter. |
| func (r *Reader) ReadAndExport(exporter Exporter) { |
| ctx, span := trace.StartSpan(context.Background(), r.spanName, trace.WithSampler(r.sampler)) |
| defer span.End() |
| producers := metricproducer.GlobalManager().GetAll() |
| data := []*metricdata.Metric{} |
| for _, producer := range producers { |
| data = append(data, producer.Read()...) |
| } |
| // TODO: [rghetia] add metrics for errors. |
| exporter.ExportMetrics(ctx, data) |
| } |