| /* |
| Copyright 2017 Google LLC |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package spanner |
| |
| import ( |
| "container/heap" |
| "container/list" |
| "context" |
| "fmt" |
| "log" |
| "math" |
| "math/rand" |
| "runtime/debug" |
| "sync" |
| "time" |
| |
| "cloud.google.com/go/internal/trace" |
| "cloud.google.com/go/internal/version" |
| vkit "cloud.google.com/go/spanner/apiv1" |
| "go.opencensus.io/stats" |
| "go.opencensus.io/tag" |
| octrace "go.opencensus.io/trace" |
| sppb "google.golang.org/genproto/googleapis/spanner/v1" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/metadata" |
| ) |
| |
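| // healthCheckIntervalMins is the default interval, in minutes, between two |
| // healthchecks on a session. It is used when no HealthCheckInterval is set |
| // in the SessionPoolConfig. |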
| const healthCheckIntervalMins = 50 |
| |
| // sessionHandle is a handle through which transactions safely access a Cloud |
| // Spanner session. It is created by sessionPool.take(). |
| type sessionHandle struct { |
| // mu guarantees that the inner session object is returned / destroyed only |
| // once. |
| mu sync.Mutex |
| // session is a pointer to a session object. Transactions never need to |
| // access it directly. |
| session *session |
| // checkoutTime is the time the session was checked out of the pool. |
| checkoutTime time.Time |
| // trackedSessionHandle is the linked list node which links the session to |
| // the list of tracked session handles. trackedSessionHandle is only set if |
| // TrackSessionHandles has been enabled in the session pool configuration. |
| trackedSessionHandle *list.Element |
| // stack is the call stack of the goroutine that checked out the session |
| // from the pool. This can be used to track down session leak problems. |
| stack []byte |
| } |
| |
| // recycle gives the inner session object back to its home session pool. It is |
| // safe to call recycle multiple times; only the first call takes effect. |
| func (sh *sessionHandle) recycle() { |
| sh.mu.Lock() |
| if sh.session == nil { |
| // sessionHandle has already been recycled. |
| sh.mu.Unlock() |
| return |
| } |
| p := sh.session.pool |
| tracked := sh.trackedSessionHandle |
| sh.session.recycle() |
| sh.session = nil |
| sh.trackedSessionHandle = nil |
| sh.checkoutTime = time.Time{} |
| sh.stack = nil |
| sh.mu.Unlock() |
| if tracked != nil { |
| p.mu.Lock() |
| p.trackedSessionHandles.Remove(tracked) |
| p.mu.Unlock() |
| } |
| } |
| |
| // getID gets the Cloud Spanner session ID from the internal session object. |
| // getID returns an empty string if the inner session object has been released |
| // by recycle / destroy. |
| func (sh *sessionHandle) getID() string { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| // sessionHandle has already been recycled/destroyed. |
| return "" |
| } |
| return sh.session.getID() |
| } |
| |
| // getClient gets the Cloud Spanner RPC client associated with the session ID |
| // in sessionHandle. |
| func (sh *sessionHandle) getClient() *vkit.Client { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| return nil |
| } |
| return sh.session.client |
| } |
| |
| // getMetadata returns the metadata associated with the session in sessionHandle. |
| func (sh *sessionHandle) getMetadata() metadata.MD { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| return nil |
| } |
| return sh.session.md |
| } |
| |
| // getTransactionID returns the transaction id in the session if available. |
| func (sh *sessionHandle) getTransactionID() transactionID { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| return nil |
| } |
| return sh.session.tx |
| } |
| |
| // destroy destroys the inner session object. It is safe to call destroy |
| // multiple times; only the first call attempts to destroy the inner session |
| // object. |
| func (sh *sessionHandle) destroy() { |
| sh.mu.Lock() |
| s := sh.session |
| if s == nil { |
| // sessionHandle has already been recycled. |
| sh.mu.Unlock() |
| return |
| } |
| tracked := sh.trackedSessionHandle |
| sh.session = nil |
| sh.trackedSessionHandle = nil |
| sh.checkoutTime = time.Time{} |
| sh.stack = nil |
| sh.mu.Unlock() |
| |
| if tracked != nil { |
| p := s.pool |
| p.mu.Lock() |
| p.trackedSessionHandles.Remove(tracked) |
| p.mu.Unlock() |
| } |
| s.destroy(false) |
| } |
| |
| // session wraps a Cloud Spanner session ID through which transactions are |
| // created and executed. |
| type session struct { |
| // client is the RPC channel to Cloud Spanner. It is set only once during |
| // session's creation. |
| client *vkit.Client |
| // id is the unique id of the session in Cloud Spanner. It is set only once |
| // during session's creation. |
| id string |
| // pool is the session's home session pool where it was created. It is set |
| // only once during session's creation. |
| pool *sessionPool |
| // createTime is the timestamp of the session's creation. It is set only |
| // once during session's creation. |
| createTime time.Time |
| // logger is the logger configured for the Spanner client that created the |
| // session. If nil, logging will be directed to the standard logger. |
| logger *log.Logger |
| |
| // mu protects the following fields from concurrent access: both |
| // healthcheck workers and transactions can modify them. |
| mu sync.Mutex |
| // valid marks the validity of a session. |
| valid bool |
| // hcIndex is the index of the session inside the global healthcheck queue. |
| // If hcIndex < 0, session has been unregistered from the queue. |
| hcIndex int |
| // idleList is the linked list node which links the session to its home |
| // session pool's idle list. If idleList == nil, the session is not in the |
| // idle list. |
| idleList *list.Element |
| // nextCheck is the timestamp of next scheduled healthcheck of the session. |
| // It is maintained by the global health checker. |
| nextCheck time.Time |
| // checkingHealth is true if the session is currently being processed by the |
| // health checker. Must be modified under the health checker lock. |
| checkingHealth bool |
| // md is the Metadata to be sent with each request. |
| md metadata.MD |
| // tx contains the transaction id if the session has been prepared for |
| // write. |
| tx transactionID |
| // firstHCDone indicates whether the first health check is done or not. |
| firstHCDone bool |
| } |
| |
| // isValid returns true if the session is still valid for use. |
| func (s *session) isValid() bool { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.valid |
| } |
| |
| // isWritePrepared returns true if the session is prepared for write. |
| func (s *session) isWritePrepared() bool { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.tx != nil |
| } |
| |
| // String implements fmt.Stringer for session. |
| func (s *session) String() string { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>", |
| s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck) |
| } |
| |
| // ping verifies if the session is still alive in Cloud Spanner. |
| func (s *session) ping() error { |
| ctx, cancel := context.WithTimeout(context.Background(), time.Second) |
| defer cancel() |
| |
| // Start parent span that doesn't record. |
| _, span := octrace.StartSpan(ctx, "cloud.google.com/go/spanner.ping", octrace.WithSampler(octrace.NeverSample())) |
| defer span.End() |
| |
| // s.getID is safe even when s is invalid. |
| _, err := s.client.ExecuteSql(contextWithOutgoingMetadata(ctx, s.md), &sppb.ExecuteSqlRequest{ |
| Session: s.getID(), |
| Sql: "SELECT 1", |
| }) |
| return err |
| } |
| |
| // setHcIndex atomically sets the session's index in the healthcheck queue and |
| // returns the old index. |
| func (s *session) setHcIndex(i int) int { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| oi := s.hcIndex |
| s.hcIndex = i |
| return oi |
| } |
| |
| // setIdleList atomically sets the session's idle list link and returns the old |
| // link. |
| func (s *session) setIdleList(le *list.Element) *list.Element { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| old := s.idleList |
| s.idleList = le |
| return old |
| } |
| |
| // invalidate marks a session as invalid and returns the old validity. |
| func (s *session) invalidate() bool { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| ov := s.valid |
| s.valid = false |
| return ov |
| } |
| |
| // setNextCheck sets the timestamp for next healthcheck on the session. |
| func (s *session) setNextCheck(t time.Time) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| s.nextCheck = t |
| } |
| |
| // setTransactionID sets the transaction id in the session |
| func (s *session) setTransactionID(tx transactionID) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| s.tx = tx |
| } |
| |
| // getID returns the session ID which uniquely identifies the session in Cloud |
| // Spanner. |
| func (s *session) getID() string { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.id |
| } |
| |
| // getHcIndex returns the session's index into the global healthcheck priority |
| // queue. |
| func (s *session) getHcIndex() int { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.hcIndex |
| } |
| |
| // getIdleList returns the session's link in its home session pool's idle list. |
| func (s *session) getIdleList() *list.Element { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.idleList |
| } |
| |
| // getNextCheck returns the timestamp for next healthcheck on the session. |
| func (s *session) getNextCheck() time.Time { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.nextCheck |
| } |
| |
| // recycle returns the session to its home session pool. |
| func (s *session) recycle() { |
| s.setTransactionID(nil) |
| if !s.pool.recycle(s) { |
| // s is rejected by its home session pool because it expired and the |
| // session pool currently has enough open sessions. |
| s.destroy(false) |
| } |
| s.pool.decNumInUse(context.Background()) |
| } |
| |
| // destroy removes the session from its home session pool, healthcheck queue |
| // and Cloud Spanner service. |
| func (s *session) destroy(isExpire bool) bool { |
| // Remove s from session pool. |
| if !s.pool.remove(s, isExpire) { |
| return false |
| } |
| // Unregister s from healthcheck queue. |
| s.pool.hc.unregister(s) |
| // Remove s from Cloud Spanner service. |
| ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) |
| defer cancel() |
| s.delete(ctx) |
| return true |
| } |
| |
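| // delete removes the session from the Cloud Spanner backend by calling |
| // DeleteSession. Errors are logged but otherwise ignored, because orphaned |
| // sessions are eventually garbage collected by Cloud Spanner. |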
| func (s *session) delete(ctx context.Context) { |
| // Ignore the error because even if we fail to explicitly destroy the |
| // session, it will be eventually garbage collected by Cloud Spanner. |
| err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()}) |
| if err != nil { |
| logf(s.logger, "Failed to delete session %v. Error: %v", s.getID(), err) |
| } |
| } |
| |
| // prepareForWrite prepares the session for write if it is not already in that |
| // state. |
| func (s *session) prepareForWrite(ctx context.Context) error { |
| if s.isWritePrepared() { |
| return nil |
| } |
| tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client) |
| // Session not found should cause the session to be removed from the pool. |
| if isSessionNotFoundError(err) { |
| s.pool.remove(s, false) |
| s.pool.hc.unregister(s) |
| return err |
| } |
| // Enable/disable background preparing of write sessions depending on |
| // whether the BeginTransaction call succeeded. This will prevent the |
| // session pool workers from going into an infinite loop of trying to |
| // prepare sessions. Any subsequent successful BeginTransaction call, for |
| // example from takeWriteSession, will re-enable the background process. |
| s.pool.mu.Lock() |
| s.pool.disableBackgroundPrepareSessions = err != nil |
| s.pool.mu.Unlock() |
| if err != nil { |
| return err |
| } |
| s.setTransactionID(tx) |
| return nil |
| } |
| |
| // SessionPoolConfig stores configurations of a session pool. |
| type SessionPoolConfig struct { |
| // MaxOpened is the maximum number of opened sessions allowed by the session |
| // pool. If the client tries to open a session and there are already |
| // MaxOpened sessions, it will block until one becomes available or the |
| // context passed to the client method is canceled or times out. |
| // |
| // Defaults to NumChannels * 100. |
| MaxOpened uint64 |
| |
| // MinOpened is the minimum number of opened sessions that the session pool |
| // tries to maintain. The session pool won't continue to expire sessions if |
| // the number of opened sessions drops below MinOpened. However, if a session |
| // is found to be broken, it will still be evicted from the session pool, so |
| // it is possible for the number of opened sessions to drop below MinOpened. |
| // |
| // Defaults to 100. |
| MinOpened uint64 |
| |
| // MaxIdle is the maximum number of idle sessions that the pool is allowed to |
| // keep. |
| // |
| // Defaults to 0. |
| MaxIdle uint64 |
| |
| // MaxBurst is the maximum number of concurrent session creation requests. |
| // |
| // Defaults to 10. |
| MaxBurst uint64 |
| |
| // incStep is the number of sessions to create in one batch when at least |
| // one more session is needed. |
| // |
| // Defaults to 25. |
| incStep uint64 |
| |
| // WriteSessions is the fraction of sessions we try to keep prepared for |
| // write. |
| // |
| // Defaults to 0.2. |
| WriteSessions float64 |
| |
| // HealthCheckWorkers is the number of workers used by the health checker for |
| // this pool. |
| // |
| // Defaults to 10. |
| HealthCheckWorkers int |
| |
| // HealthCheckInterval is how often the health checker pings a session. |
| // |
| // Defaults to 50m. |
| HealthCheckInterval time.Duration |
| |
| // TrackSessionHandles determines whether the session pool will keep track |
| // of the stacktrace of the goroutines that take sessions from the pool. |
| // This setting can be used to track down session leak problems. |
| // |
| // Defaults to false. |
| TrackSessionHandles bool |
| |
| // healthCheckSampleInterval is how often the health checker samples live |
| // sessions (for use in maintaining the session pool size). |
| // |
| // Defaults to 1m. |
| healthCheckSampleInterval time.Duration |
| |
| // sessionLabels for the sessions created in the session pool. |
| sessionLabels map[string]string |
| } |
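| |
| // A custom SessionPoolConfig is typically supplied through the client |
| // configuration when the client is created. Illustrative sketch only, |
| // assuming the exported ClientConfig / NewClientWithConfig API of this |
| // package (ctx, err handling and the database name are placeholders): |
| // |
| //   cfg := DefaultSessionPoolConfig |
| //   cfg.MinOpened = 200 |
| //   cfg.MaxOpened = 1000 |
| //   cfg.TrackSessionHandles = true |
| //   client, err := NewClientWithConfig(ctx, "projects/p/instances/i/databases/d", |
| //       ClientConfig{SessionPoolConfig: cfg}) |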
| |
| // DefaultSessionPoolConfig is the default configuration for the session pool |
| // that will be used for a Spanner client, unless the user supplies a specific |
| // session pool config. |
| var DefaultSessionPoolConfig = SessionPoolConfig{ |
| MinOpened: 100, |
| MaxOpened: numChannels * 100, |
| MaxBurst: 10, |
| incStep: 25, |
| WriteSessions: 0.2, |
| HealthCheckWorkers: 10, |
| HealthCheckInterval: healthCheckIntervalMins * time.Minute, |
| } |
| |
| // errMinOpenedGTMaxOpened returns the error for SessionPoolConfig.MaxOpened < |
| // SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set. |
| func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error { |
| return spannerErrorf(codes.InvalidArgument, |
| "require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened) |
| } |
| |
| // errWriteFractionOutOfRange returns the error for |
| // SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1. |
| func errWriteFractionOutOfRange(writeFraction float64) error { |
| return spannerErrorf(codes.InvalidArgument, |
| "require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction) |
| } |
| |
| // errHealthCheckWorkersNegative returns error for |
| // SessionPoolConfig.HealthCheckWorkers < 0 |
| func errHealthCheckWorkersNegative(workers int) error { |
| return spannerErrorf(codes.InvalidArgument, |
| "require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers) |
| } |
| |
| // errHealthCheckIntervalNegative returns error for |
| // SessionPoolConfig.HealthCheckInterval < 0 |
| func errHealthCheckIntervalNegative(interval time.Duration) error { |
| return spannerErrorf(codes.InvalidArgument, |
| "require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval) |
| } |
| |
| // validate verifies that the SessionPoolConfig is good for use. |
| func (spc *SessionPoolConfig) validate() error { |
| if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 { |
| return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened) |
| } |
| if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 { |
| return errWriteFractionOutOfRange(spc.WriteSessions) |
| } |
| if spc.HealthCheckWorkers < 0 { |
| return errHealthCheckWorkersNegative(spc.HealthCheckWorkers) |
| } |
| if spc.HealthCheckInterval < 0 { |
| return errHealthCheckIntervalNegative(spc.HealthCheckInterval) |
| } |
| return nil |
| } |
| |
| // sessionPool creates and caches Cloud Spanner sessions. |
| type sessionPool struct { |
| // mu protects sessionPool from concurrent access. |
| mu sync.Mutex |
| // valid marks the validity of the session pool. |
| valid bool |
| // sc is used to create the sessions for the pool. |
| sc *sessionClient |
| // trackedSessionHandles contains all session handles that have been |
| // checked out of the pool. The list is only filled if TrackSessionHandles |
| // has been enabled. |
| trackedSessionHandles list.List |
| // idleList caches idle sessions. Sessions in this list can be allocated for |
| // use. |
| idleList list.List |
| // idleWriteList caches idle sessions which have been prepared for write. |
| idleWriteList list.List |
| // mayGetSession is for broadcasting that session retrieval/creation may |
| // proceed. |
| mayGetSession chan struct{} |
| // sessionCreationError is the last error that occurred during session |
| // creation and is propagated to any waiters waiting for a session. |
| sessionCreationError error |
| // numOpened is the total number of open sessions from the session pool. |
| numOpened uint64 |
| // createReqs is the number of ongoing session creation requests. |
| createReqs uint64 |
| // prepareReqs is the number of ongoing session preparation requests. |
| prepareReqs uint64 |
| // numReadWaiters is the number of processes waiting for a read session to |
| // become available. |
| numReadWaiters uint64 |
| // numWriteWaiters is the number of processes waiting for a write session |
| // to become available. |
| numWriteWaiters uint64 |
| // disableBackgroundPrepareSessions indicates that the BeginTransaction |
| // call for a read/write transaction failed with a permanent error, such as |
| // PermissionDenied or `Database not found`. Further background calls to |
| // prepare sessions will be disabled. |
| disableBackgroundPrepareSessions bool |
| // configuration of the session pool. |
| SessionPoolConfig |
| // hc is the health checker |
| hc *healthChecker |
| // rand is a separately sourced random generator. |
| rand *rand.Rand |
| // numInUse is the number of sessions that are currently in use (checked out |
| // from the session pool). |
| numInUse uint64 |
| // maxNumInUse is the maximum number of sessions in use concurrently in the |
| // current 10 minute interval. |
| maxNumInUse uint64 |
| // lastResetTime is the start time of the window for recording maxNumInUse. |
| lastResetTime time.Time |
| // numReads is the number of sessions that are idle for reads. |
| numReads uint64 |
| // numWrites is the number of sessions that are idle for writes. |
| numWrites uint64 |
| |
| // mw is the maintenance window containing statistics for the max number of |
| // sessions checked out of the pool during the last 10 minutes. |
| mw *maintenanceWindow |
| |
| // tagMap is a map of all tags that are associated with the emitted metrics. |
| tagMap *tag.Map |
| } |
| |
| // newSessionPool creates a new session pool. |
| func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) { |
| if err := config.validate(); err != nil { |
| return nil, err |
| } |
| pool := &sessionPool{ |
| sc: sc, |
| valid: true, |
| mayGetSession: make(chan struct{}), |
| SessionPoolConfig: config, |
| mw: newMaintenanceWindow(config.MaxOpened), |
| rand: rand.New(rand.NewSource(time.Now().UnixNano())), |
| } |
| if config.HealthCheckWorkers == 0 { |
| // With 10 workers and assuming average latency of 5ms for |
| // BeginTransaction, we will be able to prepare 2000 tx/sec in advance. |
| // If the rate of takeWriteSession is more than that, it will degrade to |
| // doing BeginTransaction inline. |
| // |
| // TODO: consider resizing the worker pool dynamically according to the load. |
| config.HealthCheckWorkers = 10 |
| } |
| if config.HealthCheckInterval == 0 { |
| config.HealthCheckInterval = healthCheckIntervalMins * time.Minute |
| } |
| if config.healthCheckSampleInterval == 0 { |
| config.healthCheckSampleInterval = time.Minute |
| } |
| |
| _, instance, database, err := parseDatabaseName(sc.database) |
| if err != nil { |
| return nil, err |
| } |
| // Errors should not prevent initializing the session pool. |
| ctx, err := tag.New(context.Background(), |
| tag.Upsert(tagKeyClientID, sc.id), |
| tag.Upsert(tagKeyDatabase, database), |
| tag.Upsert(tagKeyInstance, instance), |
| tag.Upsert(tagKeyLibVersion, version.Repo), |
| ) |
| if err != nil { |
| logf(pool.sc.logger, "Failed to create tag map, error: %v", err) |
| } |
| pool.tagMap = tag.FromContext(ctx) |
| |
| // On a GCE VM within the same region, a healthcheck ping takes on average |
| // 10ms to finish. Given a 5 minute interval and 10 healthcheck workers, a |
| // healthChecker can effectively maintain |
| // 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions. |
| pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool) |
| |
| // First initialize the pool before we indicate that the healthchecker is |
| // ready. This prevents the maintainer from starting before the pool has |
| // been initialized, which means that we guarantee that the initial |
| // sessions are created using BatchCreateSessions. |
| if config.MinOpened > 0 { |
| numSessions := minUint64(config.MinOpened, math.MaxInt32) |
| if err := pool.initPool(numSessions); err != nil { |
| return nil, err |
| } |
| } |
| pool.recordStat(context.Background(), MaxAllowedSessionsCount, int64(config.MaxOpened)) |
| close(pool.hc.ready) |
| return pool, nil |
| } |
| |
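| // recordStat records the given measurement and value, tagged with the pool's |
| // tag map plus any additional tags that are passed in. |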
| func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n int64, tags ...tag.Tag) { |
| ctx = tag.NewContext(ctx, p.tagMap) |
| mutators := make([]tag.Mutator, len(tags)) |
| for i, t := range tags { |
| mutators[i] = tag.Upsert(t.Key, t.Value) |
| } |
| ctx, err := tag.New(ctx, mutators...) |
| if err != nil { |
| logf(p.sc.logger, "Failed to tag metrics, error: %v", err) |
| } |
| recordStat(ctx, m, n) |
| } |
| |
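| // initPool creates the initial numSessions sessions for the pool, distributing |
| // the creation requests over all available gRPC channels. |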
| func (p *sessionPool) initPool(numSessions uint64) error { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| return p.growPoolLocked(numSessions, true) |
| } |
| |
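| // growPoolLocked reserves the budget for numSessions new sessions and |
| // asynchronously creates them in a batch. The caller must hold p.mu. |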
| func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels bool) error { |
| // Take budget before the actual session creation. |
| numSessions = minUint64(numSessions, math.MaxInt32) |
| p.numOpened += uint64(numSessions) |
| p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) |
| p.createReqs += uint64(numSessions) |
| // Asynchronously create a batch of sessions for the pool. |
| return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p) |
| } |
| |
| // sessionReady is executed by the SessionClient when a session has been |
| // created and is ready to use. This method will add the new session to the |
| // pool and decrease the number of sessions that is being created. |
| func (p *sessionPool) sessionReady(s *session) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| // Clear any session creation error. |
| p.sessionCreationError = nil |
| // Set this pool as the home pool of the session and register it with the |
| // health checker. |
| s.pool = p |
| p.hc.register(s) |
| p.createReqs-- |
| // Insert the session at a random position in the pool to prevent all |
| // sessions affiliated with the same channel from being placed sequentially |
| // in the pool. |
| if p.idleList.Len() > 0 { |
| pos := rand.Intn(p.idleList.Len()) |
| before := p.idleList.Front() |
| for i := 0; i < pos; i++ { |
| before = before.Next() |
| } |
| s.setIdleList(p.idleList.InsertBefore(s, before)) |
| } else { |
| s.setIdleList(p.idleList.PushBack(s)) |
| } |
| p.incNumReadsLocked(context.Background()) |
| // Notify other waiters blocking on session creation. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| } |
| |
| // sessionCreationFailed is called by the SessionClient when the creation of one |
| // or more requested sessions finished with an error. sessionCreationFailed will |
| // decrease the number of sessions being created and notify any waiters that |
| // the session creation failed. |
| func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| p.createReqs -= uint64(numSessions) |
| p.numOpened -= uint64(numSessions) |
| p.recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) |
| // Notify other waiters blocking on session creation. |
| p.sessionCreationError = err |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| } |
| |
| // isValid checks if the session pool is still valid. |
| func (p *sessionPool) isValid() bool { |
| if p == nil { |
| return false |
| } |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| return p.valid |
| } |
| |
| // close marks the session pool as closed. |
| func (p *sessionPool) close() { |
| if p == nil { |
| return |
| } |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return |
| } |
| p.valid = false |
| p.mu.Unlock() |
| p.hc.close() |
| // destroy all the sessions |
| p.hc.mu.Lock() |
| allSessions := make([]*session, len(p.hc.queue.sessions)) |
| copy(allSessions, p.hc.queue.sessions) |
| p.hc.mu.Unlock() |
| for _, s := range allSessions { |
| s.destroy(false) |
| } |
| } |
| |
| // errInvalidSessionPool is the error for using an invalid session pool. |
| var errInvalidSessionPool = spannerErrorf(codes.InvalidArgument, "invalid session pool") |
| |
| // errGetSessionTimeout returns error for context timeout during |
| // sessionPool.take(). |
| var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canceled during getting session") |
| |
| // newSessionHandle creates a new session handle for the given session for this |
| // session pool. The session handle will also hold a copy of the current call |
| // stack if the session pool has been configured to track the call stacks of |
| // sessions being checked out of the pool. |
| func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { |
| sh = &sessionHandle{session: s, checkoutTime: time.Now()} |
| if p.TrackSessionHandles { |
| p.mu.Lock() |
| sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh) |
| p.mu.Unlock() |
| sh.stack = debug.Stack() |
| } |
| return sh |
| } |
| |
| // errGetSessionTimeout returns error for context timeout during |
| // sessionPool.take(). |
| func (p *sessionPool) errGetSessionTimeout(ctx context.Context) error { |
| var code codes.Code |
| if ctx.Err() == context.DeadlineExceeded { |
| code = codes.DeadlineExceeded |
| } else { |
| code = codes.Canceled |
| } |
| if p.TrackSessionHandles { |
| return p.errGetSessionTimeoutWithTrackedSessionHandles(code) |
| } |
| return p.errGetBasicSessionTimeout(code) |
| } |
| |
| // errGetBasicSessionTimeout returns the error for a context timeout during |
| // sessionPool.take() without any tracked sessionHandles. |
| func (p *sessionPool) errGetBasicSessionTimeout(code codes.Code) error { |
| return spannerErrorf(code, "timeout / context canceled during getting session.\n"+ |
| "Enable SessionPoolConfig.TrackSessionHandles if you suspect a session leak to get more information about the checked out sessions.") |
| } |
| |
| // errGetSessionTimeoutWithTrackedSessionHandles returns the error for a |
| // context timeout during sessionPool.take(), including a stacktrace of each |
| // checked out session handle. |
| func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles(code codes.Code) error { |
| err := spannerErrorf(code, "timeout / context canceled during getting session.") |
| err.(*Error).additionalInformation = p.getTrackedSessionHandleStacksLocked() |
| return err |
| } |
| |
| // getTrackedSessionHandleStacksLocked returns a string containing the |
| // stacktraces of all currently checked out sessions of the pool. The method |
| // acquires p.mu itself, so the caller must not hold the lock. |
| func (p *sessionPool) getTrackedSessionHandleStacksLocked() string { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| stackTraces := "" |
| i := 1 |
| element := p.trackedSessionHandles.Front() |
| for element != nil { |
| sh := element.Value.(*sessionHandle) |
| sh.mu.Lock() |
| if sh.stack != nil { |
| stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack) |
| } |
| sh.mu.Unlock() |
| element = element.Next() |
| i++ |
| } |
| return stackTraces |
| } |
| |
| // shouldPrepareWriteLocked returns true if we should prepare more sessions for write. |
| func (p *sessionPool) shouldPrepareWriteLocked() bool { |
| return !p.disableBackgroundPrepareSessions && float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs)) |
| } |
| |
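| // createSession creates one new session for the pool. On failure the reserved |
| // session budget is returned to the pool and any waiters are notified; on |
| // success the session is registered with the health checker. |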
| func (p *sessionPool) createSession(ctx context.Context) (*session, error) { |
| trace.TracePrintf(ctx, nil, "Creating a new session") |
| doneCreate := func(done bool) { |
| p.mu.Lock() |
| if !done { |
| // Session creation failed, give budget back. |
| p.numOpened-- |
| p.recordStat(ctx, OpenSessionCount, int64(p.numOpened)) |
| } |
| p.createReqs-- |
| // Notify other waiters blocking on session creation. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| p.mu.Unlock() |
| } |
| s, err := p.sc.createSession(ctx) |
| if err != nil { |
| doneCreate(false) |
| // Should return error directly because of the previous retries on |
| // CreateSession RPC. |
| // If the error is a timeout, there is a chance that the session was |
| // created on the server but is not known to the session pool. This |
| // session will then be garbage collected by the server after 1 hour. |
| return nil, err |
| } |
| s.pool = p |
| p.hc.register(s) |
| doneCreate(true) |
| return s, nil |
| } |
| |
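| // isHealthy checks whether the session is overdue for a healthcheck. If the |
| // scheduled check is more than two healthcheck intervals in the past, the |
| // session is pinged inline; a 'Session not found' response destroys the |
| // session and isHealthy returns false. |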
| func (p *sessionPool) isHealthy(s *session) bool { |
| if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) { |
| if err := s.ping(); isSessionNotFoundError(err) { |
| // The session is already bad, continue to fetch/create a new one. |
| s.destroy(false) |
| return false |
| } |
| p.hc.scheduledHC(s) |
| } |
| return true |
| } |
| |
| // take returns a cached session if one is available; otherwise it tries to |
| // allocate a new one. Sessions returned by take should be used for read |
| // operations. |
| func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) { |
| trace.TracePrintf(ctx, nil, "Acquiring a read-only session") |
| for { |
| var s *session |
| |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return nil, errInvalidSessionPool |
| } |
| if p.idleList.Len() > 0 { |
| // Idle sessions are available, get one from the top of the idle |
| // list. |
| s = p.idleList.Remove(p.idleList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Acquired read-only session") |
| p.decNumReadsLocked(ctx) |
| } else if p.idleWriteList.Len() > 0 { |
| s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Acquired read-write session") |
| p.decNumWritesLocked(ctx) |
| } |
| if s != nil { |
| s.setIdleList(nil) |
| numCheckedOut := p.currSessionsCheckedOutLocked() |
| p.mu.Unlock() |
| p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut) |
| // From here on, the session is no longer in the idle list, so healthcheck |
| // workers won't destroy it. If the healthcheck workers failed to |
| // schedule a healthcheck for the session in time, do the check here. |
| // Because a session check is still much cheaper than session |
| // creation, sessions should be reused as much as possible. |
| if !p.isHealthy(s) { |
| continue |
| } |
| p.incNumInUse(ctx) |
| return p.newSessionHandle(s), nil |
| } |
| |
| // No session available. Start the creation of a new batch of sessions |
| // if that is allowed, and then wait for a session to become available. |
| if p.numReadWaiters+p.numWriteWaiters >= p.createReqs { |
| numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep) |
| if err := p.growPoolLocked(numSessions, false); err != nil { |
| p.mu.Unlock() |
| return nil, err |
| } |
| } |
| |
| p.numReadWaiters++ |
| mayGetSession := p.mayGetSession |
| p.mu.Unlock() |
| trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available") |
| select { |
| case <-ctx.Done(): |
| trace.TracePrintf(ctx, nil, "Context done waiting for session") |
| p.recordStat(ctx, GetSessionTimeoutsCount, 1) |
| p.mu.Lock() |
| p.numReadWaiters-- |
| p.mu.Unlock() |
| return nil, p.errGetSessionTimeout(ctx) |
| case <-mayGetSession: |
| p.mu.Lock() |
| p.numReadWaiters-- |
| if p.sessionCreationError != nil { |
| trace.TracePrintf(ctx, nil, "Error creating session: %v", p.sessionCreationError) |
| err := p.sessionCreationError |
| p.mu.Unlock() |
| return nil, err |
| } |
| p.mu.Unlock() |
| } |
| } |
| } |
| |
| // takeWriteSession returns a cached session that is prepared for write if one |
| // is available; otherwise it tries to allocate a new one. Sessions returned |
| // should be used for read-write transactions. |
| func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) { |
| trace.TracePrintf(ctx, nil, "Acquiring a read-write session") |
| for { |
| var ( |
| s *session |
| err error |
| ) |
| |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return nil, errInvalidSessionPool |
| } |
| if p.idleWriteList.Len() > 0 { |
| // Idle sessions are available, get one from the top of the idle |
| // list. |
| s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session") |
| p.decNumWritesLocked(ctx) |
| } else if p.idleList.Len() > 0 { |
| s = p.idleList.Remove(p.idleList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session") |
| p.decNumReadsLocked(ctx) |
| } |
| if s != nil { |
| s.setIdleList(nil) |
| numCheckedOut := p.currSessionsCheckedOutLocked() |
| p.mu.Unlock() |
| p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut) |
| // From here on, the session is no longer in the idle list, so healthcheck |
| // workers won't destroy it. If the healthcheck workers failed to |
| // schedule a healthcheck for the session in time, do the check here. |
| // Because a session check is still much cheaper than session |
| // creation, sessions should be reused as much as possible. |
| if !p.isHealthy(s) { |
| continue |
| } |
| } else { |
| // No session available. Start the creation of a new batch of sessions |
| // if that is allowed, and then wait for a session to become available. |
| if p.numReadWaiters+p.numWriteWaiters >= p.createReqs { |
| numSessions := minUint64(p.MaxOpened-p.numOpened, p.incStep) |
| if err := p.growPoolLocked(numSessions, false); err != nil { |
| p.mu.Unlock() |
| return nil, err |
| } |
| } |
| |
| p.numWriteWaiters++ |
| mayGetSession := p.mayGetSession |
| p.mu.Unlock() |
| trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available") |
| select { |
| case <-ctx.Done(): |
| trace.TracePrintf(ctx, nil, "Context done waiting for session") |
| p.recordStat(ctx, GetSessionTimeoutsCount, 1) |
| p.mu.Lock() |
| p.numWriteWaiters-- |
| p.mu.Unlock() |
| return nil, p.errGetSessionTimeout(ctx) |
| case <-mayGetSession: |
| p.mu.Lock() |
| p.numWriteWaiters-- |
| if p.sessionCreationError != nil { |
| err := p.sessionCreationError |
| p.mu.Unlock() |
| return nil, err |
| } |
| p.mu.Unlock() |
| } |
| continue |
| } |
| if !s.isWritePrepared() { |
| p.incNumBeingPrepared(ctx) |
| defer p.decNumBeingPrepared(ctx) |
| if err = s.prepareForWrite(ctx); err != nil { |
| if isSessionNotFoundError(err) { |
| s.destroy(false) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Session not found for write") |
| return nil, ToSpannerError(err) |
| } |
| |
| s.recycle() |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Error preparing session for write") |
| return nil, ToSpannerError(err) |
| } |
| } |
| p.incNumInUse(ctx) |
| return p.newSessionHandle(s), nil |
| } |
| } |
| |
| // recycle puts session s back into the session pool's idle list. It returns |
| // true if the session pool successfully recycles session s. |
| func (p *sessionPool) recycle(s *session) bool { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| if !s.isValid() || !p.valid { |
| // Reject the session if session is invalid or pool itself is invalid. |
| return false |
| } |
| ctx := context.Background() |
| // Put session at the top of the list to be handed out in LIFO order for load balancing |
| // across channels. |
| if s.isWritePrepared() { |
| s.setIdleList(p.idleWriteList.PushFront(s)) |
| p.incNumWritesLocked(ctx) |
| } else { |
| s.setIdleList(p.idleList.PushFront(s)) |
| p.incNumReadsLocked(ctx) |
| } |
| // Broadcast that a session has been returned to idle list. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| return true |
| } |
| |
| // remove atomically removes session s from the session pool and invalidates s. |
| // If isExpire == true, the removal is triggered by session expiration and in |
| // such cases, only idle sessions can be removed. |
| func (p *sessionPool) remove(s *session, isExpire bool) bool { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) { |
| // Don't expire the session if it is not in the idle list (i.e. in use), or |
| // if the number of open sessions would drop below p.MinOpened. |
| return false |
| } |
| ol := s.setIdleList(nil) |
| ctx := context.Background() |
| // If the session is in the idle list, remove it. |
| if ol != nil { |
| // Remove from whichever list it is in. |
| p.idleList.Remove(ol) |
| p.idleWriteList.Remove(ol) |
| if s.isWritePrepared() { |
| p.decNumWritesLocked(ctx) |
| } else { |
| p.decNumReadsLocked(ctx) |
| } |
| } |
| if s.invalidate() { |
| // Decrease the number of opened sessions. |
| p.numOpened-- |
| p.recordStat(ctx, OpenSessionCount, int64(p.numOpened)) |
| // Broadcast that a session has been destroyed. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| return true |
| } |
| return false |
| } |
| |
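| // currSessionsCheckedOutLocked returns the number of sessions currently |
| // checked out of the pool. The caller must hold p.mu. |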
| func (p *sessionPool) currSessionsCheckedOutLocked() uint64 { |
| return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len()) |
| } |
| |
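| // incNumInUse increments the number of sessions currently checked out of the |
| // pool and records the corresponding metrics. The inc*/dec* helpers below |
| // maintain the other pool counters (idle read sessions, idle write sessions |
| // and sessions being prepared for write) in the same way. |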
| func (p *sessionPool) incNumInUse(ctx context.Context) { |
| p.mu.Lock() |
| p.incNumInUseLocked(ctx) |
| p.mu.Unlock() |
| } |
| |
| func (p *sessionPool) incNumInUseLocked(ctx context.Context) { |
| p.numInUse++ |
| p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions) |
| p.recordStat(ctx, AcquiredSessionsCount, 1) |
| if p.numInUse > p.maxNumInUse { |
| p.maxNumInUse = p.numInUse |
| p.recordStat(ctx, MaxInUseSessionsCount, int64(p.maxNumInUse)) |
| } |
| } |
| |
| func (p *sessionPool) decNumInUse(ctx context.Context) { |
| p.mu.Lock() |
| p.decNumInUseLocked(ctx) |
| p.mu.Unlock() |
| } |
| |
| func (p *sessionPool) decNumInUseLocked(ctx context.Context) { |
| p.numInUse-- |
| p.recordStat(ctx, SessionsCount, int64(p.numInUse), tagNumInUseSessions) |
| p.recordStat(ctx, ReleasedSessionsCount, 1) |
| } |
| |
| func (p *sessionPool) incNumReadsLocked(ctx context.Context) { |
| p.numReads++ |
| p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions) |
| } |
| |
| func (p *sessionPool) decNumReadsLocked(ctx context.Context) { |
| p.numReads-- |
| p.recordStat(ctx, SessionsCount, int64(p.numReads), tagNumReadSessions) |
| } |
| |
| func (p *sessionPool) incNumWritesLocked(ctx context.Context) { |
| p.numWrites++ |
| p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions) |
| } |
| |
| func (p *sessionPool) decNumWritesLocked(ctx context.Context) { |
| p.numWrites-- |
| p.recordStat(ctx, SessionsCount, int64(p.numWrites), tagNumWriteSessions) |
| } |
| |
| func (p *sessionPool) incNumBeingPrepared(ctx context.Context) { |
| p.mu.Lock() |
| p.incNumBeingPreparedLocked(ctx) |
| p.mu.Unlock() |
| } |
| |
| func (p *sessionPool) incNumBeingPreparedLocked(ctx context.Context) { |
| p.prepareReqs++ |
| p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared) |
| } |
| |
| func (p *sessionPool) decNumBeingPrepared(ctx context.Context) { |
| p.mu.Lock() |
| p.decNumBeingPreparedLocked(ctx) |
| p.mu.Unlock() |
| } |
| |
| func (p *sessionPool) decNumBeingPreparedLocked(ctx context.Context) { |
| p.prepareReqs-- |
| p.recordStat(ctx, SessionsCount, int64(p.prepareReqs), tagNumBeingPrepared) |
| } |
| |
| // hcHeap implements heap.Interface. It is used to create the priority queue for |
| // session healthchecks. |
| type hcHeap struct { |
| sessions []*session |
| } |
| |
| // Len implements heap.Interface.Len. |
| func (h hcHeap) Len() int { |
| return len(h.sessions) |
| } |
| |
| // Less implements heap.Interface.Less. |
| func (h hcHeap) Less(i, j int) bool { |
| return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck()) |
| } |
| |
| // Swap implements heap.Interface.Swap. |
| func (h hcHeap) Swap(i, j int) { |
| h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i] |
| h.sessions[i].setHcIndex(i) |
| h.sessions[j].setHcIndex(j) |
| } |
| |
| // Push implements heap.Interface.Push. |
| func (h *hcHeap) Push(s interface{}) { |
| ns := s.(*session) |
| ns.setHcIndex(len(h.sessions)) |
| h.sessions = append(h.sessions, ns) |
| } |
| |
| // Pop implements heap.Interface.Pop. |
| func (h *hcHeap) Pop() interface{} { |
| old := h.sessions |
| n := len(old) |
| s := old[n-1] |
| h.sessions = old[:n-1] |
| s.setHcIndex(-1) |
| return s |
| } |
| |
| // maintenanceWindowSize specifies the number of health check cycles that make |
| // up a maintenance window. The maintenance window keeps a rolling record of |
| // the maximum number of sessions checked out during each cycle. This is used |
| // by the maintainer to determine the number of sessions to create or delete |
| // at the end of each health check cycle. |
| const maintenanceWindowSize = 10 |
| |
| // maintenanceWindow contains the statistics that are gathered during a health |
| // check maintenance window. |
| type maintenanceWindow struct { |
| mu sync.Mutex |
| // maxSessionsCheckedOut contains the maximum number of sessions that were |
| // checked out of the session pool during each health check cycle. This number |
| // indicates the number of sessions that were actually needed by the pool to |
| // serve the load during that cycle. The values are kept as a rolling set |
| // containing the values for the past 10 cycles (minutes). The maintainer |
| // uses these values to determine the number of sessions to keep at the end |
| // of each cycle. |
| maxSessionsCheckedOut [maintenanceWindowSize]uint64 |
| } |
| |
| // maxSessionsCheckedOutDuringWindow returns the maximum number of sessions |
| // that has been checked out during the last maintenance window of 10 cycles |
| // (minutes). |
| func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 { |
| mw.mu.Lock() |
| defer mw.mu.Unlock() |
| var max uint64 |
| for _, cycleMax := range mw.maxSessionsCheckedOut { |
| max = maxUint64(max, cycleMax) |
| } |
| return max |
| } |
| |
| // updateMaxSessionsCheckedOutDuringWindow updates the maximum number of |
| // sessions that has been checked out of the pool during the current |
| // cycle of the maintenance window. A maintenance window consists of 10 |
| // maintenance cycles. Each cycle keeps track of the max number of sessions in |
| // use during that cycle. The rolling maintenance window of 10 cycles is used |
| // to determine the number of sessions to keep at the end of a cycle by |
| // calculating the max in use during the last 10 cycles. |
| func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) { |
| mw.mu.Lock() |
| defer mw.mu.Unlock() |
| mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0]) |
| } |
| |
| // startNewCycle starts a new health check cycle with the specified number of |
| // checked out sessions as its initial value. |
| func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) { |
| mw.mu.Lock() |
| defer mw.mu.Unlock() |
| copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9]) |
| mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut |
| } |
| |
| // newMaintenanceWindow creates a new maintenance window with all values for |
| // maxSessionsCheckedOut set to maxOpened. This ensures that a complete |
| // maintenance window must pass before the maintainer will start to delete any |
| // sessions. |
| func newMaintenanceWindow(maxOpened uint64) *maintenanceWindow { |
| mw := &maintenanceWindow{} |
| // Initialize the rolling window with max values to prevent the maintainer |
| // from deleting sessions before a complete window of 10 cycles has |
| // finished. |
| for i := 0; i < maintenanceWindowSize; i++ { |
| mw.maxSessionsCheckedOut[i] = maxOpened |
| } |
| return mw |
| } |
| |
| // healthChecker performs periodical healthchecks on registered sessions. |
| type healthChecker struct { |
| // mu protects concurrent access to healthChecker. |
| mu sync.Mutex |
| // queue is the priority queue for session healthchecks. Sessions with lower |
| // nextCheck rank higher in the queue. |
| queue hcHeap |
| // interval is the average interval between two healthchecks on a session. |
| interval time.Duration |
| // workers is the number of concurrent healthcheck workers. |
| workers int |
| // waitWorkers waits for all healthcheck workers to exit |
| waitWorkers sync.WaitGroup |
| // pool is the underlying session pool. |
| pool *sessionPool |
| // sampleInterval is the interval of sampling by the maintainer. |
| sampleInterval time.Duration |
| // ready is used to signal that maintainer can start running. |
| ready chan struct{} |
| // done is used to signal that health checker should be closed. |
| done chan struct{} |
| // once is used for closing channel done only once. |
| once sync.Once |
| maintainerCancel func() |
| } |
| |
| // newHealthChecker initializes a new instance of healthChecker. |
| func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker { |
| if workers <= 0 { |
| workers = 1 |
| } |
| hc := &healthChecker{ |
| interval: interval, |
| workers: workers, |
| pool: pool, |
| sampleInterval: sampleInterval, |
| ready: make(chan struct{}), |
| done: make(chan struct{}), |
| maintainerCancel: func() {}, |
| } |
| hc.waitWorkers.Add(1) |
| go hc.maintainer() |
| for i := 1; i <= hc.workers; i++ { |
| hc.waitWorkers.Add(1) |
| go hc.worker(i) |
| } |
| return hc |
| } |
| |
| // close closes the healthChecker and waits for all healthcheck workers to exit. |
| func (hc *healthChecker) close() { |
| hc.mu.Lock() |
| hc.maintainerCancel() |
| hc.mu.Unlock() |
| hc.once.Do(func() { close(hc.done) }) |
| hc.waitWorkers.Wait() |
| } |
| |
| // isClosing checks if a healthChecker is already closing. |
| func (hc *healthChecker) isClosing() bool { |
| select { |
| case <-hc.done: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // getInterval gets the healthcheck interval. |
| func (hc *healthChecker) getInterval() time.Duration { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| return hc.interval |
| } |
| |
| // scheduledHCLocked schedules the next healthcheck on session s, assuming |
| // that hc.mu is held. |
| func (hc *healthChecker) scheduledHCLocked(s *session) { |
| var constPart, randPart float64 |
| if !s.firstHCDone { |
| // The first check will be scheduled in a large range to make requests |
| // more evenly distributed. The first healthcheck will be scheduled |
| // after [interval*0.2, interval*1.1) ns. |
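| // For example, with the default interval of 50 minutes, the first check |
| // lands somewhere in [10m, 55m) from now. |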
| constPart = float64(hc.interval) * 0.2 |
| randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.9 |
| s.firstHCDone = true |
| } else { |
| // The next healthcheck will be scheduled after |
| // [interval*0.9, interval*1.1) ns. |
| constPart = float64(hc.interval) * 0.9 |
| randPart = hc.pool.rand.Float64() * float64(hc.interval) * 0.2 |
| } |
| // math.Ceil makes the value at least 1 ns. |
| nsFromNow := int64(math.Ceil(constPart + randPart)) |
| s.setNextCheck(time.Now().Add(time.Duration(nsFromNow))) |
| if hi := s.getHcIndex(); hi != -1 { |
| // Session is still being tracked by healthcheck workers. |
| heap.Fix(&hc.queue, hi) |
| } |
| } |
| |
| // scheduledHC schedules the next healthcheck on session s. It is safe to call |
| // concurrently. |
| func (hc *healthChecker) scheduledHC(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| hc.scheduledHCLocked(s) |
| } |
| |
| // register registers a session with the healthChecker for periodic healthchecks. |
| func (hc *healthChecker) register(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| hc.scheduledHCLocked(s) |
| heap.Push(&hc.queue, s) |
| } |
| |
| // unregister unregisters a session from the healthcheck queue. |
| func (hc *healthChecker) unregister(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| oi := s.setHcIndex(-1) |
| if oi >= 0 { |
| heap.Remove(&hc.queue, oi) |
| } |
| } |
| |
| // markDone marks that the health check for the session has been performed. |
| func (hc *healthChecker) markDone(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| s.checkingHealth = false |
| } |
| |
| // healthCheck checks the health of the session and pings it if needed. |
| func (hc *healthChecker) healthCheck(s *session) { |
| defer hc.markDone(s) |
| if !s.pool.isValid() { |
| // Session pool is closed, perform a garbage collection. |
| s.destroy(false) |
| return |
| } |
| if err := s.ping(); isSessionNotFoundError(err) { |
| // Ping failed, destroy the session. |
| s.destroy(false) |
| } |
| } |
| |
| // worker performs the healthcheck on sessions in healthChecker's priority |
| // queue. |
| func (hc *healthChecker) worker(i int) { |
| // Returns a session which we should ping to keep it alive. |
| getNextForPing := func() *session { |
| hc.pool.mu.Lock() |
| defer hc.pool.mu.Unlock() |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| if hc.queue.Len() <= 0 { |
| // Queue is empty. |
| return nil |
| } |
| s := hc.queue.sessions[0] |
| if s.getNextCheck().After(time.Now()) && hc.pool.valid { |
| // All sessions have been checked recently. |
| return nil |
| } |
| hc.scheduledHCLocked(s) |
| if !s.checkingHealth { |
| s.checkingHealth = true |
| return s |
| } |
| return nil |
| } |
| |
| // Returns a session which we should prepare for write. |
| getNextForTx := func() *session { |
| hc.pool.mu.Lock() |
| defer hc.pool.mu.Unlock() |
| if hc.pool.shouldPrepareWriteLocked() { |
| if hc.pool.idleList.Len() > 0 && hc.pool.valid { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| if hc.pool.idleList.Front().Value.(*session).checkingHealth { |
| return nil |
| } |
| session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session) |
| ctx := context.Background() |
| hc.pool.decNumReadsLocked(ctx) |
| session.checkingHealth = true |
| hc.pool.incNumBeingPreparedLocked(ctx) |
| return session |
| } |
| } |
| return nil |
| } |
| |
| for { |
| if hc.isClosing() { |
| // Exit when the pool has been closed and all sessions have been |
| // destroyed or when health checker has been closed. |
| hc.waitWorkers.Done() |
| return |
| } |
| ws := getNextForTx() |
| if ws != nil { |
| ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
| err := ws.prepareForWrite(ctx) |
| if err != nil { |
| // Skip handling the prepare error; the session can be prepared in the |
| // next cycle. |
| // Don't log about permission errors, which may be expected |
| // (e.g. using read-only auth). |
| serr := ToSpannerError(err).(*Error) |
| if serr.Code != codes.PermissionDenied { |
| logf(hc.pool.sc.logger, "Failed to prepare session, error: %v", serr) |
| } |
| } |
| hc.pool.recycle(ws) |
| hc.pool.mu.Lock() |
| hc.pool.decNumBeingPreparedLocked(ctx) |
| hc.pool.mu.Unlock() |
| cancel() |
| hc.markDone(ws) |
| } |
| rs := getNextForPing() |
| if rs == nil { |
| if ws == nil { |
| // No work to be done so sleep to avoid burning CPU. |
| pause := int64(100 * time.Millisecond) |
| if pause > int64(hc.interval) { |
| pause = int64(hc.interval) |
| } |
| select { |
| case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)): |
| case <-hc.done: |
| } |
| |
| } |
| continue |
| } |
| hc.healthCheck(rs) |
| } |
| } |
| |
| // maintainer maintains the number of sessions in the pool based on the session |
| // pool configuration and the current and historical number of sessions checked |
| // out of the pool. The maintainer will: |
| // 1. Ensure that the session pool contains at least MinOpened sessions. |
| // 2. If the current number of sessions in the pool exceeds the greatest number |
| // of checked out sessions (=sessions in use) during the last 10 minutes, |
| // and the delta is larger than MaxIdle, the maintainer will reduce the |
| // number of sessions to maxSessionsInUseDuringWindow+MaxIdle. |
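| // |
| // For example, with MinOpened=100, MaxIdle=50 and a windowed maximum of 300 |
| // sessions in use, the maintainer grows the pool if fewer than 100 sessions |
| // are open, and shrinks it towards 350 (300+50) if more than 350 are open. |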
| func (hc *healthChecker) maintainer() { |
| // Wait until the pool is ready. |
| <-hc.ready |
| |
| for iteration := uint64(0); ; iteration++ { |
| if hc.isClosing() { |
| hc.waitWorkers.Done() |
| return |
| } |
| |
| hc.pool.mu.Lock() |
| currSessionsOpened := hc.pool.numOpened |
| maxIdle := hc.pool.MaxIdle |
| minOpened := hc.pool.MinOpened |
| |
| // Reset the start time for recording the maximum number of sessions |
| // in the pool. |
| now := time.Now() |
| if now.After(hc.pool.lastResetTime.Add(10 * time.Minute)) { |
| hc.pool.maxNumInUse = hc.pool.numInUse |
| hc.pool.recordStat(context.Background(), MaxInUseSessionsCount, int64(hc.pool.maxNumInUse)) |
| hc.pool.lastResetTime = now |
| } |
| hc.pool.mu.Unlock() |
| // Get the maximum number of sessions in use during the current |
| // maintenance window. |
| maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow() |
| hc.mu.Lock() |
| ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval) |
| hc.maintainerCancel = cancel |
| hc.mu.Unlock() |
| |
| // Grow or shrink pool if needed. |
| // The number of sessions in the pool should be in the range |
| // [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow] |
| if currSessionsOpened < minOpened { |
| if err := hc.growPoolInBatch(ctx, minOpened); err != nil { |
| logf(hc.pool.sc.logger, "failed to grow pool: %v", err) |
| } |
| } else if maxIdle+maxSessionsInUseDuringWindow < currSessionsOpened { |
| hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow) |
| } |
| |
| select { |
| case <-ctx.Done(): |
| case <-hc.done: |
| cancel() |
| } |
| // Cycle the maintenance window. This will remove the oldest cycle and |
| // add a new cycle at the beginning of the maintenance window with the |
| // currently checked out number of sessions as the max number of |
| // sessions in use in this cycle. This value will be increased during the |
| // cycle if the number of checked out sessions increases. |
| hc.pool.mu.Lock() |
| currSessionsInUse := hc.pool.currSessionsCheckedOutLocked() |
| hc.pool.mu.Unlock() |
| hc.pool.mw.startNewCycle(currSessionsInUse) |
| } |
| } |
| |
| func (hc *healthChecker) growPoolInBatch(ctx context.Context, growToNumSessions uint64) error { |
| hc.pool.mu.Lock() |
| defer hc.pool.mu.Unlock() |
| numSessions := growToNumSessions - hc.pool.numOpened |
| return hc.pool.growPoolLocked(numSessions, false) |
| } |
| |
| // shrinkPool scales down the session pool. The method will stop deleting |
| // sessions when the number of sessions in the pool has reached |
| // shrinkToNumSessions. The method will also stop deleting sessions if it |
| // detects that another process has started creating sessions for the pool |
| // again, for example through the take() method. |
| func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) { |
| hc.pool.mu.Lock() |
| maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions) |
| hc.pool.mu.Unlock() |
| var deleted int |
| var prevNumOpened uint64 = math.MaxUint64 |
| for { |
| if ctx.Err() != nil { |
| return |
| } |
| |
| p := hc.pool |
| p.mu.Lock() |
| // Check if the number of open sessions has increased. If it has, we |
| // should stop deleting sessions, as the load has increased and |
| // additional sessions are needed. |
| if p.numOpened >= prevNumOpened { |
| p.mu.Unlock() |
| break |
| } |
| prevNumOpened = p.numOpened |
| |
| // Check both whether we have reached the target number of open sessions |
| // and whether we have reached the number of sessions to delete, in case |
| // sessions have been deleted by other methods because they expired or |
| // were deemed invalid. |
| if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete { |
| p.mu.Unlock() |
| break |
| } |
| |
| var s *session |
| if p.idleList.Len() > 0 { |
| s = p.idleList.Front().Value.(*session) |
| } else if p.idleWriteList.Len() > 0 { |
| s = p.idleWriteList.Front().Value.(*session) |
| } |
| p.mu.Unlock() |
| if s != nil { |
| deleted++ |
| // Destroy the session as if it had expired. |
| s.destroy(true) |
| } else { |
| break |
| } |
| } |
| } |
| |
| // maxUint64 returns the maximum of two uint64. |
| func maxUint64(a, b uint64) uint64 { |
| if a > b { |
| return a |
| } |
| return b |
| } |
| |
| // minUint64 returns the minimum of two uint64. |
| func minUint64(a, b uint64) uint64 { |
| if a > b { |
| return b |
| } |
| return a |
| } |
| |
| // sessionResourceType is the type name of Spanner sessions. |
| const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session" |
| |
| // isSessionNotFoundError returns true if the given error is a |
| // `Session not found` error. |
| func isSessionNotFoundError(err error) bool { |
| if err == nil { |
| return false |
| } |
| if ErrCode(err) == codes.NotFound { |
| if rt, ok := extractResourceType(err); ok { |
| return rt == sessionResourceType |
| } |
| } |
| return false |
| } |