/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package rls implements the RLS LB policy.
package rls
import (
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/buffer"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
)
const (
// Name is the name of the RLS LB policy.
//
// It currently has an experimental suffix, which will be removed once
// end-to-end testing of the policy is completed.
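//
// As an illustrative sketch (not code from this file), a client selects this
// policy via the loadBalancingConfig field of its service config, keyed by
// this name:
//
//	{"loadBalancingConfig": [{"rls_experimental": { ... }}]}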
Name = internal.RLSLoadBalancingPolicyName
// Default frequency for data cache purging.
periodicCachePurgeFreq = time.Minute
)
var (
logger = grpclog.Component("rls")
errBalancerClosed = errors.New("rls LB policy is closed")
// The variables below are defined for overriding in unit tests.
// Default exponential backoff strategy for data cache entries.
defaultBackoffStrategy = backoff.Strategy(backoff.DefaultExponential)
// Ticker used for periodic data cache purging.
dataCachePurgeTicker = func() *time.Ticker { return time.NewTicker(periodicCachePurgeFreq) }
// We want every cache entry to live in the cache for at least this
// duration. If we encounter a cache entry whose minimum expiration time is
// in the future, we abort the LRU pass, which may temporarily leave the
// cache too large. This is necessary to ensure that in cases where
// the cache is too small, when we receive an RLS Response, we keep the
// resulting cache entry around long enough for the pending incoming
// requests to be re-processed through the new Picker. If we didn't do this,
// then we'd risk throwing away each RLS response as we receive it, in which
// case we would fail to actually route any of our incoming requests.
minEvictDuration = 5 * time.Second
// The following functions are no-ops in actual code, but can be overridden
// in tests to give them visibility into exactly when certain events happen.
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}
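// As a sketch of how the hooks above can be used, a test could override one
// of them before the balancer is built to observe when an event fires (the
// channel below is hypothetical, not part of this file):
//
//	purgeDone := make(chan struct{}, 1)
//	dataCachePurgeHook = func() { purgeDone <- struct{}{} }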
)
func init() {
balancer.Register(&rlsBB{})
}
type rlsBB struct{}
func (rlsBB) Name() string {
return Name
}
func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
lb := &rlsBalancer{
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
cc: cc,
bopts: opts,
purgeTicker: dataCachePurgeTicker(),
dataCachePurgeHook: dataCachePurgeHook,
lbCfg: &lbConfig{},
pendingMap: make(map[cacheKey]*backoffState),
childPolicies: make(map[string]*childPolicyWrapper),
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
lb.bg = balancergroup.New(cc, opts, lb, lb.logger)
lb.bg.Start()
go lb.run()
return lb
}
// rlsBalancer implements the RLS LB policy.
type rlsBalancer struct {
closed *grpcsync.Event // Fires when Close() is invoked. Guarded by stateMu.
done *grpcsync.Event // Fires when Close() is done.
cc balancer.ClientConn
bopts balancer.BuildOptions
purgeTicker *time.Ticker
dataCachePurgeHook func()
logger *internalgrpclog.PrefixLogger
// If both cacheMu and stateMu need to be acquired, the former must be
// acquired first to prevent a deadlock. This order restriction is due to the
// fact that in places where we need to acquire both the locks, we always
// start off reading the cache.
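//
// As an illustrative sketch of this ordering rule (not lifted from any one
// method in this file):
//
//	b.cacheMu.Lock()
//	b.stateMu.Lock()
//	// ... read the cache, then mutate LB policy state ...
//	b.stateMu.Unlock()
//	b.cacheMu.Unlock()
//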
// cacheMu guards access to the data cache and pending requests map. We
// cannot use an RWMutex here since even an operation like
// dataCache.getEntry() modifies the underlying LRU, which is implemented as
// a doubly linked list.
cacheMu sync.Mutex
dataCache *dataCache // Cache of RLS data.
pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.
// stateMu guards access to all LB policy state.
stateMu sync.Mutex
lbCfg *lbConfig // Most recently received service config.
childPolicyBuilder balancer.Builder // Cached child policy builder.
resolverState resolver.State // Cached resolver state.
ctrlCh *controlChannel // Control channel to the RLS server.
bg *balancergroup.BalancerGroup
childPolicies map[string]*childPolicyWrapper
defaultPolicy *childPolicyWrapper
// A reference to the most recent picker sent to gRPC as part of a state
// update is cached in this field so that we can release the reference to the
// default child policy wrapper when a new picker is created. See
// sendNewPickerLocked() for details.
lastPicker *rlsPicker
// Set during UpdateClientConnState when pushing updates to child policies.
// Prevents state updates from child policies causing new pickers to be sent
// up the channel. Cleared after all child policies have processed the
// updates sent to them, after which a new picker is sent up the channel.
inhibitPickerUpdates bool
// Channel on which all updates are pushed. Processed in run().
updateCh *buffer.Unbounded
}
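// resumePickerUpdates is pushed on the update channel by
// UpdateClientConnState after a new config has been propagated to the child
// policies. Handling it clears inhibitPickerUpdates, sends a new picker, and
// closes the done channel to unblock the waiting caller.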
type resumePickerUpdates struct {
done chan struct{}
}
// childPolicyIDAndState wraps a child policy id and its state update.
type childPolicyIDAndState struct {
id string
state balancer.State
}
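// controlChannelReady is pushed on the update channel when the control
// channel to the RLS server moves back to READY, at which point backoff
// state in the data cache is reset.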
type controlChannelReady struct{}
// run is a long-running goroutine which handles all the updates that the
// balancer wishes to handle. The appropriate update handler pushes the update
// onto a channel that this goroutine selects on, so that updates are handled
// asynchronously.
func (b *rlsBalancer) run() {
// We exit out of the for loop below only after `Close()` has been invoked.
// Firing the done event here will ensure that Close() returns only after
// all goroutines are done.
defer func() { b.done.Fire() }()
// Wait for purgeDataCache() goroutine to exit before returning from here.
doneCh := make(chan struct{})
defer func() {
<-doneCh
}()
go b.purgeDataCache(doneCh)
for {
select {
case u := <-b.updateCh.Get():
b.updateCh.Load()
switch update := u.(type) {
case childPolicyIDAndState:
b.handleChildPolicyStateUpdate(update.id, update.state)
case controlChannelReady:
b.logger.Infof("Resetting backoff state after control channel getting back to READY")
b.cacheMu.Lock()
updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
b.cacheMu.Unlock()
if updatePicker {
b.sendNewPicker()
}
resetBackoffHook()
case resumePickerUpdates:
b.stateMu.Lock()
b.logger.Infof("Resuming picker updates after config propagation to child policies")
b.inhibitPickerUpdates = false
b.sendNewPickerLocked()
close(update.done)
b.stateMu.Unlock()
default:
b.logger.Errorf("Unsupported update type %T", update)
}
case <-b.closed.Done():
return
}
}
}
// purgeDataCache is a long-running goroutine which periodically deletes expired
// entries. An expired entry is one for which both the expiryTime and
// backoffExpiryTime are in the past.
func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
defer close(doneCh)
for {
select {
case <-b.closed.Done():
return
case <-b.purgeTicker.C:
b.cacheMu.Lock()
updatePicker := b.dataCache.evictExpiredEntries()
b.cacheMu.Unlock()
if updatePicker {
b.sendNewPicker()
}
b.dataCachePurgeHook()
}
}
}
func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
defer clientConnUpdateHook()
b.stateMu.Lock()
if b.closed.HasFired() {
b.stateMu.Unlock()
b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
return errBalancerClosed
}
newCfg := ccs.BalancerConfig.(*lbConfig)
if b.lbCfg.Equal(newCfg) {
b.stateMu.Unlock()
b.logger.Infof("New service config matches existing config")
return nil
}
b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
b.inhibitPickerUpdates = true
// When the RLS server name changes, the old control channel needs to be
// swapped out for a new one. All state associated with the throttling
// algorithm is stored on a per-control-channel basis; when we swap out
// channels, we also swap out the throttling state.
b.handleControlChannelUpdate(newCfg)
// Any changes to the child policy name or configuration need to be handled
// by either creating new child policies or pushing updates to existing ones.
b.resolverState = ccs.ResolverState
b.handleChildPolicyConfigUpdate(newCfg, &ccs)
// Resize the cache if the size in the config has changed.
resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes
// Update the copy of the config in the LB policy before releasing the lock.
b.lbCfg = newCfg
// Enqueue an event which will notify us when the above update has been
// propagated to all child policies, and the child policies have all
// processed their updates, and we have sent a picker update.
done := make(chan struct{})
b.updateCh.Put(resumePickerUpdates{done: done})
b.stateMu.Unlock()
<-done
if resizeCache {
// If the new config reduces the size of the data cache, we might have to
// evict entries to get the cache size down to the newly specified size.
//
// We cannot perform this operation above (where we compute the
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
return nil
}
// handleControlChannelUpdate handles updates to service config fields which
// influence the control channel to the RLS server.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) handleControlChannelUpdate(newCfg *lbConfig) {
if newCfg.lookupService == b.lbCfg.lookupService && newCfg.lookupServiceTimeout == b.lbCfg.lookupServiceTimeout {
return
}
// Create a new control channel and close the existing one.
b.logger.Infof("Creating control channel to RLS server at: %v", newCfg.lookupService)
backToReadyFn := func() {
b.updateCh.Put(controlChannelReady{})
}
ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, backToReadyFn)
if err != nil {
// This is very uncommon and usually represents a non-transient error.
// There is not much we can do here other than wait for another update
// which might fix things.
b.logger.Errorf("Failed to create control channel to %q: %v", newCfg.lookupService, err)
return
}
if b.ctrlCh != nil {
b.ctrlCh.close()
}
b.ctrlCh = ctrlCh
}
// handleChildPolicyConfigUpdate handles updates to service config fields which
// influence child policy configuration.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) handleChildPolicyConfigUpdate(newCfg *lbConfig, ccs *balancer.ClientConnState) {
// Update child policy builder first since other steps are dependent on this.
if b.childPolicyBuilder == nil || b.childPolicyBuilder.Name() != newCfg.childPolicyName {
b.logger.Infof("Child policy changed to %q", newCfg.childPolicyName)
b.childPolicyBuilder = balancer.Get(newCfg.childPolicyName)
for _, cpw := range b.childPolicies {
// If the child policy has changed, we need to remove the old policy
// from the BalancerGroup and add a new one. The BalancerGroup takes
// care of closing the old one in this case.
b.bg.Remove(cpw.target)
b.bg.Add(cpw.target, b.childPolicyBuilder)
}
}
configSentToDefault := false
if b.lbCfg.defaultTarget != newCfg.defaultTarget {
// If the default target has changed, create a new childPolicyWrapper for
// the new target if required. If a new wrapper is created, add it to the
// childPolicies map and the BalancerGroup.
b.logger.Infof("Default target in LB config changing from %q to %q", b.lbCfg.defaultTarget, newCfg.defaultTarget)
cpw := b.childPolicies[newCfg.defaultTarget]
if cpw == nil {
cpw = newChildPolicyWrapper(newCfg.defaultTarget)
b.childPolicies[newCfg.defaultTarget] = cpw
b.bg.Add(newCfg.defaultTarget, b.childPolicyBuilder)
b.logger.Infof("Child policy %q added to BalancerGroup", newCfg.defaultTarget)
}
if err := b.buildAndPushChildPolicyConfigs(newCfg.defaultTarget, newCfg, ccs); err != nil {
cpw.lamify(err)
}
// If an old default exists, release its reference. If this was the last
// reference, remove the child policy from the BalancerGroup and remove the
// corresponding entry from the childPolicies map.
if b.defaultPolicy != nil {
if b.defaultPolicy.releaseRef() {
delete(b.childPolicies, b.lbCfg.defaultTarget)
b.bg.Remove(b.defaultPolicy.target)
}
}
b.defaultPolicy = cpw
configSentToDefault = true
}
// Return early if no fields affecting child policy configuration have
// changed.
if b.lbCfg.childPolicyName == newCfg.childPolicyName && b.lbCfg.childPolicyTargetField == newCfg.childPolicyTargetField && childPolicyConfigEqual(b.lbCfg.childPolicyConfig, newCfg.childPolicyConfig) {
return
}
// If fields affecting child policy configuration have changed, the changes
// are pushed to the childPolicyWrapper which handles them appropriately.
for _, cpw := range b.childPolicies {
if configSentToDefault && cpw.target == newCfg.defaultTarget {
// Default target has already been taken care of.
continue
}
if err := b.buildAndPushChildPolicyConfigs(cpw.target, newCfg, ccs); err != nil {
cpw.lamify(err)
}
}
}
// buildAndPushChildPolicyConfigs builds the final child policy configuration
// by adding the `targetField` to the base child policy configuration received
// as part of the RLS LB policy configuration. The `targetField` is set to the
// passed-in target, and the resulting configuration is pushed to the child
// policy through the BalancerGroup.
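//
// As an illustrative example (all values hypothetical): with a base child
// policy config of {"cluster": "x"}, a childPolicyTargetField of
// "serviceName", and a target of "test-service", the configuration pushed to
// the child policy would be:
//
//	{"cluster": "x", "serviceName": "test-service"}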
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) buildAndPushChildPolicyConfigs(target string, newCfg *lbConfig, ccs *balancer.ClientConnState) error {
jsonTarget, err := json.Marshal(target)
if err != nil {
return fmt.Errorf("failed to marshal child policy target %q: %v", target, err)
}
config := newCfg.childPolicyConfig
targetField := newCfg.childPolicyTargetField
config[targetField] = jsonTarget
jsonCfg, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("failed to marshal child policy config %+v: %v", config, err)
}
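// Config parsing for this LB policy is expected to have verified that the
// child policy builder implements balancer.ConfigParser, so parser is
// assumed to be non-nil here.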
parser, _ := b.childPolicyBuilder.(balancer.ConfigParser)
parsedCfg, err := parser.ParseConfig(jsonCfg)
if err != nil {
return fmt.Errorf("childPolicy config parsing failed: %v", err)
}
state := balancer.ClientConnState{ResolverState: ccs.ResolverState, BalancerConfig: parsedCfg}
b.logger.Infof("Pushing new state to child policy %q: %+v", target, state)
if err := b.bg.UpdateClientConnState(target, state); err != nil {
b.logger.Warningf("UpdateClientConnState(%q, %+v) failed : %v", target, ccs, err)
}
return nil
}
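// ResolverError forwards the resolver error to all child policies through
// the BalancerGroup.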
func (b *rlsBalancer) ResolverError(err error) {
b.bg.ResolverError(err)
}
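// UpdateSubConnState forwards the SubConn state update to the BalancerGroup,
// which routes it to the child policy owning the SubConn.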
func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.bg.UpdateSubConnState(sc, state)
}
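// Close stops the balancer: it signals the run() and purgeDataCache()
// goroutines to exit, stops the purge ticker, closes the control channel and
// the BalancerGroup, stops the data cache, and blocks until run() returns.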
func (b *rlsBalancer) Close() {
b.stateMu.Lock()
b.closed.Fire()
b.purgeTicker.Stop()
if b.ctrlCh != nil {
b.ctrlCh.close()
}
b.bg.Close()
b.stateMu.Unlock()
b.cacheMu.Lock()
b.dataCache.stop()
b.cacheMu.Unlock()
<-b.done.Done()
}
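// ExitIdle instructs all child policies, through the BalancerGroup, to exit
// their IDLE state.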
func (b *rlsBalancer) ExitIdle() {
b.bg.ExitIdle()
}
// sendNewPickerLocked pushes a new picker onto the channel.
//
// Note that regardless of what connectivity state is reported, the policy will
// return its own picker, and not a picker that unconditionally queues
// (typically used for IDLE or CONNECTING) or a picker that unconditionally
// fails (typically used for TRANSIENT_FAILURE). This is required because,
// irrespective of the connectivity state, we need to be able to perform RLS
// lookups for incoming RPCs and affect the status of queued RPCs based on the
// receipt of RLS responses.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) sendNewPickerLocked() {
aggregatedState := b.aggregatedConnectivityState()
// Acquire a separate reference for the picker. This is required to ensure
// that the wrapper held by the old picker is not closed when the default
// target changes in the config, and a new wrapper is created for the new
// default target. See handleChildPolicyConfigUpdate() for how config changes
// affecting the default target are handled.
if b.defaultPolicy != nil {
b.defaultPolicy.acquireRef()
}
picker := &rlsPicker{
kbm: b.lbCfg.kbMap,
origEndpoint: b.bopts.Target.Endpoint(),
lb: b,
defaultPolicy: b.defaultPolicy,
ctrlCh: b.ctrlCh,
maxAge: b.lbCfg.maxAge,
staleAge: b.lbCfg.staleAge,
bg: b.bg,
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
ConnectivityState: aggregatedState,
Picker: picker,
}
if !b.inhibitPickerUpdates {
b.logger.Infof("New balancer.State: %+v", state)
b.cc.UpdateState(state)
} else {
b.logger.Infof("Delaying picker update: %+v", state)
}
// Release the reference to the default child policy wrapper held by the
// old picker, if any. See the comment on lastPicker for why a separate
// reference is acquired for each picker.
if b.lastPicker != nil {
if b.lastPicker.defaultPolicy != nil {
b.lastPicker.defaultPolicy.releaseRef()
}
}
b.lastPicker = picker
}
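// sendNewPicker is a wrapper around sendNewPickerLocked which grabs stateMu,
// and is a no-op if the balancer has already been closed.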
func (b *rlsBalancer) sendNewPicker() {
b.stateMu.Lock()
defer b.stateMu.Unlock()
if b.closed.HasFired() {
return
}
b.sendNewPickerLocked()
}
// aggregatedConnectivityState returns the aggregated connectivity state of
// the child policies, determined as follows:
// - If there is at least one child policy in state READY, the connectivity
// state is READY.
// - Otherwise, if there is at least one child policy in state CONNECTING, the
// connectivity state is CONNECTING.
// - Otherwise, if there is at least one child policy in state IDLE, the
// connectivity state is IDLE.
// - Otherwise, all child policies are in TRANSIENT_FAILURE, and the
// connectivity state is TRANSIENT_FAILURE.
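//
// For example, with one child policy in READY and two others in
// TRANSIENT_FAILURE, the aggregated connectivity state is READY.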
//
// If the RLS policy has no child policies and no configured default target,
// then we will report connectivity state IDLE.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) aggregatedConnectivityState() connectivity.State {
if len(b.childPolicies) == 0 && b.lbCfg.defaultTarget == "" {
return connectivity.Idle
}
var readyN, connectingN, idleN int
for _, cpw := range b.childPolicies {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
switch state.ConnectivityState {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
}
}
switch {
case readyN > 0:
return connectivity.Ready
case connectingN > 0:
return connectivity.Connecting
case idleN > 0:
return connectivity.Idle
default:
return connectivity.TransientFailure
}
}
// UpdateState is an implementation of the balancergroup.BalancerStateAggregator
// interface. The actual state aggregation functionality is handled
// asynchronously. This method only pushes the state update onto a channel
// which is read from, and dispatched by, the run() goroutine.
func (b *rlsBalancer) UpdateState(id string, state balancer.State) {
b.updateCh.Put(childPolicyIDAndState{id: id, state: state})
}
// handleChildPolicyStateUpdate provides the state aggregator functionality for
// the BalancerGroup.
//
// This method is invoked by the BalancerGroup whenever a child policy sends a
// state update. We cache the child policy's connectivity state and picker for
// two reasons:
// - to suppress connectivity state transitions from TRANSIENT_FAILURE to states
// other than READY
// - to delegate picks to child policies
func (b *rlsBalancer) handleChildPolicyStateUpdate(id string, newState balancer.State) {
b.stateMu.Lock()
defer b.stateMu.Unlock()
cpw := b.childPolicies[id]
if cpw == nil {
// All child policies start with an entry in the map. If the id is not in
// the map, the policy has either been removed or never existed.
b.logger.Warningf("Received state update %+v for missing child policy %q", newState, id)
return
}
oldState := (*balancer.State)(atomic.LoadPointer(&cpw.state))
if oldState.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting {
// Ignore state transitions from TRANSIENT_FAILURE to CONNECTING, and thus
// fail pending RPCs instead of queuing them indefinitely when all
// subChannels are failing, even if the subChannels are bouncing back and
// forth between CONNECTING and TRANSIENT_FAILURE.
return
}
atomic.StorePointer(&cpw.state, unsafe.Pointer(&newState))
b.logger.Infof("Child policy %q has new state %+v", id, newState)
b.sendNewPickerLocked()
}
// acquireChildPolicyReferences attempts to acquire references to
// childPolicyWrappers corresponding to the passed in targets. If there is no
// childPolicyWrapper corresponding to one of the targets, a new one is created
// and added to the BalancerGroup.
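//
// References acquired here are expected to be released via
// releaseChildPolicyReferences once the caller is done with the wrappers.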
func (b *rlsBalancer) acquireChildPolicyReferences(targets []string) []*childPolicyWrapper {
b.stateMu.Lock()
var newChildPolicies []*childPolicyWrapper
for _, target := range targets {
// If the target exists in the LB policy's childPolicies map, a new
// reference is taken here and added to the new list.
if cpw := b.childPolicies[target]; cpw != nil {
cpw.acquireRef()
newChildPolicies = append(newChildPolicies, cpw)
continue
}
// If the target does not exist in the child policy map, then a new
// child policy wrapper is created and added to the new list.
cpw := newChildPolicyWrapper(target)
b.childPolicies[target] = cpw
b.bg.Add(target, b.childPolicyBuilder)
b.logger.Infof("Child policy %q added to BalancerGroup", target)
newChildPolicies = append(newChildPolicies, cpw)
if err := b.buildAndPushChildPolicyConfigs(target, b.lbCfg, &balancer.ClientConnState{
ResolverState: b.resolverState,
}); err != nil {
cpw.lamify(err)
}
}
b.stateMu.Unlock()
return newChildPolicies
}
// releaseChildPolicyReferences releases references to childPolicyWrappers
// corresponding to the passed in targets. If the released reference was the
// last one, the child policy is removed from the BalancerGroup.
func (b *rlsBalancer) releaseChildPolicyReferences(targets []string) {
b.stateMu.Lock()
for _, target := range targets {
if cpw := b.childPolicies[target]; cpw.releaseRef() {
delete(b.childPolicies, cpw.target)
b.bg.Remove(cpw.target)
}
}
b.stateMu.Unlock()
}