blob: a17b4f5d3b04e47b9e75dfdfe7de726f3f38d39b [file] [edit]
/*
Copyright 2024 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"context"
"errors"
"fmt"
"hash/fnv"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/alts"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/status"
"cloud.google.com/go/spanner/internal"
)
// Names, label keys and units used by the built-in client metrics declared in
// this file.
const (
// Meter names. builtInMetricsMeterName is the name passed to
// meterProvider.Meter when the built-in instruments are created.
builtInMetricsMeterName = "gax-go"
grpcMetricMeterName = "grpc-go"
grpcGcpMetricMeterName = "grpc-gcp-go"
// nativeMetricsPrefix is prepended to a metric name when the corresponding
// instrument is created in createInstruments.
nativeMetricsPrefix = "spanner.googleapis.com/internal/client/"
// Monitored resource labels
monitoredResLabelKeyProject = "project_id"
monitoredResLabelKeyInstance = "instance_id"
monitoredResLabelKeyInstanceConfig = "instance_config"
monitoredResLabelKeyLocation = "location"
monitoredResLabelKeyClientHash = "client_hash"
// Metric labels
metricLabelKeyClientUID = "client_uid"
metricLabelKeyClientName = "client_name"
metricLabelKeyDatabase = "database"
metricLabelKeyMethod = "method"
metricLabelKeyStatus = "status"
metricLabelKeyDirectPathEnabled = "directpath_enabled"
metricLabelKeyDirectPathUsed = "directpath_used"
// gRPC-defined label keys.
metricLabelKeyGRPCLBPickResult = "grpc.lb.pick_result"
metricLabelKeyGRPCLBDataPlaneTarget = "grpc.lb.rls.data_plane_target"
metricLabelKeyGRPCXDSResourceType = "grpc.xds.resource_type"
metricLabelKeyGRPCLBLocality = "grpc.lb.locality"
metricLabelKeyGRPCLBBackendService = "grpc.lb.backend_service"
metricLabelKeyGRPCDisconnectError = "grpc.disconnect_error"
// Channel-related label keys.
// NOTE(review): not referenced in this part of the file; presumably used by
// the EEF metrics below — confirm.
metricLabelKeyFromChannelName = "from_channel_name"
metricLabelKeyToChannelName = "to_channel_name"
metricLabelKeyChannelName = "channel_name"
metricLabelKeyStatusCode = "status_code"
// Metric names
metricNameOperationLatencies = "operation_latencies"
metricNameAttemptLatencies = "attempt_latencies"
metricNameOperationCount = "operation_count"
metricNameAttemptCount = "attempt_count"
metricNameAFELatencies = "afe_latencies"
metricNameGFELatencies = "gfe_latencies"
metricNameGFEConnectivityErrorCount = "gfe_connectivity_error_count"
metricNameAFEConnectivityErrorCount = "afe_connectivity_error_count"
// These two names get a sum-aggregation view in builtInMeterProviderOptions.
metricNameEEFFallbackCount = "eef.fallback_count"
metricNameEEFCallStatus = "eef.call_status"
// Metric units
metricUnitMS = "ms"
metricUnitCount = "1"
// defaultClientLocation is what detectClientLocation returns when no cloud
// region can be determined.
defaultClientLocation = "global"
)
// These are effectively const, but for testing purposes they are mutable
var (
// duration between two metric exports; used as the interval of the periodic
// reader built in builtInMeterProviderOptions
defaultSamplePeriod = 1 * time.Minute
// clientName is the value of the client_name attribute: "spanner-go/" followed
// by the library version.
clientName = fmt.Sprintf("spanner-go/%v", internal.Version)
// bucketBounds are the explicit bucket boundaries applied to every latency
// histogram created in createInstruments (those histograms use the "ms" unit).
bucketBounds = []float64{0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0,
25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0,
800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0,
400000.0, 800000.0, 1600000.0, 3200000.0}
// metricsDetails lists, for every built-in metric name, the additional
// attribute keys declared for it and whether it is recorded per attempt
// (true) or once per operation (false). toOtelMetricAttrs uses
// recordedPerAttempt to decide whether the attempt status or the operation
// status is reported.
metricsDetails = map[string]metricInfo{
	// Recorded once per operation.
	metricNameOperationCount:     {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: false},
	metricNameOperationLatencies: {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: false},
	// Recorded per attempt.
	metricNameAttemptLatencies:          {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: true},
	metricNameAttemptCount:              {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: true},
	metricNameAFELatencies:              {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: true},
	metricNameGFELatencies:              {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: true},
	metricNameGFEConnectivityErrorCount: {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: true},
	metricNameAFEConnectivityErrorCount: {additionalAttrs: []string{metricLabelKeyStatus}, recordedPerAttempt: true},
}
// generateClientUID returns a unique client ID in the format
// <random UUID>@<process ID>@<hostname>.
//
// If the hostname cannot be determined, the error from os.Hostname is
// returned together with an empty ID; no fallback hostname is substituted.
generateClientUID = func() (string, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return "", err
	}
	return uuid.NewString() + "@" + strconv.Itoa(os.Getpid()) + "@" + hostname, nil
}
// generateClientHash generates a 6-digit zero-padded lowercase hexadecimal hash
// using the 10 most significant bits of a 64-bit hash value.
//
// The primary purpose of this function is to generate a hash value for the `client_hash`
// resource label using `client_uid` metric field. The range of values is chosen to be small
// enough to keep the cardinality of the Resource targets under control. Note: If at later time
// the range needs to be increased, it can be done by increasing the value of `kPrefixLength` to
// up to 24 bits without changing the format of the returned value.
generateClientHash = func(clientUID string) string {
if clientUID == "" {
return "000000"
}
// Use FNV hash function to generate a 64-bit hash
hasher := fnv.New64()
hasher.Write([]byte(clientUID))
hashValue := hasher.Sum64()
// Extract the 10 most significant bits
// Shift right by 54 bits to get the 10 most significant bits
kPrefixLength := 10
tenMostSignificantBits := hashValue >> (64 - kPrefixLength)
// Format the result as a 6-digit zero-padded hexadecimal string
return fmt.Sprintf("%06x", tenMostSignificantBits)
}
// detectClientLocation returns the cloud region reported by the GCP resource
// detector. It returns defaultClientLocation when SPANNER_EMULATOR_HOST is set
// to a non-empty value, when detection fails, or when the detected resource
// carries no region attribute.
detectClientLocation = func(ctx context.Context) string {
	// No detection is attempted when an emulator host is configured.
	if os.Getenv("SPANNER_EMULATOR_HOST") != "" {
		return defaultClientLocation
	}

	res, err := gcp.NewDetector().Detect(ctx)
	if err != nil {
		return defaultClientLocation
	}

	for _, kv := range res.Attributes() {
		if kv.Key != semconv.CloudRegionKey {
			continue
		}
		return kv.Value.AsString()
	}

	// No region attribute on the detected resource.
	return defaultClientLocation
}
// GCM exporter should use the same options as Spanner client.
// createExporterOptions takes the Spanner client options and returns the
// exporter options: the same options followed by an endpoint option that
// overrides any endpoint given earlier. The endpoint is the value of
// SPANNER_MONITORING_HOST when that variable is non-empty, otherwise
// "monitoring.googleapis.com:443".
//
// The returned slice never shares a backing array with spannerOpts, so the
// caller's slice is not modified.
// Overwritten in tests
createExporterOptions = func(spannerOpts ...option.ClientOption) []option.ClientOption {
	endpoint := "monitoring.googleapis.com:443"
	if host := os.Getenv("SPANNER_MONITORING_HOST"); host != "" {
		endpoint = host
	}
	// Cap the slice at its length so that append must allocate a new array
	// instead of writing into spare capacity owned by the caller.
	n := len(spannerOpts)
	return append(spannerOpts[:n:n], option.WithEndpoint(endpoint))
}
// grpcMetricsToEnable lists the gRPC metric names that are handed to
// stats.NewMetrics when gRPC built-in metrics are enabled, and for which
// builtInMeterProviderOptions installs a sum-aggregation view.
grpcMetricsToEnable = []string{
"grpc.client.attempt.started",
"grpc.subchannel.open_connections",
"grpc.subchannel.disconnections",
"grpc.subchannel.connection_attempts_succeeded",
"grpc.subchannel.connection_attempts_failed",
"grpc.lb.rls.default_target_picks",
"grpc.lb.rls.target_picks",
"grpc.xds_client.server_failure",
"grpc.xds_client.resource_updates_invalid",
"grpc.xds_client.resource_updates_valid",
}
// grpcOptionalLabels is passed as OptionalLabels in the gRPC OpenTelemetry
// metrics options.
grpcOptionalLabels = []string{
"grpc.disconnect_error",
"grpc.lb.backend_service",
"grpc.lb.locality",
}
)
// metricInfo describes one built-in metric; see metricsDetails for the values.
type metricInfo struct {
// additionalAttrs holds the attribute keys declared for the metric.
// NOTE(review): nothing in this part of the file reads this field — confirm
// whether it is still used elsewhere.
additionalAttrs []string
// recordedPerAttempt makes toOtelMetricAttrs report the status of the current
// attempt instead of the status of the operation.
recordedPerAttempt bool
}
// builtinMetricsTracerFactory is responsible for creating and managing metrics tracers.
// It holds the per-client attributes and the metric instruments that
// createBuiltinMetricsTracer copies into every tracer it returns.
type builtinMetricsTracerFactory struct {
enabled bool // Indicates if metrics tracing is enabled.
isDirectPathEnabled bool // Indicates if DirectPath is enabled.
// isAFEBuiltInMetricEnabled gates recording of the AFE latency and AFE
// connectivity error metrics.
isAFEBuiltInMetricEnabled bool
// shutdown is a function to be called on client close to clean up resources.
shutdown func(ctx context.Context)
// client options passed to gRPC channels
clientOpts []option.ClientOption
// clientAttributes are attributes specific to a client instance that do not change across different function calls on the client.
clientAttributes []attribute.KeyValue
// Metrics instruments
operationLatencies metric.Float64Histogram // Histogram for operation latencies.
attemptLatencies metric.Float64Histogram // Histogram for attempt latencies.
gfeLatencies metric.Float64Histogram // Latency between Google's network receiving an RPC and reading back the first byte of the response
afeLatencies metric.Float64Histogram // Latency between Spanner API Frontend receiving an RPC and starting to write back the response.
gfeErrorCount metric.Int64Counter // Counter for the number of requests that failed to reach the Google network.
afeErrorCount metric.Int64Counter // Counter for the number of requests that failed to reach the Spanner API Frontend.
operationCount metric.Int64Counter // Counter for the number of operations.
attemptCount metric.Int64Counter // Counter for the number of attempts.
// meterProvider is the default SDK meter provider created by
// newBuiltinMetricsTracerFactory; it stays nil when a provider is supplied
// by the caller.
meterProvider metric.MeterProvider
}
// newBuiltinMetricsTracerFactory creates the tracer factory for the database
// identified by dbpath.
//
// When metricsProvider is nil, a default SDK meter provider exporting through
// the monitoring exporter is created, the built-in instruments are registered
// on it, and the factory is returned enabled. When isEnableGRPCBuiltInMetrics
// is also set, clientOpts is populated with the dial options that turn on the
// gRPC metrics.
//
// When metricsProvider is a noop.MeterProvider, a disabled factory is returned
// with a nil error. Any other non-nil provider yields a disabled factory and
// an error.
//
// A failure to generate the client UID is only logged; a dbpath that cannot be
// parsed returns (nil, err). In every other error case the (disabled or
// partially initialised) factory is returned alongside the error.
func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath, compression string, isAFEBuiltInMetricEnabled, isEnableGRPCBuiltInMetrics bool, metricsProvider metric.MeterProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
	clientUID, err := generateClientUID()
	if err != nil {
		log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric attribute", err, metricLabelKeyClientUID)
	}
	project, instance, database, err := parseDatabaseName(dbpath)
	if err != nil {
		return nil, err
	}

	// The factory starts out disabled with a no-op shutdown; both are replaced
	// only after the default meter provider has been set up.
	tracerFactory := &builtinMetricsTracerFactory{
		enabled:                   false,
		isDirectPathEnabled:       false,
		isAFEBuiltInMetricEnabled: isAFEBuiltInMetricEnabled,
		clientAttributes: []attribute.KeyValue{
			attribute.String(monitoredResLabelKeyProject, project),
			attribute.String(monitoredResLabelKeyInstance, instance),
			attribute.String(metricLabelKeyDatabase, database),
			attribute.String(metricLabelKeyClientUID, clientUID),
			attribute.String(metricLabelKeyClientName, clientName),
			attribute.String(monitoredResLabelKeyClientHash, generateClientHash(clientUID)),
			// Skipping instance config until we have a way to get it
			attribute.String(monitoredResLabelKeyInstanceConfig, "unknown"),
			attribute.String(monitoredResLabelKeyLocation, detectClientLocation(ctx)),
		},
		shutdown: func(ctx context.Context) {},
	}

	// A caller-supplied provider is only accepted when it is the no-op one,
	// in which case built-in metrics stay disabled.
	if metricsProvider != nil {
		if _, isNoop := metricsProvider.(noop.MeterProvider); isNoop {
			return tracerFactory, nil
		}
		return tracerFactory, errors.New("unknown MetricsProvider type")
	}

	// Create default meter provider
	mpOptions, exporter, err := builtInMeterProviderOptions(project, compression, tracerFactory.clientAttributes, opts...)
	if err != nil {
		return tracerFactory, err
	}
	meterProvider := sdkmetric.NewMeterProvider(mpOptions...)
	tracerFactory.meterProvider = meterProvider

	if isEnableGRPCBuiltInMetrics {
		mo := opentelemetry.MetricsOptions{
			MeterProvider:  meterProvider,
			Metrics:        stats.NewMetrics(grpcMetricsToEnable...),
			OptionalLabels: grpcOptionalLabels,
		}
		// Configure gRPC dial options to enable gRPC metrics collection and static method call option.
		// The static method call option ensures consistent method names in metrics by preventing gRPC from
		// automatically adding service prefixes to method names. This helps maintain consistent metric
		// naming across different gRPC calls.
		tracerFactory.clientOpts = []option.ClientOption{
			option.WithGRPCDialOption(
				opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})),
			option.WithGRPCDialOption(
				grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})),
		}
	}

	tracerFactory.enabled = true
	tracerFactory.shutdown = func(ctx context.Context) {
		exporter.stop()
		// The shutdown hook has no way to report an error, so the result of
		// Shutdown is deliberately discarded.
		_ = meterProvider.Shutdown(ctx)
	}

	// Create meter and instruments
	meter := meterProvider.Meter(builtInMetricsMeterName, metric.WithInstrumentationVersion(internal.Version))
	err = tracerFactory.createInstruments(meter)
	return tracerFactory, err
}
// builtInMeterProviderOptions creates the monitoring exporter and returns the
// options for the default SDK meter provider together with that exporter.
//
// The options consist of a periodic reader that exports every
// defaultSamplePeriod and a set of views:
//   - every metric in grpcMetricsToEnable is aggregated as a sum and keeps only
//     the attributes whose key is in allowedMetricLabels;
//   - four EEF metrics are dropped;
//   - the EEF fallback-count and call-status metrics are aggregated as sums and
//     keep only the attributes whose key is in allowedEEFMetricLabels.
//
// An error from exporter creation is returned with nil options and exporter.
func builtInMeterProviderOptions(project, compression string, clientAttributes []attribute.KeyValue, opts ...option.ClientOption) ([]sdkmetric.Option, *monitoringExporter, error) {
	exporter, err := newMonitoringExporter(context.Background(), project, compression, clientAttributes, createExporterOptions(opts...)...)
	if err != nil {
		return nil, nil, err
	}

	// Attribute filters; neither depends on the metric they are attached to.
	keepGRPCLabel := func(kv attribute.KeyValue) bool {
		_, allowed := allowedMetricLabels[string(kv.Key)]
		return allowed
	}
	keepEEFLabel := func(kv attribute.KeyValue) bool {
		_, allowed := allowedEEFMetricLabels[string(kv.Key)]
		return allowed
	}

	var views []sdkmetric.View
	addView := func(name string, stream sdkmetric.Stream) {
		views = append(views, sdkmetric.NewView(sdkmetric.Instrument{Name: name}, stream))
	}

	for _, name := range grpcMetricsToEnable {
		addView(name, sdkmetric.Stream{
			Aggregation:     sdkmetric.AggregationSum{},
			AttributeFilter: keepGRPCLabel,
		})
	}

	// EEF metrics that are dropped.
	for _, name := range []string{
		"eef.probe_result",
		"eef.error_ratio",
		"eef.current_channel",
		"eef.channel_downtime",
	} {
		addView(name, sdkmetric.Stream{Aggregation: sdkmetric.AggregationDrop{}})
	}

	// EEF metrics that are kept.
	for _, name := range []string{metricNameEEFFallbackCount, metricNameEEFCallStatus} {
		addView(name, sdkmetric.Stream{
			Aggregation:     sdkmetric.AggregationSum{},
			AttributeFilter: keepEEFLabel,
		})
	}

	reader := sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(defaultSamplePeriod))
	return []sdkmetric.Option{
		sdkmetric.WithReader(reader),
		sdkmetric.WithView(views...),
	}, exporter, nil
}
// createInstruments registers the eight built-in instruments on meter and
// stores them on the factory. Every instrument name is prefixed with
// nativeMetricsPrefix. Latency histograms use the "ms" unit and the shared
// bucketBounds; counters use the "1" unit.
//
// Instruments are created in a fixed order and the first error aborts the
// remaining registrations and is returned.
func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) error {
	latencyHistogram := func(name, description string) (metric.Float64Histogram, error) {
		return meter.Float64Histogram(
			nativeMetricsPrefix+name,
			metric.WithDescription(description),
			metric.WithUnit(metricUnitMS),
			metric.WithExplicitBucketBoundaries(bucketBounds...),
		)
	}
	counter := func(name, description string) (metric.Int64Counter, error) {
		return meter.Int64Counter(
			nativeMetricsPrefix+name,
			metric.WithDescription(description),
			metric.WithUnit(metricUnitCount),
		)
	}

	var err error

	// Latency histograms.
	if tf.operationLatencies, err = latencyHistogram(metricNameOperationLatencies,
		"Total time until final operation success or failure, including retries and backoff."); err != nil {
		return err
	}
	if tf.attemptLatencies, err = latencyHistogram(metricNameAttemptLatencies,
		"Client observed latency per RPC attempt."); err != nil {
		return err
	}
	if tf.gfeLatencies, err = latencyHistogram(metricNameGFELatencies,
		"Latency between Google's network receiving an RPC and reading back the first byte of the response."); err != nil {
		return err
	}
	if tf.afeLatencies, err = latencyHistogram(metricNameAFELatencies,
		"Latency between Spanner API Frontend receiving an RPC and starting to write back the response."); err != nil {
		return err
	}

	// Counters.
	if tf.operationCount, err = counter(metricNameOperationCount,
		"The count of database operations."); err != nil {
		return err
	}
	if tf.attemptCount, err = counter(metricNameAttemptCount,
		"The number of attempts made for the operation, including the initial attempt."); err != nil {
		return err
	}
	if tf.gfeErrorCount, err = counter(metricNameGFEConnectivityErrorCount,
		"Number of requests that failed to reach the Google network."); err != nil {
		return err
	}
	tf.afeErrorCount, err = counter(metricNameAFEConnectivityErrorCount,
		"Number of requests that failed to reach the Spanner API Frontend.")
	return err
}
// builtinMetricsTracer is created one per operation.
// It is used to store metric instruments, attribute values, and other data required to obtain and record them.
// The instruments and client attributes are copied from the factory by
// createBuiltinMetricsTracer.
type builtinMetricsTracer struct {
ctx context.Context // Context for the tracer; passed to every Record/Add call.
builtInEnabled bool // Indicates if built-in metrics are enabled.
// isAFEBuiltInMetricEnabled gates recordAFELatency and recordAFEError.
isAFEBuiltInMetricEnabled bool
// clientAttributes are attributes specific to a client instance that do not change across different operations on the client.
clientAttributes []attribute.KeyValue
// Metrics instruments
instrumentOperationLatencies metric.Float64Histogram // Histogram for operation latencies.
instrumentAttemptLatencies metric.Float64Histogram // Histogram for attempt latencies.
instrumentGFELatencies metric.Float64Histogram // Histogram for GFE latencies.
instrumentAFELatencies metric.Float64Histogram // Histogram for AFE latencies.
instrumentGFEErrorCount metric.Int64Counter // Counter for GFE connectivity errors.
instrumentAFEErrorCount metric.Int64Counter // Counter for AFE connectivity errors.
instrumentOperationCount metric.Int64Counter // Counter for the number of operations.
instrumentAttemptCount metric.Int64Counter // Counter for the number of attempts.
method string // The method being traced; not set by createBuiltinMetricsTracer.
currOp *opTracer // The current operation tracer.
}
// opTracer is used to record metrics for the entire operation, including retries.
// An operation is a logical unit that represents a single method invocation on the client.
// The method might require multiple attempts/RPCs and backoff logic to complete.
type opTracer struct {
attemptCount int64 // The number of attempts made for the operation.
startTime time.Time // The start time of the operation.
// status is the gRPC status code of the last completed attempt.
status string
directPathEnabled bool // Indicates if DirectPath is enabled for the operation.
// currAttempt is nil until an attempt tracer is attached; toOtelMetricAttrs
// returns an error while it is nil.
currAttempt *attemptTracer // The current attempt tracer.
}
// attemptTracer is used to record metrics for a single attempt within an operation.
type attemptTracer struct {
startTime time.Time // The start time of the attempt.
status string // The gRPC status code of the attempt.
directPathUsed bool // Indicates if DirectPath was used for the attempt.
// serverTimingMetrics holds server-reported durations; recordAttemptCompletion
// looks them up under afeTimingHeader and gfeTimingHeader.
serverTimingMetrics map[string]time.Duration
}
// setStartTime sets the start time for the operation.
// recordOperationCompletion measures the operation latency from this instant.
func (o *opTracer) setStartTime(t time.Time) {
o.startTime = t
}
// setStartTime sets the start time for the attempt.
// recordAttemptCompletion measures the attempt latency from this instant.
func (a *attemptTracer) setStartTime(t time.Time) {
a.startTime = t
}
// setStatus sets the status for the operation.
// toOtelMetricAttrs reports it as the status attribute of metrics that are not
// recorded per attempt.
func (o *opTracer) setStatus(s string) {
o.status = s
}
// setStatus sets the status for the attempt.
// toOtelMetricAttrs reports it as the status attribute of metrics that are
// recorded per attempt.
func (a *attemptTracer) setStatus(s string) {
a.status = s
}
// incrementAttemptCount increments the attempt count for the operation.
// recordOperationCompletion adds the accumulated count to the attempt_count
// counter.
func (o *opTracer) incrementAttemptCount() {
o.attemptCount++
}
// setDirectPathUsed marks the attempt as having used DirectPath when the peer
// stored in ctx carries ALTS auth info. The flag is only ever switched on
// here: if ctx has no peer, or the peer's auth info is not ALTS, the current
// value is left untouched.
func (a *attemptTracer) setDirectPathUsed(ctx context.Context) {
	p, found := peer.FromContext(ctx)
	if !found {
		return
	}
	_, viaALTS := p.AuthInfo.(alts.AuthInfo)
	if viaALTS {
		a.directPathUsed = true
	}
}
// setServerTimingMetrics stores the server timing durations for the attempt.
// The map is kept by reference, not copied. recordAttemptCompletion reads the
// AFE or GFE entry from it.
func (a *attemptTracer) setServerTimingMetrics(metrics map[string]time.Duration) {
a.serverTimingMetrics = metrics
}
// setDirectPathEnabled sets whether DirectPath is enabled for the operation.
// toOtelMetricAttrs reports the value as the directpath_enabled attribute.
func (o *opTracer) setDirectPathEnabled(enabled bool) {
o.directPathEnabled = enabled
}
// createBuiltinMetricsTracer returns a tracer for a new operation. The
// operation's start time is set to the current time and its DirectPath flag is
// taken from the factory. The enabled flags, client attributes and instruments
// are copied from the factory.
//
// Only the operation tracer is created here; no attempt tracer is attached
// because no attempt has started yet.
func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Context) builtinMetricsTracer {
	op := &opTracer{
		startTime:         time.Now(),
		directPathEnabled: tf.isDirectPathEnabled,
	}

	return builtinMetricsTracer{
		ctx:                       ctx,
		builtInEnabled:            tf.enabled,
		isAFEBuiltInMetricEnabled: tf.isAFEBuiltInMetricEnabled,
		clientAttributes:          tf.clientAttributes,
		currOp:                    op,

		// Latency histograms.
		instrumentOperationLatencies: tf.operationLatencies,
		instrumentAttemptLatencies:   tf.attemptLatencies,
		instrumentGFELatencies:       tf.gfeLatencies,
		instrumentAFELatencies:       tf.afeLatencies,

		// Counters.
		instrumentOperationCount: tf.operationCount,
		instrumentAttemptCount:   tf.attemptCount,
		instrumentGFEErrorCount:  tf.gfeErrorCount,
		instrumentAFEErrorCount:  tf.afeErrorCount,
	}
}
// toOtelMetricAttrs converts the attribute values captured during the current
// operation / attempt into OpenTelemetry attributes for the given metric.
//
// The returned list holds exactly four attributes:
//   - method: mt.method with the "/google.spanner.v1." prefix removed and every
//     "/" replaced by ".",
//   - directpath_enabled: taken from the operation,
//   - directpath_used: taken from the current attempt,
//   - status: the attempt status when the metric is recorded per attempt
//     (see metricsDetails), otherwise the operation status.
//
// The client attributes stored on the tracer are not included in the result.
//
// An error is returned when the operation or attempt tracer is missing, or
// when metricName has no entry in metricsDetails.
func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) ([]attribute.KeyValue, error) {
	if mt.currOp == nil || mt.currOp.currAttempt == nil {
		return nil, fmt.Errorf("unable to create attributes list for metric %v: operation or attempt tracer is not set", metricName)
	}

	// Get metric details
	mDetails, found := metricsDetails[metricName]
	if !found {
		return nil, fmt.Errorf("unable to create attributes list for unknown metric: %v", metricName)
	}

	rpcStatus := mt.currOp.status
	if mDetails.recordedPerAttempt {
		rpcStatus = mt.currOp.currAttempt.status
	}

	method := strings.ReplaceAll(strings.TrimPrefix(mt.method, "/google.spanner.v1."), "/", ".")
	return []attribute.KeyValue{
		attribute.String(metricLabelKeyMethod, method),
		attribute.String(metricLabelKeyDirectPathEnabled, strconv.FormatBool(mt.currOp.directPathEnabled)),
		attribute.String(metricLabelKeyDirectPathUsed, strconv.FormatBool(mt.currOp.currAttempt.directPathUsed)),
		attribute.String(metricLabelKeyStatus, rpcStatus),
	}, nil
}
// recordGFELatency records latency, truncated to whole milliseconds, on the
// GFE latency histogram. Nothing is recorded when built-in metrics are
// disabled or when the metric attributes cannot be built.
func (t *builtinMetricsTracer) recordGFELatency(latency time.Duration) {
	if !t.builtInEnabled {
		return
	}
	attrs, err := t.toOtelMetricAttrs(metricNameGFELatencies)
	if err != nil {
		return
	}
	ms := float64(latency.Milliseconds())
	t.instrumentGFELatencies.Record(t.ctx, ms, metric.WithAttributes(attrs...))
}
// recordAFELatency records latency, truncated to whole milliseconds, on the
// AFE latency histogram. Nothing is recorded when built-in metrics or the AFE
// metrics are disabled, or when the metric attributes cannot be built.
func (t *builtinMetricsTracer) recordAFELatency(latency time.Duration) {
	// A disabled factory never creates its instruments, so the histogram is
	// nil in that case and must not be used.
	if !t.builtInEnabled || !t.isAFEBuiltInMetricEnabled {
		return
	}
	attrs, err := t.toOtelMetricAttrs(metricNameAFELatencies)
	if err != nil {
		return
	}
	t.instrumentAFELatencies.Record(t.ctx, float64(latency.Milliseconds()), metric.WithAttributes(attrs...))
}
// recordGFEError adds one to the GFE connectivity error counter. Nothing is
// recorded when built-in metrics are disabled or when the metric attributes
// cannot be built.
func (t *builtinMetricsTracer) recordGFEError() {
	// A disabled factory never creates its instruments, so the counter is nil
	// in that case and must not be used.
	if !t.builtInEnabled {
		return
	}
	attrs, err := t.toOtelMetricAttrs(metricNameGFEConnectivityErrorCount)
	if err != nil {
		return
	}
	t.instrumentGFEErrorCount.Add(t.ctx, 1, metric.WithAttributes(attrs...))
}
// recordAFEError adds one to the AFE connectivity error counter. Nothing is
// recorded when built-in metrics or the AFE metrics are disabled, or when the
// metric attributes cannot be built.
func (t *builtinMetricsTracer) recordAFEError() {
	// A disabled factory never creates its instruments, so the counter is nil
	// in that case and must not be used.
	if !t.builtInEnabled || !t.isAFEBuiltInMetricEnabled {
		return
	}
	attrs, err := t.toOtelMetricAttrs(metricNameAFEConnectivityErrorCount)
	if err != nil {
		return
	}
	t.instrumentAFEErrorCount.Add(t.ctx, 1, metric.WithAttributes(attrs...))
}
// convertToGrpcStatusErr maps err to a gRPC status code and a matching error.
//
//   - nil yields (codes.OK, nil).
//   - An error that status.FromError recognises yields its code and a status
//     error rebuilt from that code and message.
//   - Otherwise the error is run through status.FromContextError; if that
//     gives a code other than Unknown, the code and a status error built from
//     it are returned.
//   - Anything else yields codes.Unknown together with the original error.
func convertToGrpcStatusErr(err error) (codes.Code, error) {
	if err == nil {
		return codes.OK, nil
	}

	st, isStatus := status.FromError(err)
	if !isStatus {
		// Not a status error: see whether it is a context error instead.
		st = status.FromContextError(err)
		if st.Code() == codes.Unknown {
			return codes.Unknown, err
		}
	}
	return st.Code(), status.Error(st.Code(), st.Message())
}
// recordAttemptCompletion records the attempt-specific metrics for the current
// attempt of mt. It does nothing when built-in metrics are disabled or when
// there is no current operation or attempt.
//
// If DirectPath was used for the attempt, the AFE latency is recorded when the
// server timing metrics contain an AFE entry, otherwise an AFE error is
// counted. If DirectPath was not used, the same is done with the GFE entry
// and the GFE metrics. Afterwards the time elapsed since the attempt started
// is recorded as attempt latency; that step is skipped if the metric
// attributes cannot be built.
func recordAttemptCompletion(mt *builtinMetricsTracer) {
	if mt == nil || !mt.builtInEnabled {
		return
	}
	// Without an attempt tracer there is nothing to measure.
	if mt.currOp == nil || mt.currOp.currAttempt == nil {
		return
	}
	attempt := mt.currOp.currAttempt

	// The server timing source depends on whether DirectPath was used for
	// this attempt.
	if attempt.directPathUsed {
		if dur, ok := attempt.serverTimingMetrics[afeTimingHeader]; ok {
			mt.recordAFELatency(dur)
		} else {
			mt.recordAFEError()
		}
	} else {
		if dur, ok := attempt.serverTimingMetrics[gfeTimingHeader]; ok {
			mt.recordGFELatency(dur)
		} else {
			mt.recordGFEError()
		}
	}

	// Calculate elapsed time
	elapsedTime := convertToMs(time.Since(attempt.startTime))

	// Record attempt_latencies
	attemptLatAttrs, err := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
	if err != nil {
		return
	}
	mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...))
}
// recordOperationCompletion records the operation-specific metrics of mt:
// operation_count (+1), operation_latencies (time elapsed since the operation
// started) and attempt_count (number of attempts made for the operation).
//
// It does nothing when built-in metrics are disabled or when there is no
// current operation. Metrics are recorded in the order above; if the
// attributes for one of them cannot be built, that metric and the ones after
// it are skipped.
//
// attempt_count is marked as recorded per attempt in metricsDetails, so its
// status attribute is the status of the current attempt, not of the operation.
func recordOperationCompletion(mt *builtinMetricsTracer) {
	if mt == nil || !mt.builtInEnabled {
		return
	}
	// Without an operation tracer there is no start time to measure from.
	if mt.currOp == nil {
		return
	}

	// Calculate elapsed time
	elapsedTimeMs := convertToMs(time.Since(mt.currOp.startTime))

	// Record operation_count
	opCntAttrs, err := mt.toOtelMetricAttrs(metricNameOperationCount)
	if err != nil {
		return
	}
	mt.instrumentOperationCount.Add(mt.ctx, 1, metric.WithAttributes(opCntAttrs...))

	// Record operation_latencies
	opLatAttrs, err := mt.toOtelMetricAttrs(metricNameOperationLatencies)
	if err != nil {
		return
	}
	mt.instrumentOperationLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(opLatAttrs...))

	// Record attempt_count
	attemptCntAttrs, err := mt.toOtelMetricAttrs(metricNameAttemptCount)
	if err != nil {
		return
	}
	mt.instrumentAttemptCount.Add(mt.ctx, mt.currOp.attemptCount, metric.WithAttributes(attemptCntAttrs...))
}
// convertToMs converts d to milliseconds as a float64, keeping the fractional
// part (for example, 1500µs becomes 1.5).
func convertToMs(d time.Duration) float64 {
	const nanosPerMilli = float64(time.Millisecond)
	nanos := float64(d.Nanoseconds())
	return nanos / nanosPerMilli
}