blob: 0de66134dff4031cea3d68c0b95ca1720c0d46b7 [file] [log] [blame]
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package spanner
import (
"context"
"strconv"
"strings"
"sync"
"cloud.google.com/go/spanner/internal"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/grpc/metadata"
)
const statsPrefix = "cloud.google.com/go/spanner/"
var (
tagKeyClientID = tag.MustNewKey("client_id")
tagKeyDatabase = tag.MustNewKey("database")
tagKeyInstance = tag.MustNewKey("instance_id")
tagKeyLibVersion = tag.MustNewKey("library_version")
tagKeyType = tag.MustNewKey("type")
tagCommonKeys = []tag.Key{tagKeyClientID, tagKeyDatabase, tagKeyInstance, tagKeyLibVersion}
tagNumInUseSessions = tag.Tag{Key: tagKeyType, Value: "num_in_use_sessions"}
tagNumBeingPrepared = tag.Tag{Key: tagKeyType, Value: "num_sessions_being_prepared"}
tagNumReadSessions = tag.Tag{Key: tagKeyType, Value: "num_read_sessions"}
tagNumWriteSessions = tag.Tag{Key: tagKeyType, Value: "num_write_prepared_sessions"}
tagKeyMethod = tag.MustNewKey("grpc_client_method")
// gfeLatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded
gfeLatencyMetricsEnabled = false
// mutex to avoid data race in reading/writing the above flag
statsMu = sync.RWMutex{}
)
func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
stats.Record(ctx, m.M(n))
}
var (
// OpenSessionCount is a measure of the number of sessions currently opened.
// It is EXPERIMENTAL and subject to change or removal without notice.
OpenSessionCount = stats.Int64(
statsPrefix+"open_session_count",
"Number of sessions currently opened",
stats.UnitDimensionless,
)
// OpenSessionCountView is a view of the last value of OpenSessionCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
OpenSessionCountView = &view.View{
Measure: OpenSessionCount,
Aggregation: view.LastValue(),
TagKeys: tagCommonKeys,
}
// MaxAllowedSessionsCount is a measure of the maximum number of sessions
// allowed. Configurable by the user.
MaxAllowedSessionsCount = stats.Int64(
statsPrefix+"max_allowed_sessions",
"The maximum number of sessions allowed. Configurable by the user.",
stats.UnitDimensionless,
)
// MaxAllowedSessionsCountView is a view of the last value of
// MaxAllowedSessionsCount.
MaxAllowedSessionsCountView = &view.View{
Measure: MaxAllowedSessionsCount,
Aggregation: view.LastValue(),
TagKeys: tagCommonKeys,
}
// SessionsCount is a measure of the number of sessions in the pool
// including both in-use, idle, and being prepared.
SessionsCount = stats.Int64(
statsPrefix+"num_sessions_in_pool",
"The number of sessions currently in use.",
stats.UnitDimensionless,
)
// SessionsCountView is a view of the last value of SessionsCount.
SessionsCountView = &view.View{
Measure: SessionsCount,
Aggregation: view.LastValue(),
TagKeys: append(tagCommonKeys, tagKeyType),
}
// MaxInUseSessionsCount is a measure of the maximum number of sessions
// in use during the last 10 minute interval.
MaxInUseSessionsCount = stats.Int64(
statsPrefix+"max_in_use_sessions",
"The maximum number of sessions in use during the last 10 minute interval.",
stats.UnitDimensionless,
)
// MaxInUseSessionsCountView is a view of the last value of
// MaxInUseSessionsCount.
MaxInUseSessionsCountView = &view.View{
Measure: MaxInUseSessionsCount,
Aggregation: view.LastValue(),
TagKeys: tagCommonKeys,
}
// GetSessionTimeoutsCount is a measure of the number of get sessions
// timeouts due to pool exhaustion.
GetSessionTimeoutsCount = stats.Int64(
statsPrefix+"get_session_timeouts",
"The number of get sessions timeouts due to pool exhaustion.",
stats.UnitDimensionless,
)
// GetSessionTimeoutsCountView is a view of the last value of
// GetSessionTimeoutsCount.
GetSessionTimeoutsCountView = &view.View{
Measure: GetSessionTimeoutsCount,
Aggregation: view.Count(),
TagKeys: tagCommonKeys,
}
// AcquiredSessionsCount is the number of sessions acquired from
// the session pool.
AcquiredSessionsCount = stats.Int64(
statsPrefix+"num_acquired_sessions",
"The number of sessions acquired from the session pool.",
stats.UnitDimensionless,
)
// AcquiredSessionsCountView is a view of the last value of
// AcquiredSessionsCount.
AcquiredSessionsCountView = &view.View{
Measure: AcquiredSessionsCount,
Aggregation: view.Count(),
TagKeys: tagCommonKeys,
}
// ReleasedSessionsCount is the number of sessions released by the user
// and pool maintainer.
ReleasedSessionsCount = stats.Int64(
statsPrefix+"num_released_sessions",
"The number of sessions released by the user and pool maintainer.",
stats.UnitDimensionless,
)
// ReleasedSessionsCountView is a view of the last value of
// ReleasedSessionsCount.
ReleasedSessionsCountView = &view.View{
Measure: ReleasedSessionsCount,
Aggregation: view.Count(),
TagKeys: tagCommonKeys,
}
// GFELatency is the latency between Google's network receiving an RPC and reading back the first byte of the response
GFELatency = stats.Int64(
statsPrefix+"gfe_latency",
"Latency between Google's network receiving an RPC and reading back the first byte of the response",
stats.UnitMilliseconds,
)
// GFELatencyView is the view of distribution of GFELatency values
GFELatencyView = &view.View{
Name: "cloud.google.com/go/spanner/gfe_latency",
Measure: GFELatency,
Description: "Latency between Google's network receives an RPC and reads back the first byte of the response",
Aggregation: view.Distribution(0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0,
16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0,
300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
100000.0),
TagKeys: append(tagCommonKeys, tagKeyMethod),
}
// GFEHeaderMissingCount is the number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network
GFEHeaderMissingCount = stats.Int64(
statsPrefix+"gfe_header_missing_count",
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
stats.UnitDimensionless,
)
// GFEHeaderMissingCountView is the view of number of GFEHeaderMissingCount
GFEHeaderMissingCountView = &view.View{
Name: "cloud.google.com/go/spanner/gfe_header_missing_count",
Measure: GFEHeaderMissingCount,
Description: "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
Aggregation: view.Count(),
TagKeys: append(tagCommonKeys, tagKeyMethod),
}
)
// EnableStatViews enables all views of metrics relate to session management.
func EnableStatViews() error {
return view.Register(
OpenSessionCountView,
MaxAllowedSessionsCountView,
SessionsCountView,
MaxInUseSessionsCountView,
GetSessionTimeoutsCountView,
AcquiredSessionsCountView,
ReleasedSessionsCountView,
)
}
// EnableGfeLatencyView enables GFELatency metric
func EnableGfeLatencyView() error {
setGFELatencyMetricsFlag(true)
return view.Register(GFELatencyView)
}
// EnableGfeHeaderMissingCountView enables GFEHeaderMissingCount metric
func EnableGfeHeaderMissingCountView() error {
setGFELatencyMetricsFlag(true)
return view.Register(GFEHeaderMissingCountView)
}
// EnableGfeLatencyAndHeaderMissingCountViews enables GFEHeaderMissingCount and GFELatency metric
func EnableGfeLatencyAndHeaderMissingCountViews() error {
setGFELatencyMetricsFlag(true)
return view.Register(
GFELatencyView,
GFEHeaderMissingCountView,
)
}
func getGFELatencyMetricsFlag() bool {
statsMu.RLock()
defer statsMu.RUnlock()
return gfeLatencyMetricsEnabled
}
func setGFELatencyMetricsFlag(enable bool) {
statsMu.Lock()
gfeLatencyMetricsEnabled = enable
statsMu.Unlock()
}
// DisableGfeLatencyAndHeaderMissingCountViews disables GFEHeaderMissingCount and GFELatency metric
func DisableGfeLatencyAndHeaderMissingCountViews() {
setGFELatencyMetricsFlag(false)
view.Unregister(
GFELatencyView,
GFEHeaderMissingCountView,
)
}
func captureGFELatencyStats(ctx context.Context, md metadata.MD, keyMethod string) error {
if len(md.Get("server-timing")) == 0 {
recordStat(ctx, GFEHeaderMissingCount, 1)
return nil
}
serverTiming := md.Get("server-timing")[0]
gfeLatency, err := strconv.Atoi(strings.TrimPrefix(serverTiming, "gfet4t7; dur="))
if !strings.HasPrefix(serverTiming, "gfet4t7; dur=") || err != nil {
return err
}
// Record GFE latency with OpenCensus.
ctx = tag.NewContext(ctx, tag.FromContext(ctx))
ctx, err = tag.New(ctx, tag.Insert(tagKeyMethod, keyMethod))
if err != nil {
return err
}
recordStat(ctx, GFELatency, int64(gfeLatency))
return nil
}
func createContextAndCaptureGFELatencyMetrics(ctx context.Context, ct *commonTags, md metadata.MD, keyMethod string) error {
var ctxGFE, err = tag.New(ctx,
tag.Upsert(tagKeyClientID, ct.clientID),
tag.Upsert(tagKeyDatabase, ct.database),
tag.Upsert(tagKeyInstance, ct.instance),
tag.Upsert(tagKeyLibVersion, ct.libVersion),
)
if err != nil {
return err
}
return captureGFELatencyStats(ctxGFE, md, keyMethod)
}
func getCommonTags(sc *sessionClient) *commonTags {
_, instance, database, err := parseDatabaseName(sc.database)
if err != nil {
return nil
}
return &commonTags{
clientID: sc.id,
database: database,
instance: instance,
libVersion: internal.Version,
}
}
// commonTags are common key-value pairs of data associated with the GFELatency measure
type commonTags struct {
// Client ID
clientID string
// Database Name
database string
// Instance ID
instance string
// Library Version
libVersion string
}