/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/rls/internal/adaptive"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal"
	internalgrpclog "google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/pretty"
	rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
)

// newAdaptiveThrottler returns the adaptive throttler used by the control
// channel. It is a package-level variable so that it can be swapped out, e.g.
// from tests.
var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }

// adaptiveThrottler is the subset of the adaptive throttling implementation
// relied upon by the control channel.
type adaptiveThrottler interface {
	ShouldThrottle() bool
	RegisterBackendResponse(throttled bool)
}
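
// The interface is consumed in a check-then-report pattern: ShouldThrottle is
// consulted before a RouteLookup RPC goes out, and RegisterBackendResponse
// reports the outcome afterwards so the throttler can adapt its accept rate.
// exampleThrottleAndReport is a minimal sketch of that pattern; it is
// illustrative only and not used anywhere in this package.
func exampleThrottleAndReport(t adaptiveThrottler, send func() error) (throttled bool) {
	if t.ShouldThrottle() {
		// Fail fast locally without touching the network.
		return true
	}
	err := send()
	// In this sketch an RPC error is treated as a throttled response, which
	// lowers the throttler's future accept ratio; successes raise it.
	t.RegisterBackendResponse(err != nil)
	return false
}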

// controlChannel is a wrapper around the gRPC channel to the RLS server
// specified in the service config.
type controlChannel struct {
	// rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
	// policy receives this value in its service config.
	rpcTimeout time.Duration
	// backToReadyFunc is a callback to be invoked when the connectivity state
	// changes from READY --> TRANSIENT_FAILURE --> READY.
	backToReadyFunc func()
	// throttler is an adaptive throttling implementation used to avoid
	// hammering the RLS service while it is overloaded or down.
	throttler adaptiveThrottler

	cc     *grpc.ClientConn
	client rlsgrpc.RouteLookupServiceClient
	logger *internalgrpclog.PrefixLogger
}

// newControlChannel creates a controlChannel to rlsServerName and uses
// serviceConfig, if non-empty, as the default service config for the
// underlying gRPC channel.
func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
	ctrlCh := &controlChannel{
		rpcTimeout:      rpcTimeout,
		backToReadyFunc: backToReadyFunc,
		throttler:       newAdaptiveThrottler(),
	}
	ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))

	dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
	if err != nil {
		return nil, err
	}
	ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
	if err != nil {
		return nil, err
	}
	ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
	ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)

	go ctrlCh.monitorConnectivityState()
	return ctrlCh, nil
}
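
// exampleNewControlChannel is a minimal, illustrative sketch of bringing up
// the control channel; it is not used by this package. The server name and
// timeout below are hypothetical placeholders standing in for values parsed
// from the RLS LB policy's configuration.
func exampleNewControlChannel(bOpts balancer.BuildOptions) (*controlChannel, error) {
	onBackToReady := func() {
		// The real LB policy resets cache backoff state here; see the comment
		// in monitorConnectivityState below.
	}
	return newControlChannel("rls.example.com:443", "", 10*time.Second, bOpts, onBackToReady)
}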

// dialOpts constructs the dial options for the control plane channel.
func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
	// The control plane channel will use the same authority as the parent
	// channel for server authorization. This ensures that the identity of the
	// RLS server and the identity of the backends is the same, so if the RLS
	// config is injected by an attacker, it cannot cause leakage of private
	// information contained in headers set by the application.
	dopts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
	if bOpts.Dialer != nil {
		dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
	}

	// The control channel will use the channel credentials from the parent
	// channel, including any call creds associated with the channel creds.
	var credsOpt grpc.DialOption
	switch {
	case bOpts.DialCreds != nil:
		credsOpt = grpc.WithTransportCredentials(bOpts.DialCreds.Clone())
	case bOpts.CredsBundle != nil:
		// The "fallback" mode in google default credentials (which is the only
		// type of credentials we expect to be used with RLS) uses TLS/ALTS
		// creds for transport and uses the same call creds as that on the
		// parent bundle.
		bundle, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
		if err != nil {
			return nil, err
		}
		credsOpt = grpc.WithCredentialsBundle(bundle)
	default:
		cc.logger.Warningf("no credentials available, using Insecure")
		credsOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
	}
	dopts = append(dopts, credsOpt)

	// If the RLS LB policy's configuration specified a service config for the
	// control channel, use that and disable service config fetching via the
	// name resolver for the control channel.
	if serviceConfig != "" {
		cc.logger.Infof("Disabling service config from the name resolver and instead using: %s", serviceConfig)
		dopts = append(dopts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(serviceConfig))
	}
	return dopts, nil
}

func (cc *controlChannel) monitorConnectivityState() {
	cc.logger.Infof("Starting connectivity state monitoring goroutine")
	// Since we use two mechanisms to deal with the RLS server being down:
	//   - adaptive throttling for the channel as a whole
	//   - exponential backoff on a per-request basis
	// we need a way to avoid double-penalizing requests by counting failures
	// toward both mechanisms when the RLS server is unreachable.
	//
	// To accomplish this, we monitor the state of the control plane channel.
	// If the state has been TRANSIENT_FAILURE since the last time it was in
	// state READY, and it then transitions into state READY, we push on a
	// channel which is being read by the LB policy.
	//
	// The LB policy will iterate through the cache to reset the backoff
	// timeouts in all cache entries. Specifically, this means that it will
	// reset the backoff state and cancel the pending backoff timer. Note that
	// when cancelling the backoff timer, just like when the backoff timer
	// fires normally, a new picker is returned to the channel, to force it to
	// re-process any wait-for-ready RPCs that may still be queued if we failed
	// them while we were in backoff. However, we should optimize this case by
	// returning only one new picker, regardless of how many backoff timers are
	// cancelled.

	// Using the background context is fine here since we check for the
	// ClientConn entering SHUTDOWN and return early in that case.
	ctx := context.Background()

	first := true
	for {
		// Wait for the control channel to become READY.
		for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
			if s == connectivity.Shutdown {
				return
			}
			cc.cc.WaitForStateChange(ctx, s)
		}
		cc.logger.Infof("Connectivity state is READY")

		if !first {
			cc.logger.Infof("Control channel back to READY")
			cc.backToReadyFunc()
		}
		first = false

		// Wait for the control channel to move out of READY.
		cc.cc.WaitForStateChange(ctx, connectivity.Ready)
		if cc.cc.GetState() == connectivity.Shutdown {
			return
		}
		cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
	}
}
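
// exampleBackToReadyHandler sketches the consumer side described in the
// comment inside monitorConnectivityState: the LB policy reads the
// back-to-READY signal, resets backoff state on all cache entries, and sends a
// single new picker. It is illustrative only; backToReadyCh, resetBackoffs and
// sendNewPicker are hypothetical stand-ins for the real cache and picker
// plumbing in this package.
func exampleBackToReadyHandler(backToReadyCh <-chan struct{}, resetBackoffs, sendNewPicker func()) {
	for range backToReadyCh {
		// Clear backoff state and cancel any pending backoff timers in all
		// cache entries.
		resetBackoffs()
		// Push exactly one new picker so that queued wait-for-ready RPCs are
		// re-processed, regardless of how many backoff timers were cancelled.
		sendNewPicker()
	}
}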

func (cc *controlChannel) close() {
	cc.logger.Infof("Closing control channel")
	cc.cc.Close()
}

// lookupCallback is invoked with the results of a RouteLookup RPC: the target
// list and header data from the response, and the RPC error, if any.
type lookupCallback func(targets []string, headerData string, err error)

// lookup starts a RouteLookup RPC in a separate goroutine and returns the
// results (and error, if any) in the provided callback.
//
// The returned boolean indicates whether the request was throttled by the
// client-side adaptive throttling algorithm, in which case the provided
// callback will not be invoked.
func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string, cb lookupCallback) (throttled bool) {
	if cc.throttler.ShouldThrottle() {
		cc.logger.Infof("RLS request throttled by client-side adaptive throttling")
		return true
	}
	go func() {
		req := &rlspb.RouteLookupRequest{
			TargetType:      "grpc",
			KeyMap:          reqKeys,
			Reason:          reason,
			StaleHeaderData: staleHeaders,
		}
		cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))

		ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
		defer cancel()
		resp, err := cc.client.RouteLookup(ctx, req)
		cb(resp.GetTargets(), resp.GetHeaderData(), err)
	}()
	return false
}
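
// exampleLookup is a minimal, illustrative call site for lookup; it is not
// used by this package. The request keys and the callback body are
// hypothetical placeholders for what the cache/picker code does with the
// results.
func exampleLookup(cc *controlChannel) {
	keys := map[string]string{"service": "echo.Echo"}
	throttled := cc.lookup(keys, rlspb.RouteLookupRequest_REASON_MISS, "", func(targets []string, headerData string, err error) {
		if err != nil {
			// e.g. place the corresponding cache entry in backoff.
			return
		}
		_ = targets    // candidate backend targets returned by the RLS server
		_ = headerData // header data to forward to the picked target
	})
	if throttled {
		cc.logger.Infof("request throttled locally; callback will not be invoked")
	}
}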