| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "container/list" |
| "time" |
| |
| "google.golang.org/grpc/internal/backoff" |
| internalgrpclog "google.golang.org/grpc/internal/grpclog" |
| "google.golang.org/grpc/internal/grpcsync" |
| ) |
| |
// cacheKey represents the key used to uniquely identify an entry in the data
// cache and in the pending requests map.
type cacheKey struct {
	// path is the full path of the incoming RPC request.
	path string
	// keys is a stringified version of the RLS request key map built using the
	// RLS keyBuilder. Since maps are not a type which is comparable in Go, it
	// cannot be part of the key for another map (entries in the data cache and
	// pending requests map are stored in maps). Stringifying the key map makes
	// cacheKey comparable and therefore usable as a map key.
	keys string
}
| |
// cacheEntry wraps all the data to be stored in a data cache entry.
type cacheEntry struct {
	// childPolicyWrappers contains the list of child policy wrappers
	// corresponding to the targets returned by the RLS server for this entry.
	childPolicyWrappers []*childPolicyWrapper
	// headerData is received in the RLS response and is to be sent in the
	// X-Google-RLS-Data header for matching RPCs.
	headerData string
	// expiryTime is the absolute time at which this cache entry stops
	// being valid. When an RLS request succeeds, this is set to the current
	// time plus the max_age field from the LB policy config.
	expiryTime time.Time
	// staleTime is the absolute time after which this cache entry will be
	// proactively refreshed if an incoming RPC matches this entry. When an RLS
	// request succeeds, this is set to the current time plus the stale_age from
	// the LB policy config.
	staleTime time.Time
	// earliestEvictTime is the absolute time before which this entry should not
	// be evicted from the cache. When a cache entry is created, this is set to
	// the current time plus a default value of 5 seconds. This is required to
	// make sure that a new entry added to the cache is not evicted before the
	// RLS response arrives (usually when the cache is too small).
	earliestEvictTime time.Time

	// status stores the RPC status of the previous RLS request for this
	// entry. Picks for entries with a non-nil value for this field are failed
	// with the error stored here.
	status error
	// backoffState contains all backoff related state. When an RLS request
	// succeeds, backoffState is reset. This state moves between the data cache
	// and the pending requests map.
	backoffState *backoffState
	// backoffTime is the absolute time at which the backoff period for this
	// entry ends. When an RLS request fails, this is set to the current time
	// plus the backoff value returned by the backoffState. The backoff timer is
	// also setup with this value. No new RLS requests are sent out for this
	// entry until the backoff period ends.
	//
	// Set to zero time instant upon a successful RLS response.
	backoffTime time.Time
	// backoffExpiryTime is the absolute time at which an entry which has gone
	// through backoff stops being valid. When an RLS request fails, this is
	// set to the current time plus twice the backoff time. The cache expiry
	// timer will only delete entries for which both expiryTime and
	// backoffExpiryTime are in the past.
	//
	// Set to zero time instant upon a successful RLS response.
	backoffExpiryTime time.Time

	// size stores the size of this cache entry. Used to enforce the cache size
	// specified in the LB policy configuration.
	size int64
}
| |
// backoffState wraps all backoff related state associated with a cache entry.
type backoffState struct {
	// retries keeps track of the number of RLS failures, to be able to
	// determine the amount of time to backoff before the next attempt.
	retries int
	// bs is the exponential backoff implementation which returns the amount of
	// time to backoff, given the number of retries.
	bs backoff.Strategy
	// timer fires when the backoff period ends; incoming requests after that
	// point will trigger a new RLS request. May be nil when no backoff timer
	// is currently running for the entry.
	timer *time.Timer
}
| |
// lru is a cache implementation with a least recently used eviction policy.
// Internally it uses a doubly linked list, with the least recently used element
// at the front of the list and the most recently used element at the back of
// the list. The value stored in this cache will be of type `cacheKey`.
//
// Callers are expected to only pass keys previously added via addEntry to
// makeRecent and removeEntry; passing an absent key results in a nil list
// element being handed to container/list, which panics.
//
// It is not safe for concurrent access.
type lru struct {
	// ll holds the keys in usage order: front = least recently used.
	ll *list.List

	// A map from the value stored in the lru to its underlying list element is
	// maintained to have a clean API. Without this, a subset of the lru's API
	// would accept/return cacheKey while another subset would accept/return
	// list elements.
	m map[cacheKey]*list.Element
}
| |
| // newLRU creates a new cache with a least recently used eviction policy. |
| func newLRU() *lru { |
| return &lru{ |
| ll: list.New(), |
| m: make(map[cacheKey]*list.Element), |
| } |
| } |
| |
| func (l *lru) addEntry(key cacheKey) { |
| e := l.ll.PushBack(key) |
| l.m[key] = e |
| } |
| |
| func (l *lru) makeRecent(key cacheKey) { |
| e := l.m[key] |
| l.ll.MoveToBack(e) |
| } |
| |
| func (l *lru) removeEntry(key cacheKey) { |
| e := l.m[key] |
| l.ll.Remove(e) |
| delete(l.m, key) |
| } |
| |
| func (l *lru) getLeastRecentlyUsed() cacheKey { |
| e := l.ll.Front() |
| if e == nil { |
| return cacheKey{} |
| } |
| return e.Value.(cacheKey) |
| } |
| |
// dataCache contains a cache of RLS data used by the LB policy to make routing
// decisions.
//
// The dataCache will be keyed by the request's path and keys, represented by
// the `cacheKey` type. It will maintain the cache keys in an `lru` and the
// cache data, represented by the `cacheEntry` type, in a native map.
//
// It is not safe for concurrent access.
type dataCache struct {
	maxSize     int64 // Maximum allowed size.
	currentSize int64 // Current size. May temporarily exceed maxSize (see resize).
	keys        *lru  // Cache keys maintained in lru order.
	entries     map[cacheKey]*cacheEntry
	logger      *internalgrpclog.PrefixLogger
	// shutdown is fired by stop(); once fired, all cache operations become
	// no-ops.
	shutdown *grpcsync.Event
}
| |
| func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache { |
| return &dataCache{ |
| maxSize: size, |
| keys: newLRU(), |
| entries: make(map[cacheKey]*cacheEntry), |
| logger: logger, |
| shutdown: grpcsync.NewEvent(), |
| } |
| } |
| |
| // resize changes the maximum allowed size of the data cache. |
| // |
| // The return value indicates if an entry with a valid backoff timer was |
| // evicted. This is important to the RLS LB policy which would send a new picker |
| // on the channel to re-process any RPCs queued as a result of this backoff |
| // timer. |
| func (dc *dataCache) resize(size int64) (backoffCancelled bool) { |
| if dc.shutdown.HasFired() { |
| return false |
| } |
| |
| backoffCancelled = false |
| for dc.currentSize > size { |
| key := dc.keys.getLeastRecentlyUsed() |
| entry, ok := dc.entries[key] |
| if !ok { |
| // This should never happen. |
| dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to resize it", key) |
| break |
| } |
| |
| // When we encounter a cache entry whose minimum expiration time is in |
| // the future, we abort the LRU pass, which may temporarily leave the |
| // cache being too large. This is necessary to ensure that in cases |
| // where the cache is too small, when we receive an RLS Response, we |
| // keep the resulting cache entry around long enough for the pending |
| // incoming requests to be re-processed through the new Picker. If we |
| // didn't do this, then we'd risk throwing away each RLS response as we |
| // receive it, in which case we would fail to actually route any of our |
| // incoming requests. |
| if entry.earliestEvictTime.After(time.Now()) { |
| dc.logger.Warningf("cachekey %+v is too recent to be evicted. Stopping cache resizing for now", key) |
| break |
| } |
| |
| // Stop the backoff timer before evicting the entry. |
| if entry.backoffState != nil && entry.backoffState.timer != nil { |
| if entry.backoffState.timer.Stop() { |
| entry.backoffState.timer = nil |
| backoffCancelled = true |
| } |
| } |
| dc.deleteAndcleanup(key, entry) |
| } |
| dc.maxSize = size |
| return backoffCancelled |
| } |
| |
| // evictExpiredEntries sweeps through the cache and deletes expired entries. An |
| // expired entry is one for which both the `expiryTime` and `backoffExpiryTime` |
| // fields are in the past. |
| // |
| // The return value indicates if any expired entries were evicted. |
| // |
| // The LB policy invokes this method periodically to purge expired entries. |
| func (dc *dataCache) evictExpiredEntries() bool { |
| if dc.shutdown.HasFired() { |
| return false |
| } |
| |
| evicted := false |
| for key, entry := range dc.entries { |
| // Only evict entries for which both the data expiration time and |
| // backoff expiration time fields are in the past. |
| now := time.Now() |
| if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) { |
| continue |
| } |
| dc.deleteAndcleanup(key, entry) |
| evicted = true |
| } |
| return evicted |
| } |
| |
| // resetBackoffState sweeps through the cache and for entries with a backoff |
| // state, the backoff timer is cancelled and the backoff state is reset. The |
| // return value indicates if any entries were mutated in this fashion. |
| // |
| // The LB policy invokes this method when the control channel moves from READY |
| // to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on |
| // the `controlChannel` type for more details. |
| func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) bool { |
| if dc.shutdown.HasFired() { |
| return false |
| } |
| |
| backoffReset := false |
| for _, entry := range dc.entries { |
| if entry.backoffState == nil { |
| continue |
| } |
| if entry.backoffState.timer != nil { |
| entry.backoffState.timer.Stop() |
| entry.backoffState.timer = nil |
| } |
| entry.backoffState = &backoffState{bs: newBackoffState.bs} |
| entry.backoffTime = time.Time{} |
| entry.backoffExpiryTime = time.Time{} |
| backoffReset = true |
| } |
| return backoffReset |
| } |
| |
| // addEntry adds a cache entry for the given key. |
| // |
| // Return value backoffCancelled indicates if a cache entry with a valid backoff |
| // timer was evicted to make space for the current entry. This is important to |
| // the RLS LB policy which would send a new picker on the channel to re-process |
| // any RPCs queued as a result of this backoff timer. |
| // |
| // Return value ok indicates if entry was successfully added to the cache. |
| func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled bool, ok bool) { |
| if dc.shutdown.HasFired() { |
| return false, false |
| } |
| |
| // Handle the extremely unlikely case that a single entry is bigger than the |
| // size of the cache. |
| if entry.size > dc.maxSize { |
| return false, false |
| } |
| dc.entries[key] = entry |
| dc.currentSize += entry.size |
| dc.keys.addEntry(key) |
| // If the new entry makes the cache go over its configured size, remove some |
| // old entries. |
| if dc.currentSize > dc.maxSize { |
| backoffCancelled = dc.resize(dc.maxSize) |
| } |
| return backoffCancelled, true |
| } |
| |
| // updateEntrySize updates the size of a cache entry and the current size of the |
| // data cache. An entry's size can change upon receipt of an RLS response. |
| func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) { |
| dc.currentSize -= entry.size |
| entry.size = newSize |
| dc.currentSize += entry.size |
| } |
| |
| func (dc *dataCache) getEntry(key cacheKey) *cacheEntry { |
| if dc.shutdown.HasFired() { |
| return nil |
| } |
| |
| entry, ok := dc.entries[key] |
| if !ok { |
| return nil |
| } |
| dc.keys.makeRecent(key) |
| return entry |
| } |
| |
| func (dc *dataCache) removeEntryForTesting(key cacheKey) { |
| entry, ok := dc.entries[key] |
| if !ok { |
| return |
| } |
| dc.deleteAndcleanup(key, entry) |
| } |
| |
// deleteAndcleanup performs actions required at the time of deleting an entry
// from the data cache.
//   - the entry is removed from the map of entries
//   - current size of the data cache is updated
//   - the key is removed from the LRU
func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) {
	delete(dc.entries, key)
	dc.currentSize -= entry.size
	dc.keys.removeEntry(key)
}
| |
// stop empties the data cache and fires the shutdown event, after which all
// cache operations (resize, addEntry, getEntry, etc.) become no-ops.
//
// NOTE(review): backoff timers on evicted entries are not stopped here —
// presumably their callbacks are benign after shutdown fires; confirm against
// the timer callback's handling of the shutdown event.
func (dc *dataCache) stop() {
	for key, entry := range dc.entries {
		dc.deleteAndcleanup(key, entry)
	}
	dc.shutdown.Fire()
}