| /* |
| Copyright 2017 Google LLC |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package spanner |
| |
| import ( |
| "context" |
| "fmt" |
| "log" |
| "math/rand" |
| "strings" |
| "sync" |
| "time" |
| |
| "cloud.google.com/go/internal/trace" |
| "cloud.google.com/go/spanner/internal" |
| "go.opencensus.io/stats" |
| "go.opencensus.io/tag" |
| "go.opentelemetry.io/otel/attribute" |
| "go.opentelemetry.io/otel/metric" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/metadata" |
| ) |
| |
const (
	// multiplexSessionRefreshInterval is the age (measured from the session's
	// createTime) after which multiplexSessionWorker requests a replacement
	// multiplexed session.
	multiplexSessionRefreshInterval = 7 * 24 * time.Hour
)
| |
// ActionOnInactiveTransactionKind describes the kind of action taken when there are inactive transactions.
//
// Deprecated: This type is no longer used as the session pool has been removed.
type ActionOnInactiveTransactionKind int

const (
	// actionUnspecified is the zero value of ActionOnInactiveTransactionKind,
	// i.e. what a config that never sets the action contains.
	actionUnspecified ActionOnInactiveTransactionKind = iota
	// NoAction action does not perform any action on inactive transactions.
	//
	// Deprecated: This constant is no longer used as the session pool has been removed.
	NoAction
	// Warn action logs inactive transactions. Any inactive transaction gets logged only once.
	//
	// Deprecated: This constant is no longer used as the session pool has been removed.
	Warn
	// Close action closes inactive transactions without logging.
	//
	// Deprecated: This constant is no longer used as the session pool has been removed.
	Close
	// WarnAndClose action logs and closes the inactive transactions.
	//
	// Deprecated: This constant is no longer used as the session pool has been removed.
	WarnAndClose
)
| |
// InactiveTransactionRemovalOptions has configurations for action on long-running transactions.
// It is embedded in SessionPoolConfig.
//
// Deprecated: This type is no longer used as the session pool has been removed.
// Multiplexed sessions are now used for all operations. Kept for backward compatibility.
type InactiveTransactionRemovalOptions struct {
	// ActionOnInactiveTransaction is the action to take on inactive transactions.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	ActionOnInactiveTransaction ActionOnInactiveTransactionKind
}
| |
// sessionHandle is the handle through which transactions access a Cloud
// Spanner session safely. It is generated by sessionManager.takeMultiplexed().
type sessionHandle struct {
	// mu guards session and client.
	mu sync.RWMutex
	// session is a pointer to a session object. It is set to nil when the
	// handle is recycled; the accessors then return zero values.
	session *session
	// client is the RPC channel to Cloud Spanner. When it is nil, getClient
	// falls back to the session's own client.
	client spannerClient
}
| |
| // recycle marks the session handle as no longer in use. |
| func (sh *sessionHandle) recycle() { |
| sh.mu.Lock() |
| if sh.session == nil { |
| sh.mu.Unlock() |
| return |
| } |
| p := sh.session.sm |
| sh.session = nil |
| sh.client = nil |
| sh.mu.Unlock() |
| if p != nil { |
| p.mu.Lock() |
| p.decNumMultiplexedInUseLocked(context.Background()) |
| p.mu.Unlock() |
| } |
| } |
| |
| // getID gets the Cloud Spanner session ID from the internal session object. |
| func (sh *sessionHandle) getID() string { |
| sh.mu.RLock() |
| defer sh.mu.RUnlock() |
| if sh.session == nil { |
| return "" |
| } |
| return sh.session.getID() |
| } |
| |
| // getClient gets the Cloud Spanner RPC client associated with the session. |
| func (sh *sessionHandle) getClient() spannerClient { |
| sh.mu.RLock() |
| defer sh.mu.RUnlock() |
| if sh.session == nil { |
| return nil |
| } |
| if sh.client != nil { |
| return sh.client |
| } |
| return sh.session.client |
| } |
| |
| // getMetadata returns the metadata associated with the session. |
| func (sh *sessionHandle) getMetadata() metadata.MD { |
| sh.mu.RLock() |
| defer sh.mu.RUnlock() |
| if sh.session == nil { |
| return nil |
| } |
| return sh.session.md |
| } |
| |
// session wraps a Cloud Spanner session ID through which transactions are
// created and executed. All sessions are multiplexed sessions.
type session struct {
	// client is the RPC channel to Cloud Spanner. sessionHandle.getClient
	// falls back to it when the handle has no client of its own.
	client spannerClient
	// id is the unique id of the session in Cloud Spanner.
	id string
	// sm is the session manager the session belongs to. It is assigned by
	// sessionManager.sessionReady.
	sm *sessionManager
	// createTime is the timestamp of the session's creation. The refresh
	// worker compares it against multiplexSessionRefreshInterval.
	createTime time.Time
	// logger is the logger attached to the session.
	// NOTE(review): it is not read anywhere in the visible part of this
	// file — confirm where it is set and used.
	logger *log.Logger
	// md is the Metadata to be sent with each request.
	md metadata.MD
}
| |
| // String implements fmt.Stringer for session. |
| func (s *session) String() string { |
| return fmt.Sprintf("<id=%v, create=%v>", s.id, s.createTime) |
| } |
| |
// getID returns the session ID which uniquely identifies the session in Cloud Spanner.
// It takes no lock; it simply returns the id field.
func (s *session) getID() string {
	return s.id
}
| |
// SessionPoolConfig stores configurations of a session pool.
//
// Deprecated: This configuration is no longer used as the session pool has been removed.
// Multiplexed sessions are now used for all operations. These options are kept for
// backward compatibility but are ignored. The one exception is
// MultiplexSessionCheckInterval, which is still honored.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the session pool.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool tries to maintain.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	MaxBurst uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for write.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	WriteSessions float64

	// HealthCheckWorkers is number of workers used by health checker.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	HealthCheckInterval time.Duration

	// MultiplexSessionCheckInterval is the interval at which the multiplexed session is checked.
	// The background worker sleeps for this long between two checks of the
	// session's age.
	//
	// Defaults to 10 mins; a zero value is replaced by the default when the
	// session manager is created.
	MultiplexSessionCheckInterval time.Duration

	// TrackSessionHandles determines whether the session pool will keep track of session handles.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	TrackSessionHandles bool

	// InactiveTransactionRemovalOptions is embedded, so its
	// ActionOnInactiveTransaction field is promoted to SessionPoolConfig.
	//
	// Deprecated: This option is no longer used as the session pool has been removed.
	InactiveTransactionRemovalOptions
}
| |
// DefaultSessionPoolConfig is the default configuration. It sets only
// MultiplexSessionCheckInterval (10 minutes); every other field is left at
// its zero value.
//
// Deprecated: The session pool has been removed. Multiplexed sessions are now used
// for all operations. Only MultiplexSessionCheckInterval is still active.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MultiplexSessionCheckInterval: 10 * time.Minute,
}
| |
// muxSessionCreateRequest is a request sent on
// sessionManager.multiplexedSessionReq and handled by
// sessionManager.createMultiplexedSession.
type muxSessionCreateRequest struct {
	// ctx is passed to the session creation call and also bounds how long the
	// handler waits for the requester to pick up the completion signal.
	ctx context.Context
	// force requests a new multiplexed session even if one already exists.
	// When false, a session is only created if none exists yet.
	force bool
}
| |
// sessionManager manages multiplexed sessions for a database.
type sessionManager struct {
	// mu is held while reading or writing valid, multiplexedSession and
	// multiplexedSessionCreationError.
	mu sync.Mutex
	// valid is true from construction until close is called.
	valid bool
	// sc is the session client used to obtain RPC clients and to create the
	// multiplexed session.
	sc *sessionClient

	// multiplexSessionClientCounter is the index into clientPool of the
	// client handed out by getRoundRobinClient; it advances on every call.
	multiplexSessionClientCounter int
	// clientPool is filled lazily by getRoundRobinClient with one client per
	// connection reported by sc.connPool.Num().
	clientPool []spannerClient
	// multiplexedSession is the current multiplexed session. It is nil until
	// sessionReady has been called for the first time.
	multiplexedSession *session
	// multiplexedSessionReq carries creation requests to the
	// createMultiplexedSession goroutine. It is closed by close.
	multiplexedSessionReq chan muxSessionCreateRequest
	// mayGetMultiplexedSession is signalled once a request has been handled,
	// whether it succeeded or failed.
	mayGetMultiplexedSession chan bool
	// multiplexedSessionCreationError is the error of the last failed
	// creation attempt; it is cleared when a session becomes available.
	multiplexedSessionCreationError error

	// locationRouter is set when the experimental location API is enabled.
	// It is used to wrap round-robin clients with location-aware routing.
	locationRouter *locationRouter

	// SessionPoolConfig is kept for backward compatibility.
	SessionPoolConfig

	// rand is seeded from the current time when the manager is created.
	// NOTE(review): no use of it is visible in this part of the file.
	rand *rand.Rand
	// tagMap holds the OpenCensus tags (client id, database, instance,
	// library version) attached to every stat recorded by recordStat.
	tagMap *tag.Map
	// otConfig is the OpenTelemetry configuration copied from sc. It may be
	// nil, so it is nil-checked before use.
	otConfig *openTelemetryConfig
	// done is closed by close to stop multiplexSessionWorker.
	done chan struct{}
	// once ensures done is closed only once.
	once sync.Once
}
| |
| // newSessionManager creates a new sessionManager for multiplexed sessions. |
| func newSessionManager(sc *sessionClient, config SessionPoolConfig) (*sessionManager, error) { |
| if config.MultiplexSessionCheckInterval == 0 { |
| config.MultiplexSessionCheckInterval = 10 * time.Minute |
| } |
| |
| sm := &sessionManager{ |
| sc: sc, |
| valid: true, |
| mayGetMultiplexedSession: make(chan bool), |
| multiplexedSessionReq: make(chan muxSessionCreateRequest), |
| SessionPoolConfig: config, |
| rand: rand.New(rand.NewSource(time.Now().UnixNano())), |
| otConfig: sc.otConfig, |
| done: make(chan struct{}), |
| } |
| |
| _, instance, database, err := parseDatabaseName(sc.database) |
| if err != nil { |
| return nil, err |
| } |
| ctx, err := tag.New(context.Background(), |
| tag.Upsert(tagKeyClientID, sc.id), |
| tag.Upsert(tagKeyDatabase, database), |
| tag.Upsert(tagKeyInstance, instance), |
| tag.Upsert(tagKeyLibVersion, internal.Version), |
| ) |
| if err != nil { |
| logf(sm.sc.logger, "Failed to create tag map: %v", err) |
| } |
| sm.tagMap = tag.FromContext(ctx) |
| |
| // Start the multiplexed session creation goroutine |
| go sm.createMultiplexedSession() |
| |
| // Create the initial multiplexed session |
| ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| sm.multiplexedSessionReq <- muxSessionCreateRequest{force: true, ctx: ctx} |
| go func() { |
| select { |
| case <-ctx.Done(): |
| cancel() |
| return |
| case <-sm.mayGetMultiplexedSession: |
| cancel() |
| } |
| }() |
| |
| // Start the multiplexed session refresh worker |
| go sm.multiplexSessionWorker() |
| |
| err = registerSessionManagerOTMetrics(sm) |
| if err != nil { |
| logf(sm.sc.logger, "Error registering session metrics in OpenTelemetry: %v", err) |
| } |
| |
| return sm, nil |
| } |
| |
| func (p *sessionManager) recordStat(ctx context.Context, m *stats.Int64Measure, n int64, tags ...tag.Tag) { |
| ctx = tag.NewContext(ctx, p.tagMap) |
| mutators := make([]tag.Mutator, len(tags)) |
| for i, t := range tags { |
| mutators[i] = tag.Upsert(t.Key, t.Value) |
| } |
| ctx, err := tag.New(ctx, mutators...) |
| if err != nil { |
| logf(p.sc.logger, "Failed to tag metrics, error: %v", err) |
| } |
| recordStat(ctx, m, n) |
| } |
| |
// recordOTStatOption carries optional settings for sessionManager.recordOTStat.
type recordOTStatOption struct {
	// attr, when non-empty, replaces the default attribute set
	// (otConfig.attributeMap) for the recorded measurement.
	attr []attribute.KeyValue
}
| |
| func (p *sessionManager) recordOTStat(ctx context.Context, m metric.Int64Counter, val int64, option recordOTStatOption) { |
| if m != nil { |
| attrs := p.otConfig.attributeMap |
| if len(option.attr) > 0 { |
| attrs = option.attr |
| } |
| m.Add(ctx, val, metric.WithAttributes(attrs...)) |
| } |
| } |
| |
| func (p *sessionManager) createMultiplexedSession() { |
| for c := range p.multiplexedSessionReq { |
| p.mu.Lock() |
| sess := p.multiplexedSession |
| p.mu.Unlock() |
| if c.force || sess == nil { |
| p.mu.Lock() |
| p.sc.mu.Lock() |
| client, err := p.sc.nextClient() |
| p.sc.mu.Unlock() |
| p.mu.Unlock() |
| if err != nil { |
| p.mu.Lock() |
| p.multiplexedSessionCreationError = err |
| p.mu.Unlock() |
| select { |
| case p.mayGetMultiplexedSession <- true: |
| case <-c.ctx.Done(): |
| } |
| continue |
| } |
| p.sc.executeCreateMultiplexedSession(c.ctx, client, p.sc.md, p) |
| continue |
| } |
| select { |
| case p.mayGetMultiplexedSession <- true: |
| case <-c.ctx.Done(): |
| return |
| } |
| } |
| } |
| |
| // sessionReady is called when a session has been created and is ready for use. |
| func (p *sessionManager) sessionReady(ctx context.Context, s *session) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| s.sm = p |
| p.multiplexedSession = s |
| p.multiplexedSessionCreationError = nil |
| p.recordStat(context.Background(), OpenSessionCount, int64(1), tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"}) |
| p.recordStat(context.Background(), SessionsCount, 1, tagNumSessions, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"}) |
| select { |
| case p.mayGetMultiplexedSession <- true: |
| case <-ctx.Done(): |
| } |
| } |
| |
| // sessionCreationFailed is called when session creation fails. |
| func (p *sessionManager) sessionCreationFailed(ctx context.Context, err error) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| if p.multiplexedSession != nil { |
| p.multiplexedSessionCreationError = nil |
| select { |
| case p.mayGetMultiplexedSession <- true: |
| case <-ctx.Done(): |
| return |
| } |
| return |
| } |
| p.recordStat(context.Background(), OpenSessionCount, int64(0), tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"}) |
| p.multiplexedSessionCreationError = err |
| select { |
| case p.mayGetMultiplexedSession <- true: |
| case <-ctx.Done(): |
| return |
| } |
| } |
| |
| // isValid checks if the session pool is still valid. |
| func (p *sessionManager) isValid() bool { |
| if p == nil { |
| return false |
| } |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| return p.valid |
| } |
| |
| // close marks the session as closed. |
| func (p *sessionManager) close(ctx context.Context) { |
| if p == nil { |
| return |
| } |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return |
| } |
| p.valid = false |
| if p.otConfig != nil && p.otConfig.otMetricRegistration != nil { |
| err := p.otConfig.otMetricRegistration.Unregister() |
| if err != nil { |
| logf(p.sc.logger, "Failed to unregister callback from the OpenTelemetry meter, error : %v", err) |
| } |
| } |
| p.mu.Unlock() |
| p.once.Do(func() { close(p.done) }) |
| close(p.multiplexedSessionReq) |
| } |
| |
// errInvalidSession is the error for using an invalid session. It is what
// takeMultiplexed returns once the session manager has been closed.
var errInvalidSession = spannerErrorf(codes.InvalidArgument, "invalid session")
| |
| // newSessionHandle creates a new session handle for the given session. |
| func (p *sessionManager) newSessionHandle(s *session) (sh *sessionHandle) { |
| sh = &sessionHandle{session: s} |
| p.mu.Lock() |
| client := p.getRoundRobinClient() |
| if p.locationRouter != nil && p.locationRouter.endpointCache != nil { |
| client = newLocationAwareSpannerClient(client, p.locationRouter, p.locationRouter.endpointCache) |
| } |
| sh.client = client |
| p.mu.Unlock() |
| return sh |
| } |
| |
| func (p *sessionManager) getRoundRobinClient() spannerClient { |
| p.sc.mu.Lock() |
| defer func() { |
| p.multiplexSessionClientCounter++ |
| p.sc.mu.Unlock() |
| }() |
| if len(p.clientPool) == 0 { |
| p.clientPool = make([]spannerClient, p.sc.connPool.Num()) |
| for i := 0; i < p.sc.connPool.Num(); i++ { |
| c, err := p.sc.nextClient() |
| if err != nil { |
| return nil |
| } |
| p.clientPool[i] = c |
| } |
| } |
| p.multiplexSessionClientCounter = p.multiplexSessionClientCounter % len(p.clientPool) |
| return p.clientPool[p.multiplexSessionClientCounter] |
| } |
| |
| // errGetSessionTimeout returns error for context timeout during session acquisition. |
| func (p *sessionManager) errGetSessionTimeout(ctx context.Context) error { |
| var code codes.Code |
| if ctx.Err() == context.DeadlineExceeded { |
| code = codes.DeadlineExceeded |
| } else { |
| code = codes.Canceled |
| } |
| return spannerErrorf(code, "timeout / context canceled during getting session.") |
| } |
| |
| // takeMultiplexed returns a multiplexed session. |
| func (p *sessionManager) takeMultiplexed(ctx context.Context) (*sessionHandle, error) { |
| trace.TracePrintf(ctx, nil, "Acquiring a multiplexed session") |
| for { |
| var s *session |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return nil, errInvalidSession |
| } |
| if p.multiplexedSession != nil { |
| s = p.multiplexedSession |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Acquired multiplexed session") |
| p.mu.Unlock() |
| p.incNumMultiplexedInUse(ctx) |
| return p.newSessionHandle(s), nil |
| } |
| mayGetSession := p.mayGetMultiplexedSession |
| p.mu.Unlock() |
| p.multiplexedSessionReq <- muxSessionCreateRequest{force: false, ctx: ctx} |
| select { |
| case <-ctx.Done(): |
| trace.TracePrintf(ctx, nil, "Context done waiting for multiplexed session") |
| p.recordStat(ctx, GetSessionTimeoutsCount, 1, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"}) |
| if p.otConfig != nil { |
| p.recordOTStat(ctx, p.otConfig.getSessionTimeoutsCount, 1, recordOTStatOption{attr: p.otConfig.attributeMapWithMultiplexed}) |
| } |
| return nil, p.errGetSessionTimeout(ctx) |
| case <-mayGetSession: |
| p.mu.Lock() |
| if p.multiplexedSessionCreationError != nil { |
| trace.TracePrintf(ctx, nil, "Error creating multiplexed session: %v", p.multiplexedSessionCreationError) |
| err := p.multiplexedSessionCreationError |
| p.mu.Unlock() |
| return nil, err |
| } |
| p.mu.Unlock() |
| } |
| } |
| } |
| |
| func (p *sessionManager) incNumMultiplexedInUse(ctx context.Context) { |
| p.recordStat(ctx, AcquiredSessionsCount, 1, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"}) |
| if p.otConfig != nil { |
| p.recordOTStat(ctx, p.otConfig.acquiredSessionsCount, 1, recordOTStatOption{attr: p.otConfig.attributeMapWithMultiplexed}) |
| } |
| } |
| |
| func (p *sessionManager) decNumMultiplexedInUseLocked(ctx context.Context) { |
| p.recordStat(ctx, ReleasedSessionsCount, 1, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"}) |
| if p.otConfig != nil { |
| p.recordOTStat(ctx, p.otConfig.releasedSessionsCount, 1, recordOTStatOption{attr: p.otConfig.attributeMapWithMultiplexed}) |
| } |
| } |
| |
| func (p *sessionManager) multiplexSessionWorker() { |
| for { |
| select { |
| case <-p.done: |
| return |
| default: |
| } |
| |
| p.mu.Lock() |
| createTime := time.Now() |
| s := p.multiplexedSession |
| if s != nil { |
| createTime = p.multiplexedSession.createTime |
| } |
| p.mu.Unlock() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| if createTime.Add(multiplexSessionRefreshInterval).Before(time.Now()) { |
| // Multiplexed session is idle for more than 7 days, replace it. |
| p.multiplexedSessionReq <- muxSessionCreateRequest{force: true, ctx: ctx} |
| // wait for the new multiplexed session to be created. |
| select { |
| case <-p.mayGetMultiplexedSession: |
| case <-p.done: |
| cancel() |
| return |
| } |
| } |
| cancel() |
| |
| // Sleep for a while to avoid burning CPU. |
| select { |
| case <-time.After(p.MultiplexSessionCheckInterval): |
| case <-p.done: |
| return |
| } |
| } |
| } |
| |
// sessionResourceType is the type name of Spanner sessions.
// NOTE(review): presumably compared against the resource type carried in
// error details; no use is visible in this part of the file — confirm.
const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
| |
| func isFailedInlineBeginTransaction(err error) bool { |
| if err == nil { |
| return false |
| } |
| return ErrCode(err) == codes.Internal && strings.Contains(err.Error(), errInlineBeginTransactionFailedMsg) |
| } |