/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package rls

import (
	"errors"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/rls/internal/cache"
	"google.golang.org/grpc/balancer/rls/internal/keys"
	"google.golang.org/grpc/metadata"
)

var errRLSThrottled = errors.New("RLS call throttled at client side")

// rlsPicker selects the subConn to be used for a particular RPC. It does not
// manage subConns directly and usually delegates to pickers provided by child
// policies.
//
// The RLS LB policy creates a new rlsPicker object whenever its ServiceConfig
// is updated and provides a set of hooks for the rlsPicker to get the latest
// state that it can use to make its decision.
type rlsPicker struct {
	// The keyBuilder map used to generate RLS keys for the RPC. This is built
	// by the LB policy based on the received ServiceConfig.
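	// For example, a key builder configured to extract a request header named
	// "user-id" would yield a key map like {"user-id": "alice"} for an RPC
	// carrying that header (the header name and value here are purely
	// illustrative).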
	kbm keys.BuilderMap

	// The following hooks are set up by the LB policy to enable the rlsPicker
	// to access state stored in the policy. This approach has the following
	// advantages:
	// 1. The rlsPicker is loosely coupled with the LB policy in the sense that
	//    updates happening on the LB policy, like the receipt of an RLS
	//    response, an update to the default rlsPicker, etc., are not
	//    explicitly pushed to the rlsPicker, but are readily available to the
	//    rlsPicker when it invokes these hooks. The LB policy takes care of
	//    synchronizing access to this shared state.
	// 2. It makes unit testing the rlsPicker easy since any number of these
	//    hooks could be overridden.
	// An illustrative wiring of these hooks appears right after this struct
	// definition.

	// readCache is used to read from the data cache and the pending request
	// map in an atomic fashion. The first return parameter is the entry in the
	// data cache, and the second indicates whether an entry for the same key
	// is present in the pending cache.
	readCache func(cache.Key) (*cache.Entry, bool)

	// shouldThrottle decides if the current RPC should be throttled at the
	// client side. It uses an adaptive throttling algorithm.
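	// Roughly, the adaptive throttler keeps counts of recent RLS requests and
	// of requests that succeeded, and throttles a new request with a
	// probability that grows as total requests outpace successes. This is a
	// simplified description; the exact algorithm lives with the LB policy.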
	shouldThrottle func() bool

	// startRLS kicks off an RLS request in the background for the provided RPC
	// path and keyMap. An entry in the pending request map is created before
	// sending out the request and an entry in the data cache is created or
	// updated upon receipt of a response. See implementation in the LB policy
	// for details.
	startRLS func(string, keys.KeyMap)

	// defaultPick enables the rlsPicker to delegate the pick decision to the
	// rlsPicker returned by the child LB policy pointing to the default target
	// specified in the service config.
	defaultPick func(balancer.PickInfo) (balancer.PickResult, error)
}
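
// For illustration only: the LB policy is expected to construct the picker
// with these hooks pointing back at its own state and methods, roughly along
// the following lines (the lb variable and its field/method names below are
// hypothetical, not part of this package's API):
//
//	picker := &rlsPicker{
//		kbm:            lb.kbm,
//		readCache:      lb.readCache,
//		shouldThrottle: lb.shouldThrottle,
//		startRLS:       lb.startRLS,
//		defaultPick:    lb.defaultPick,
//	}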

// Pick makes the routing decision for every outbound RPC.
func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	// For every incoming request, we first build the RLS keys using the
	// keyBuilder we received from the LB policy. If no metadata is present in
	// the context, we end up using an empty key.
	km := keys.KeyMap{}
	md, ok := metadata.FromOutgoingContext(info.Ctx)
	if ok {
		km = p.kbm.RLSKey(md, info.FullMethodName)
	}

	// We use the LB policy hook to read the data cache and the pending request
	// map (whether or not an entry exists) for the RPC path and the generated
	// RLS keys. We will end up kicking off an RLS request only if there is no
	// pending request for the current RPC path and keys, and either we didn't
	// find an entry in the data cache or the entry was stale and it wasn't in
	// backoff.
	startRequest := false
	now := time.Now()
	entry, pending := p.readCache(cache.Key{Path: info.FullMethodName, KeyMap: km.Str})
	if entry == nil {
		startRequest = true
	} else {
		entry.Mu.Lock()
		defer entry.Mu.Unlock()
		if entry.StaleTime.Before(now) && entry.BackoffTime.Before(now) {
			// This is the proactive cache refresh.
			startRequest = true
		}
	}

	if startRequest && !pending {
		if p.shouldThrottle() {
			// The entry doesn't exist or has expired and the new RLS request
			// has been throttled. Treat it as an error and delegate to the
			// default pick, if one exists, or fail the pick.
			if entry == nil || entry.ExpiryTime.Before(now) {
				if p.defaultPick != nil {
					return p.defaultPick(info)
				}
				return balancer.PickResult{}, errRLSThrottled
			}
			// The proactive refresh has been throttled. Nothing to worry
			// about; just keep using the existing entry.
		} else {
			p.startRLS(info.FullMethodName, km)
		}
	}

	if entry != nil {
		if entry.ExpiryTime.After(now) {
			// This is the jolly good case where we have found a valid entry in
			// the data cache. We delegate to the LB policy associated with
			// this cache entry.
			return entry.ChildPicker.Pick(info)
		} else if entry.BackoffTime.After(now) {
			// The entry has expired, but is in backoff. We delegate to the
			// default pick, if one exists, or return the error from the last
			// failed RLS request for this entry.
			if p.defaultPick != nil {
				return p.defaultPick(info)
			}
			return balancer.PickResult{}, entry.CallStatus
		}
	}

	// We get here only in the following cases:
	// * No data cache entry or an expired entry, and an RLS request has been
	//   sent out.
	// * No valid data cache entry, and a pending cache entry exists.
	// We need to queue the pick, which will be handled once the RLS response
	// is received.
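	// Returning balancer.ErrNoSubConnAvailable makes gRPC block the RPC until
	// the LB policy pushes a new picker (which it does once the pending RLS
	// request completes), at which point the pick is retried.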
	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}