/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/keys"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
var (
errRLSThrottled = errors.New("RLS call throttled at client side")
// Function to compute data cache entry size.
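// It is kept as a package-level variable (rather than calling dcEntrySize
// directly) so that tests can swap in their own implementation.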
computeDataCacheEntrySize = dcEntrySize
)
// exitIdler wraps the only method on the BalancerGroup that the picker calls.
type exitIdler interface {
ExitIdleOne(id string)
}
// rlsPicker selects the subConn to be used for a particular RPC. It does not
// manage subConns directly and delegates to pickers provided by child policies.
type rlsPicker struct {
// The keyBuilder map used to generate RLS keys for the RPC. This is built
// by the LB policy based on the received ServiceConfig.
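// For example, a keybuilder configured to match the "x-user-id" request
// header under the RLS key "user-id" would produce a key "user-id" whose
// value is taken from that header in the RPC's metadata.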
kbm keys.BuilderMap
// Endpoint from the user's original dial target. Used to set the `host_key`
// field in `extra_keys`.
origEndpoint string
lb *rlsBalancer
// The picker is given its own copy of the below fields from the RLS LB policy
// to avoid having to grab the mutex on the latter.
defaultPolicy *childPolicyWrapper // Child policy for the default target.
ctrlCh *controlChannel // Control channel to the RLS server.
maxAge time.Duration // Cache max age from LB config.
staleAge time.Duration // Cache stale age from LB config.
bg exitIdler
logger *internalgrpclog.PrefixLogger
}
// isFullMethodNameValid returns true if name is of the form `/service/method`.
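// For example, "/helloworld.Greeter/SayHello" is of this form, while
// "SayHello" or "/helloworld/Greeter/SayHello" are not.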
func isFullMethodNameValid(name string) bool {
return strings.HasPrefix(name, "/") && strings.Count(name, "/") == 2
}
// Pick makes the routing decision for every outbound RPC.
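// It builds the RLS keys for the RPC, looks up the data cache and the pending
// request map, and based on their state either delegates to a child policy,
// queues the pick, or falls back to the default target (if one is configured).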
func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if name := info.FullMethodName; !isFullMethodNameValid(name) {
return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method'", name)
}
// Build the request's keys using the key builders from LB config.
md, _ := metadata.FromOutgoingContext(info.Ctx)
reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)
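// reqKeys.Map holds the key-value pairs to be sent in the RouteLookup request,
// while reqKeys.Str is a stable stringified form of the same data, used below
// as part of the data cache key.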
p.lb.cacheMu.Lock()
defer p.lb.cacheMu.Unlock()
// Lookup data cache and pending request map using request path and keys.
cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
dcEntry := p.lb.dataCache.getEntry(cacheKey)
pendingEntry := p.lb.pendingMap[cacheKey]
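// A non-nil pendingEntry means that an RLS request for this cacheKey is
// already in flight; its backoff state is moved to the data cache entry if
// that request fails.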
now := time.Now()
switch {
// No data cache entry. No pending request.
case dcEntry == nil && pendingEntry == nil:
throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
// No data cache entry. Pending request exists.
case dcEntry == nil && pendingEntry != nil:
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
// Data cache hit. No pending request.
case dcEntry != nil && pendingEntry == nil:
if dcEntry.expiryTime.After(now) {
if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
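// The entry is stale but not expired and not in backoff: kick off a
// proactive refresh in the background, while the current data continues
// to be used for this pick below.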
p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
}
// Delegate to child policies.
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
}
// We get here only if the data cache entry has expired. If entry is in
// backoff, delegate to default target or fail the pick.
if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
// Avoid propagating the status code received on control plane RPCs to the
// data plane, which can lead to unexpected outcomes as we do not control
// the status code sent by the control plane. Propagating the status
// message received from the control plane is still fine, as it could be
// useful for debugging purposes.
st := dcEntry.status
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
}
// We get here only if the entry has expired and is not in backoff.
throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
if throttled {
return p.useDefaultPickIfPossible(info, errRLSThrottled)
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
// Data cache hit. Pending request exists.
default:
if dcEntry.expiryTime.After(now) {
res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
return res, err
}
// Data cache entry has expired and pending request exists. Queue pick.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
}
// delegateToChildPoliciesLocked is a helper function which iterates through the
// list of child policy wrappers in a cache entry and attempts to find a child
// policy to which this RPC can be routed. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
const rlsDataHeaderName = "x-google-rls-data"
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
// Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if
// it is the last one (which handles the case of delegating to the last
// child picker if all child policies are in TRANSIENT_FAILURE).
if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
// Any header data received from the RLS server is stored in the
// cache entry and needs to be sent to the actual backend in the
// X-Google-RLS-Data header.
res, err := state.Picker.Pick(info)
if err != nil {
return res, err
}
if res.Metadata == nil {
res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
} else {
res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
}
return res, nil
}
}
// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
if p.defaultPolicy != nil {
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
}
return balancer.PickResult{}, errOnNoDefault
}
// sendRouteLookupRequestLocked adds an entry to the pending request map and
// sends out an RLS request using the passed in arguments. Returns a value
// indicating if the request was throttled by the client-side adaptive
// throttler.
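// If a request for the same cacheKey is already pending, no new request is
// sent and false is returned.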
func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
if p.lb.pendingMap[cacheKey] != nil {
return false
}
p.lb.pendingMap[cacheKey] = bs
throttled := p.ctrlCh.lookup(reqKeys, reason, staleHeaders, func(targets []string, headerData string, err error) {
p.handleRouteLookupResponse(cacheKey, targets, headerData, err)
})
if throttled {
delete(p.lb.pendingMap, cacheKey)
}
return throttled
}
// handleRouteLookupResponse is the callback invoked by the control channel upon
// receipt of an RLS response. Modifies the data cache and pending requests map
// and sends a new picker.
//
// Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) {
p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err)
p.lb.cacheMu.Lock()
defer func() {
// Pending request map entry is unconditionally deleted since the request is
// no longer pending.
p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
delete(p.lb.pendingMap, cacheKey)
p.lb.sendNewPicker()
p.lb.cacheMu.Unlock()
}()
// Lookup the data cache entry or create a new one.
dcEntry := p.lb.dataCache.getEntry(cacheKey)
if dcEntry == nil {
dcEntry = &cacheEntry{}
if _, ok := p.lb.dataCache.addEntry(cacheKey, dcEntry); !ok {
// This is a very unlikely case where we are unable to add a
// data cache entry. Log and leave.
p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey)
return
}
}
// For failed requests, the data cache entry is modified as follows:
// - status is set to the error returned from the control channel
// - current backoff state is available in the pending entry
//   - `retries` field is incremented and
//   - backoff state is moved to the data cache
// - backoffTime is set to the time indicated by the backoff state
// - backoffExpiryTime is set to twice the backoff time
// - backoffTimer is set to fire after backoffTime
//
// When a proactive cache refresh fails, this would leave the targets and the
// expiry time from the old entry unchanged. This means that the old valid
// entry would be used until expiration, and a new picker would be sent upon
// backoff expiry.
now := time.Now()
if err != nil {
dcEntry.status = err
pendingEntry := p.lb.pendingMap[cacheKey]
pendingEntry.retries++
backoffTime := pendingEntry.bs.Backoff(pendingEntry.retries)
dcEntry.backoffState = pendingEntry
dcEntry.backoffTime = now.Add(backoffTime)
dcEntry.backoffExpiryTime = now.Add(2 * backoffTime)
if dcEntry.backoffState.timer != nil {
dcEntry.backoffState.timer.Stop()
}
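// Once the backoff period ends, send out a new picker so that the channel
// re-processes any queued picks and subsequent picks for this entry are no
// longer treated as being in backoff.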
dcEntry.backoffState.timer = time.AfterFunc(backoffTime, p.lb.sendNewPicker)
return
}
// For successful requests, the cache entry is modified as follows:
// - childPolicyWrappers is set to point to the child policy wrappers
// associated with the targets specified in the received response
// - headerData is set to the value received in the response
// - expiryTime, staleTime and earliestEvictTime are set
// - status is set to nil (OK status)
// - backoff state is cleared
p.setChildPolicyWrappersInCacheEntry(dcEntry, targets)
dcEntry.headerData = headerData
dcEntry.expiryTime = now.Add(p.maxAge)
if p.staleAge != 0 {
dcEntry.staleTime = now.Add(p.staleAge)
}
dcEntry.earliestEvictTime = now.Add(minEvictDuration)
dcEntry.status = nil
dcEntry.backoffState = &backoffState{bs: defaultBackoffStrategy}
dcEntry.backoffTime = time.Time{}
dcEntry.backoffExpiryTime = time.Time{}
p.lb.dataCache.updateEntrySize(dcEntry, computeDataCacheEntrySize(cacheKey, dcEntry))
}
// setChildPolicyWrappersInCacheEntry sets up the childPolicyWrappers field in
// the cache entry to point to the child policy wrappers for the targets
// specified in the RLS response.
//
// Caller must hold a write-lock on p.lb.cacheMu.
func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) {
// If the childPolicyWrappers field is already pointing to the right targets,
// then the field's value does not need to change.
targetsChanged := true
func() {
if cpws := dcEntry.childPolicyWrappers; cpws != nil {
if len(newTargets) != len(cpws) {
return
}
for i, target := range newTargets {
if cpws[i].target != target {
return
}
}
targetsChanged = false
}
}()
if !targetsChanged {
return
}
// If the childPolicyWrappers field is not already set to the right targets,
// then it must be reset. We construct a new list of child policies and
// then swap out the old list for the new one.
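// Acquiring references for the new targets before releasing references for
// the old ones ensures that a target present in both lists does not see its
// reference count drop to zero in between.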
newChildPolicies := p.lb.acquireChildPolicyReferences(newTargets)
oldChildPolicyTargets := make([]string, len(dcEntry.childPolicyWrappers))
for i, cpw := range dcEntry.childPolicyWrappers {
oldChildPolicyTargets[i] = cpw.target
}
p.lb.releaseChildPolicyReferences(oldChildPolicyTargets)
dcEntry.childPolicyWrappers = newChildPolicies
}
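// dcEntrySize returns a rough estimate of the memory consumed by a data cache
// entry: the length of the request path, the stringified RLS keys and the
// header data received from the RLS server.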
func dcEntrySize(key cacheKey, entry *cacheEntry) int64 {
return int64(len(key.path) + len(key.keys) + len(entry.headerData))
}