blob: 721742de5c932e9c52ed2a75d82ed626440e8464 [file] [log] [blame]
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// START entire
// This example demonstrates the use of derived gauges. It is a simple interactive program of consumer
// and producer. User can input number of items to produce. Producer produces specified number of
// items. Consumer randomly consumes 1-5 items in each attempt. It then sleeps randomly
// between 1-10 seconds before the next attempt. Two metrics collected to monitor the queue.
//
// Metrics
//
// * queue_size: It is an instantaneous queue size represented using derived gauge int64.
//
// * queue_seconds_since_processed_last: It is the time elaspsed in seconds since the last time
// when the queue was consumed. It is represented using derived gauge float64.
package main
import (
"bufio"
"fmt"
"log"
"math/rand"
"os"
"strconv"
"strings"
"sync"
"time"
"go.opencensus.io/examples/exporter"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
)
const (
metricsLogFile = "/tmp/metrics.log"
)
type queue struct {
size int
lastConsumed time.Time
mu sync.Mutex
q []int
}
var q = &queue{}
const (
maxItemsToConsumePerAttempt = 25
)
func init() {
q.q = make([]int, 100)
}
// consume randomly dequeues upto 5 items from the queue
func (q *queue) consume() {
q.mu.Lock()
defer q.mu.Unlock()
consumeCount := rand.Int() % maxItemsToConsumePerAttempt
i := 0
for i = 0; i < consumeCount; i++ {
if q.size > 0 {
q.q = q.q[1:]
q.size--
} else {
break
}
}
if i > 0 {
q.lastConsumed = time.Now()
}
}
// produce randomly enqueues upto 5 items from the queue
func (q *queue) produce(count int) {
q.mu.Lock()
defer q.mu.Unlock()
for i := 0; i < count; i++ {
v := rand.Int() % 100
q.q = append(q.q, v)
q.size++
}
fmt.Printf("queued %d items, queue size is %d\n", count, q.size)
}
func (q *queue) runConsumer(interval time.Duration, cQuit chan bool) {
t := time.NewTicker(interval)
for {
select {
case <-t.C:
q.consume()
case <-cQuit:
t.Stop()
return
}
}
}
// Size reports instantaneous queue size.
// This is the interface supplied while creating an entry for derived gauge int64.
// START toint64
func (q *queue) Size() int64 {
q.mu.Lock()
defer q.mu.Unlock()
return int64(q.size)
}
// END toint64
// Elapsed reports time elapsed since the last time an item was consumed from the queue.
// This is the interface supplied while creating an entry for derived gauge float64.
// START tofloat64
func (q *queue) Elapsed() float64 {
q.mu.Lock()
defer q.mu.Unlock()
return time.Now().Sub(q.lastConsumed).Seconds()
}
// END tofloat64
func getInput() int {
reader := bufio.NewReader(os.Stdin)
limit := 100
for {
fmt.Printf("Enter number of items to put in consumer queue? [1-%d]: ", limit)
text, _ := reader.ReadString('\n')
count, err := strconv.Atoi(strings.TrimSuffix(text, "\n"))
if err == nil {
if count < 1 || count > limit {
fmt.Printf("invalid value %s\n", text)
continue
}
return count
}
fmt.Printf("error %v\n", err)
}
}
func doWork() {
fmt.Printf("Program monitors queue using two derived gauge metrics.\n")
fmt.Printf(" 1. queue_size = the instantaneous size of the queue.\n")
fmt.Printf(" 2. queue_seconds_since_processed_last = the number of seconds elapsed since last time the queue was processed.\n")
fmt.Printf("\nGo to file://%s to see the metrics. OR do `tail -f %s` in another terminal\n\n\n",
metricsLogFile, metricsLogFile)
// Take a number of items to queue as an input from the user
// and enqueue the same number of items on to the consumer queue.
for {
count := getInput()
q.produce(count)
fmt.Printf("press CTRL+C to terminate the program\n")
}
}
func main() {
// Using logexporter but you can choose any supported exporter.
exporter, err := exporter.NewLogExporter(exporter.Options{
ReportingInterval: time.Duration(10 * time.Second),
MetricsLogFile: metricsLogFile,
})
if err != nil {
log.Fatalf("Error creating log exporter: %v", err)
}
exporter.Start()
defer exporter.Stop()
defer exporter.Close()
// Create metric registry and register it with global producer manager.
// START reg
r := metric.NewRegistry()
metricproducer.GlobalManager().AddProducer(r)
// END reg
// Create Int64DerviedGauge
// START size
queueSizeGauge, err := r.AddInt64DerivedGauge(
"queue_size",
metric.WithDescription("Instantaneous queue size"),
metric.WithUnit(metricdata.UnitDimensionless))
if err != nil {
log.Fatalf("error creating queue size derived gauge, error %v\n", err)
}
// END size
// START entrySize
err = queueSizeGauge.UpsertEntry(q.Size)
if err != nil {
log.Fatalf("error getting queue size derived gauge entry, error %v\n", err)
}
// END entrySize
// Create Float64DerviedGauge
// START elapsed
elapsedSeconds, err := r.AddFloat64DerivedGauge(
"queue_seconds_since_processed_last",
metric.WithDescription("time elapsed since last time the queue was processed"),
metric.WithUnit(metricdata.UnitDimensionless))
if err != nil {
log.Fatalf("error creating queue_seconds_since_processed_last derived gauge, error %v\n", err)
}
// END elapsed
// START entryElapsed
err = elapsedSeconds.UpsertEntry(q.Elapsed)
if err != nil {
log.Fatalf("error getting queue_seconds_since_processed_last derived gauge entry, error %v\n", err)
}
// END entryElapsed
quit := make(chan bool)
defer func() {
close(quit)
}()
// Run consumer and producer
go q.runConsumer(5*time.Second, quit)
for {
doWork()
}
}
// END entire