| /* |
| * |
| * Copyright 2022 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "errors" |
| "fmt" |
| "strings" |
| "sync/atomic" |
| "time" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/rls/internal/keys" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/connectivity" |
| internalgrpclog "google.golang.org/grpc/internal/grpclog" |
| rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" |
| "google.golang.org/grpc/metadata" |
| "google.golang.org/grpc/status" |
| ) |
| |
| var ( |
| errRLSThrottled = errors.New("RLS call throttled at client side") |
| |
| // Function to compute data cache entry size. |
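| // Defined as a variable, rather than calling dcEntrySize directly, presumably |
| // so that it can be overridden in tests. |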
| computeDataCacheEntrySize = dcEntrySize |
| ) |
| |
| // exitIdler wraps the only method on the BalancerGroup that the picker calls. |
| type exitIdler interface { |
| ExitIdleOne(id string) |
| } |
| |
| // rlsPicker selects the subConn to be used for a particular RPC. It does not |
| // manage subConns directly and delegates to pickers provided by child policies. |
| type rlsPicker struct { |
| // The keyBuilder map used to generate RLS keys for the RPC. This is built |
| // by the LB policy based on the received ServiceConfig. |
| kbm keys.BuilderMap |
| // Endpoint from the user's original dial target. Used to set the `host_key` |
| // field in `extra_keys`. |
| origEndpoint string |
| |
| lb *rlsBalancer |
| |
| // The picker is given its own copy of the below fields from the RLS LB policy |
| // to avoid having to grab the mutex on the latter. |
| defaultPolicy *childPolicyWrapper // Child policy for the default target. |
| ctrlCh *controlChannel // Control channel to the RLS server. |
| maxAge time.Duration // Cache max age from LB config. |
| staleAge time.Duration // Cache stale age from LB config. |
| bg exitIdler |
| logger *internalgrpclog.PrefixLogger |
| } |
| |
| // isFullMethodNameValid returns true if name is of the form `/service/method`. |
| func isFullMethodNameValid(name string) bool { |
| return strings.HasPrefix(name, "/") && strings.Count(name, "/") == 2 |
| } |
| |
| // Pick makes the routing decision for every outbound RPC. |
| func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { |
| if name := info.FullMethodName; !isFullMethodNameValid(name) { |
| return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method'", name) |
| } |
| |
| // Build the request's keys using the key builders from LB config. |
| md, _ := metadata.FromOutgoingContext(info.Ctx) |
| reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName) |
| |
| p.lb.cacheMu.Lock() |
| defer p.lb.cacheMu.Unlock() |
| |
| // Look up the data cache and pending request map using the request path and keys. |
| cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str} |
| dcEntry := p.lb.dataCache.getEntry(cacheKey) |
| pendingEntry := p.lb.pendingMap[cacheKey] |
| now := time.Now() |
| |
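| // The cases below cover all four combinations of whether a data cache entry |
| // and/or a pending RLS request exist for this cache key. |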
| switch { |
| // No data cache entry. No pending request. |
| case dcEntry == nil && pendingEntry == nil: |
| throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "") |
| if throttled { |
| return p.useDefaultPickIfPossible(info, errRLSThrottled) |
| } |
| return balancer.PickResult{}, balancer.ErrNoSubConnAvailable |
| |
| // No data cache entry. Pending request exists. |
| case dcEntry == nil && pendingEntry != nil: |
| return balancer.PickResult{}, balancer.ErrNoSubConnAvailable |
| |
| // Data cache hit. No pending request. |
| case dcEntry != nil && pendingEntry == nil: |
| if dcEntry.expiryTime.After(now) { |
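| // The entry is still valid. Proactively refresh it if it has gone stale |
| // and is not in backoff. |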
| if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) { |
| p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData) |
| } |
| // Delegate to child policies. |
| res, err := p.delegateToChildPoliciesLocked(dcEntry, info) |
| return res, err |
| } |
| |
| // We get here only if the data cache entry has expired. If the entry is in |
| // backoff, delegate to the default target or fail the pick. |
| if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) { |
| // Avoid propagating the status code received on control plane RPCs to the |
| // data plane which can lead to unexpected outcomes as we do not control |
| // the status code sent by the control plane. Propagating the status |
| // message received from the control plane is still fine, as it could be |
| // useful for debugging purposes. |
| st := dcEntry.status |
| return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error()))) |
| } |
| |
| // We get here only if the entry has expired and is not in backoff. |
| throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "") |
| if throttled { |
| return p.useDefaultPickIfPossible(info, errRLSThrottled) |
| } |
| return balancer.PickResult{}, balancer.ErrNoSubConnAvailable |
| |
| // Data cache hit. Pending request exists. |
| default: |
| if dcEntry.expiryTime.After(now) { |
| res, err := p.delegateToChildPoliciesLocked(dcEntry, info) |
| return res, err |
| } |
| // Data cache entry has expired and pending request exists. Queue pick. |
| return balancer.PickResult{}, balancer.ErrNoSubConnAvailable |
| } |
| } |
| |
| // delegateToChildPoliciesLocked is a helper function which iterates through the |
| // list of child policy wrappers in a cache entry and attempts to find a child |
| // policy to which this RPC can be routed. If all child policies are in |
| // TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. |
| func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) { |
| const rlsDataHeaderName = "x-google-rls-data" |
| for i, cpw := range dcEntry.childPolicyWrappers { |
| state := (*balancer.State)(atomic.LoadPointer(&cpw.state)) |
| // Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if |
| // it is the last one (which handles the case of delegating to the last |
| // child picker if all child policies are in TRANSIENT_FAILURE). |
| if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 { |
| // Any header data received from the RLS server is stored in the |
| // cache entry and needs to be sent to the actual backend in the |
| // X-Google-RLS-Data header. |
| res, err := state.Picker.Pick(info) |
| if err != nil { |
| return res, err |
| } |
| if res.Metadata == nil { |
| res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData) |
| } else { |
| res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData) |
| } |
| return res, nil |
| } |
| } |
| // In the unlikely event that we have a cache entry with no targets, we end up |
| // queueing the RPC. |
| return balancer.PickResult{}, balancer.ErrNoSubConnAvailable |
| } |
| |
| // useDefaultPickIfPossible is a helper method which delegates to the default |
| // target if one is configured, or fails the pick with the given error. |
| func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) { |
| if p.defaultPolicy != nil { |
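| // Use the picker from the default child policy's most recently reported state. |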
| state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state)) |
| return state.Picker.Pick(info) |
| } |
| return balancer.PickResult{}, errOnNoDefault |
| } |
| |
| // sendRouteLookupRequestLocked adds an entry to the pending request map and |
| // sends out an RLS request using the passed in arguments. Returns a value |
| // indicating if the request was throttled by the client-side adaptive |
| // throttler. |
| func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool { |
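| // Don't send a fresh RLS request if one is already pending for this cacheKey. |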
| if p.lb.pendingMap[cacheKey] != nil { |
| return false |
| } |
| |
| p.lb.pendingMap[cacheKey] = bs |
| throttled := p.ctrlCh.lookup(reqKeys, reason, staleHeaders, func(targets []string, headerData string, err error) { |
| p.handleRouteLookupResponse(cacheKey, targets, headerData, err) |
| }) |
| if throttled { |
| delete(p.lb.pendingMap, cacheKey) |
| } |
| return throttled |
| } |
| |
| // handleRouteLookupResponse is the callback invoked by the control channel upon |
| // receipt of an RLS response. Modifies the data cache and pending requests map |
| // and sends a new picker. |
| // |
| // Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu. |
| func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) { |
| p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err) |
| |
| p.lb.cacheMu.Lock() |
| defer func() { |
| // Pending request map entry is unconditionally deleted since the request is |
| // no longer pending. |
| p.logger.Infof("Removing pending request entry for key %+v", cacheKey) |
| delete(p.lb.pendingMap, cacheKey) |
| p.lb.sendNewPicker() |
| p.lb.cacheMu.Unlock() |
| }() |
| |
| // Look up the data cache entry, or create a new one if it doesn't exist. |
| dcEntry := p.lb.dataCache.getEntry(cacheKey) |
| if dcEntry == nil { |
| dcEntry = &cacheEntry{} |
| if _, ok := p.lb.dataCache.addEntry(cacheKey, dcEntry); !ok { |
| // This is a very unlikely case where we are unable to add a |
| // data cache entry. Log and leave. |
| p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey) |
| return |
| } |
| } |
| |
| // For failed requests, the data cache entry is modified as follows: |
| // - status is set to the error returned from the control channel |
| // - the backoff state from the pending entry is moved to the data cache, |
| //   after incrementing its `retries` field |
| // - backoffTime is set to the time indicated by the backoff state |
| // - backoffExpiryTime is set to twice the backoff interval |
| // - backoffState.timer is set to fire after the backoff interval |
| // |
| // When a proactive cache refresh fails, this leaves the targets and the |
| // expiry time from the old entry unchanged. This means that the old valid |
| // entry continues to be used until it expires, and a new picker is sent |
| // upon backoff expiry. |
| now := time.Now() |
| if err != nil { |
| dcEntry.status = err |
| pendingEntry := p.lb.pendingMap[cacheKey] |
| pendingEntry.retries++ |
| backoffTime := pendingEntry.bs.Backoff(pendingEntry.retries) |
| dcEntry.backoffState = pendingEntry |
| dcEntry.backoffTime = now.Add(backoffTime) |
| dcEntry.backoffExpiryTime = now.Add(2 * backoffTime) |
| if dcEntry.backoffState.timer != nil { |
| dcEntry.backoffState.timer.Stop() |
| } |
| dcEntry.backoffState.timer = time.AfterFunc(backoffTime, p.lb.sendNewPicker) |
| return |
| } |
| |
| // For successful requests, the cache entry is modified as follows: |
| // - childPolicyWrappers is set to point to the child policy wrappers |
| // associated with the targets specified in the received response |
| // - headerData is set to the value received in the response |
| // - expiryTime, staleTime and earliestEvictTime are set |
| // - status is set to nil (OK status) |
| // - backoff state is cleared |
| p.setChildPolicyWrappersInCacheEntry(dcEntry, targets) |
| dcEntry.headerData = headerData |
| dcEntry.expiryTime = now.Add(p.maxAge) |
| if p.staleAge != 0 { |
| dcEntry.staleTime = now.Add(p.staleAge) |
| } |
| dcEntry.earliestEvictTime = now.Add(minEvictDuration) |
| dcEntry.status = nil |
| dcEntry.backoffState = &backoffState{bs: defaultBackoffStrategy} |
| dcEntry.backoffTime = time.Time{} |
| dcEntry.backoffExpiryTime = time.Time{} |
| p.lb.dataCache.updateEntrySize(dcEntry, computeDataCacheEntrySize(cacheKey, dcEntry)) |
| } |
| |
| // setChildPolicyWrappersInCacheEntry sets up the childPolicyWrappers field in |
| // the cache entry to point to the child policy wrappers for the targets |
| // specified in the RLS response. |
| // |
| // Caller must hold a write-lock on p.lb.cacheMu. |
| func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) { |
| // If the childPolicyWrappers field is already pointing to the right targets, |
| // then the field's value does not need to change. |
| targetsChanged := true |
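| // Check whether the existing child policy wrappers already point to exactly |
| // the targets in newTargets, in the same order. |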
| func() { |
| if cpws := dcEntry.childPolicyWrappers; cpws != nil { |
| if len(newTargets) != len(cpws) { |
| return |
| } |
| for i, target := range newTargets { |
| if cpws[i].target != target { |
| return |
| } |
| } |
| targetsChanged = false |
| } |
| }() |
| if !targetsChanged { |
| return |
| } |
| |
| // If the childPolicyWrappers field is not already set to the right targets, |
| // then it must be reset. We construct a new list of child policies and |
| // then swap out the old list for the new one. |
| newChildPolicies := p.lb.acquireChildPolicyReferences(newTargets) |
| oldChildPolicyTargets := make([]string, len(dcEntry.childPolicyWrappers)) |
| for i, cpw := range dcEntry.childPolicyWrappers { |
| oldChildPolicyTargets[i] = cpw.target |
| } |
| p.lb.releaseChildPolicyReferences(oldChildPolicyTargets) |
| dcEntry.childPolicyWrappers = newChildPolicies |
| } |
| |
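| // dcEntrySize computes the size of a data cache entry as the sum of the |
| // lengths of the cache key's path and keys strings and the entry's header |
| // data received from the RLS server. |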
| func dcEntrySize(key cacheKey, entry *cacheEntry) int64 { |
| return int64(len(key.path) + len(key.keys) + len(entry.headerData)) |
| } |