/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package opencensus

import (
"context"
"strings"
"sync/atomic"
"time"
ocstats "go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

var logger = grpclog.Component("opencensus-instrumentation")

// canonicalString converts a gRPC status code to its canonical string
// representation (e.g. "OK", "NOT_FOUND"), as implemented in grpc's internal
// package.
var canonicalString = internal.CanonicalString.(func(codes.Code) string)

var (
	// Bounds are stored in separate variables for testing purposes.
bytesDistributionBounds = []float64{1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296}
bytesDistribution = view.Distribution(bytesDistributionBounds...)
millisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
countDistributionBounds = []float64{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536}
countDistribution = view.Distribution(countDistributionBounds...)
)
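// These aggregations pair with measures when constructing OpenCensus views.
// A minimal sketch of how bytesDistribution might back a view, using a
// hypothetical measure and view (not part of this package's actual metrics):
//
//	var exampleSentBytes = ocstats.Int64("example.com/sent_bytes",
//		"Bytes sent per RPC", ocstats.UnitBytes)
//
//	var exampleView = &view.View{
//		Name:        "example.com/client/sent_bytes_per_rpc",
//		Description: "Distribution of bytes sent per RPC",
//		Measure:     exampleSentBytes,
//		Aggregation: bytesDistribution,
//	}
//
// Measurements recorded against exampleSentBytes only aggregate once the
// view is registered via view.Register(exampleView).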

// removeLeadingSlash removes any leading slashes from a gRPC full method
// name, e.g. "/service/method" becomes "service/method", to match the method
// tag format used by the OpenCensus views.
func removeLeadingSlash(mn string) string {
	return strings.TrimLeft(mn, "/")
}

// metricsInfo is the data used to record metrics about the RPC attempt on
// the client side, and about the overall RPC on the server side.
type metricsInfo struct {
	// Access these counts atomically; hedging may introduce concurrent
	// updates in the future.
// number of messages sent from side (client || server)
sentMsgs int64
// number of bytes sent (within each message) from side (client || server)
sentBytes int64
// number of bytes after compression (within each message) from side (client || server)
sentCompressedBytes int64
// number of messages received on side (client || server)
recvMsgs int64
	// number of bytes received (within each message) on side (client ||
	// server)
	recvBytes int64
	// number of compressed bytes received (within each message) on side
	// (client || server)
	recvCompressedBytes int64
startTime time.Time
method string
}

// statsTagRPC creates a recording object from which to derive measurements,
// scoping the recordings to a single client-side RPC attempt (the scope of
// the context). It also populates the gRPC metadata within the context with
// any OpenCensus tags set by the application, binary encoded so they can be
// sent across the wire.
func (csh *clientStatsHandler) statsTagRPC(ctx context.Context, info *stats.RPCTagInfo) (context.Context, *metricsInfo) {
mi := &metricsInfo{
startTime: time.Now(),
method: info.FullMethodName,
}
// Populate gRPC Metadata with OpenCensus tag map if set by application.
if tm := tag.FromContext(ctx); tm != nil {
ctx = stats.SetTags(ctx, tag.Encode(tm))
}
return ctx, mi
}
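// As an illustration of the tag propagation above: an application can attach
// OpenCensus tags to the context before issuing an RPC, and statsTagRPC will
// encode them into the outgoing gRPC metadata. A minimal sketch, assuming a
// hypothetical "environment" tag key and client stub:
//
//	envKey := tag.MustNewKey("environment")
//	ctx, err := tag.New(context.Background(), tag.Insert(envKey, "staging"))
//	if err != nil {
//		// handle error
//	}
//	resp, err := client.UnaryCall(ctx, req) // tags ride along in the metadata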

// statsTagRPC creates a recording object from which to derive measurements,
// scoping the recordings to a single server-side RPC (the scope of the
// context). It also deserializes the OpenCensus tags set in the context's
// gRPC metadata, and adds a server method tag to the OpenCensus tags.
func (ssh *serverStatsHandler) statsTagRPC(ctx context.Context, info *stats.RPCTagInfo) (context.Context, *metricsInfo) {
mi := &metricsInfo{
startTime: time.Now(),
method: info.FullMethodName,
}
if tagsBin := stats.Tags(ctx); tagsBin != nil {
if tags, err := tag.Decode(tagsBin); err == nil {
ctx = tag.NewContext(ctx, tags)
}
}
	// The error returned by tag.New can be ignored because, in the error
	// case, the context passed in is returned unchanged. If the call errors,
	// the server-side application layer won't see the server method key in
	// the tag map, but this instrumentation code functions as normal.
ctx, _ = tag.New(ctx, tag.Upsert(keyServerMethod, removeLeadingSlash(info.FullMethodName)))
return ctx, mi
}
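// The server-side counterpart: once statsTagRPC has decoded the propagated
// tags into the context, application handlers can read them back. A minimal
// sketch, assuming the same hypothetical "environment" key and a
// hypothetical server/proto definition:
//
//	func (s *server) UnaryCall(ctx context.Context, req *pb.Request) (*pb.Response, error) {
//		if tm := tag.FromContext(ctx); tm != nil {
//			if env, ok := tm.Value(tag.MustNewKey("environment")); ok {
//				_ = env // use env, e.g. for logging
//			}
//		}
//		// ... handle the RPC
//	}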
func recordRPCData(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
if mi == nil {
// Shouldn't happen, as gRPC calls TagRPC which populates the metricsInfo in
// context.
logger.Error("ctx passed into stats handler metrics event handling has no metrics data present")
return
}
switch st := s.(type) {
case *stats.InHeader, *stats.OutHeader, *stats.InTrailer, *stats.OutTrailer:
// Headers and Trailers are not relevant to the measures, as the
// measures concern number of messages and bytes for messages. This
// aligns with flow control.
case *stats.Begin:
recordDataBegin(ctx, mi, st)
case *stats.OutPayload:
recordDataOutPayload(mi, st)
case *stats.InPayload:
recordDataInPayload(mi, st)
case *stats.End:
recordDataEnd(ctx, mi, st)
default:
		// Shouldn't happen: gRPC only calls into the stats handler with one
		// of the types above.
logger.Errorf("Received unexpected stats type (%T) with data: %v", s, s)
}
}
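// recordRPCData is driven by the stats.Handler lifecycle: gRPC calls TagRPC
// once per RPC (attempt) and then HandleRPC for each stats event. A minimal
// sketch of that wiring, assuming a hypothetical context key under which the
// handler stashed the *metricsInfo returned by statsTagRPC:
//
//	type rpcInfoKey struct{}
//
//	func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
//		mi, _ := ctx.Value(rpcInfoKey{}).(*metricsInfo)
//		recordRPCData(ctx, rs, mi)
//	}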

// recordDataBegin takes a measurement related to the RPC beginning,
// recording a client- or server-started RPC depending on the caller.
func recordDataBegin(ctx context.Context, mi *metricsInfo, b *stats.Begin) {
if b.Client {
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(tag.Upsert(keyClientMethod, removeLeadingSlash(mi.method))),
ocstats.WithMeasurements(clientStartedRPCs.M(1)))
return
}
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(tag.Upsert(keyServerMethod, removeLeadingSlash(mi.method))),
ocstats.WithMeasurements(serverStartedRPCs.M(1)))
}

// recordDataOutPayload records the length in bytes of an outgoing message
// and increments the total count of sent messages, both stored in the RPC's
// context (per attempt on the client side) for use when taking measurements
// at RPC end.
func recordDataOutPayload(mi *metricsInfo, op *stats.OutPayload) {
atomic.AddInt64(&mi.sentMsgs, 1)
atomic.AddInt64(&mi.sentBytes, int64(op.Length))
atomic.AddInt64(&mi.sentCompressedBytes, int64(op.CompressedLength))
}

// recordDataInPayload records the length in bytes of an incoming message and
// increments the total count of received messages, both stored in the RPC's
// context (per attempt on the client side) for use when taking measurements
// at RPC end.
func recordDataInPayload(mi *metricsInfo, ip *stats.InPayload) {
atomic.AddInt64(&mi.recvMsgs, 1)
atomic.AddInt64(&mi.recvBytes, int64(ip.Length))
atomic.AddInt64(&mi.recvCompressedBytes, int64(ip.CompressedLength))
}

// recordDataEnd takes per-RPC measurements derived from information
// collected over the lifetime of the RPC (per attempt on the client side).
func recordDataEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
	// The latency bounds for the distribution data (the specified millisecond
	// bounds) have fractional values, so the latency must be a float; e.g. a
	// duration of 1.5ms is recorded as 1.5.
latency := float64(time.Since(mi.startTime)) / float64(time.Millisecond)
var st string
if e.Error != nil {
s, _ := status.FromError(e.Error)
st = canonicalString(s.Code())
} else {
st = "OK"
}
	// TODO: Attach trace data through attachments?
if e.Client {
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(
tag.Upsert(keyClientMethod, removeLeadingSlash(mi.method)),
tag.Upsert(keyClientStatus, st)),
ocstats.WithMeasurements(
clientSentBytesPerRPC.M(atomic.LoadInt64(&mi.sentBytes)),
clientSentCompressedBytesPerRPC.M(atomic.LoadInt64(&mi.sentCompressedBytes)),
clientSentMessagesPerRPC.M(atomic.LoadInt64(&mi.sentMsgs)),
clientReceivedMessagesPerRPC.M(atomic.LoadInt64(&mi.recvMsgs)),
clientReceivedBytesPerRPC.M(atomic.LoadInt64(&mi.recvBytes)),
clientReceivedCompressedBytesPerRPC.M(atomic.LoadInt64(&mi.recvCompressedBytes)),
clientRoundtripLatency.M(latency),
clientServerLatency.M(latency),
))
return
}
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(
tag.Upsert(keyServerMethod, removeLeadingSlash(mi.method)),
tag.Upsert(keyServerStatus, st),
),
ocstats.WithMeasurements(
serverSentBytesPerRPC.M(atomic.LoadInt64(&mi.sentBytes)),
serverSentCompressedBytesPerRPC.M(atomic.LoadInt64(&mi.sentCompressedBytes)),
serverSentMessagesPerRPC.M(atomic.LoadInt64(&mi.sentMsgs)),
serverReceivedMessagesPerRPC.M(atomic.LoadInt64(&mi.recvMsgs)),
serverReceivedBytesPerRPC.M(atomic.LoadInt64(&mi.recvBytes)),
serverReceivedCompressedBytesPerRPC.M(atomic.LoadInt64(&mi.recvCompressedBytes)),
serverLatency.M(latency)))
}
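
// None of the measurements above are exported unless the stats handlers are
// installed on the client and server and the corresponding views are
// registered. A minimal sketch of the wiring, assuming hypothetical
// constructors for the handlers (this package's real public API may differ):
//
//	conn, err := grpc.Dial(target,
//		grpc.WithStatsHandler(newClientStatsHandler()))
//	...
//	srv := grpc.NewServer(grpc.StatsHandler(newServerStatsHandler()))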