blob: 7af97b76faf132ac14fb10b32ae2d0d43d01f52f [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcsync"
)
var (
_ balancer.Balancer = (*rlsBalancer)(nil)
// For overriding in tests.
newRLSClientFunc = newRLSClient
logger = grpclog.Component("rls")
)
// rlsBalancer implements the RLS LB policy.
type rlsBalancer struct {
done *grpcsync.Event
cc balancer.ClientConn
opts balancer.BuildOptions
// Mutex protects all the state maintained by the LB policy.
// TODO(easwars): Once we add the cache, we will also have another lock for
// the cache alone.
mu sync.Mutex
lbCfg *lbConfig // Most recently received service config.
rlsCC *grpc.ClientConn // ClientConn to the RLS server.
rlsC *rlsClient // RLS client wrapper.
ccUpdateCh chan *balancer.ClientConnState
}
// run is a long running goroutine which handles all the updates that the
// balancer wishes to handle. The appropriate updateHandler will push the update
// on to a channel that this goroutine will select on, thereby the handling of
// the update will happen asynchronously.
func (lb *rlsBalancer) run() {
for {
// TODO(easwars): Handle other updates like subConn state changes, RLS
// responses from the server etc.
select {
case u := <-lb.ccUpdateCh:
lb.handleClientConnUpdate(u)
case <-lb.done.Done():
return
}
}
}
// handleClientConnUpdate handles updates to the service config.
// If the RLS server name or the RLS RPC timeout changes, it updates the control
// channel accordingly.
// TODO(easwars): Handle updates to other fields in the service config.
func (lb *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
logger.Infof("rls: service config: %+v", ccs.BalancerConfig)
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.done.HasFired() {
logger.Warning("rls: received service config after balancer close")
return
}
newCfg := ccs.BalancerConfig.(*lbConfig)
if lb.lbCfg.Equal(newCfg) {
logger.Info("rls: new service config matches existing config")
return
}
lb.updateControlChannel(newCfg)
lb.lbCfg = newCfg
}
// UpdateClientConnState pushes the received ClientConnState update on the
// update channel which will be processed asynchronously by the run goroutine.
// Implements balancer.Balancer interface.
func (lb *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
select {
case lb.ccUpdateCh <- &ccs:
case <-lb.done.Done():
}
return nil
}
// ResolverErr implements balancer.Balancer interface.
func (lb *rlsBalancer) ResolverError(error) {
// ResolverError is called by gRPC when the name resolver reports an error.
// TODO(easwars): How do we handle this?
logger.Fatal("rls: ResolverError is not yet unimplemented")
}
// UpdateSubConnState implements balancer.Balancer interface.
func (lb *rlsBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {
logger.Fatal("rls: UpdateSubConnState is not yet implemented")
}
// Cleans up the resources allocated by the LB policy including the clientConn
// to the RLS server.
// Implements balancer.Balancer.
func (lb *rlsBalancer) Close() {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.done.Fire()
if lb.rlsCC != nil {
lb.rlsCC.Close()
}
}
// updateControlChannel updates the RLS client if required.
// Caller must hold lb.mu.
func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) {
oldCfg := lb.lbCfg
if newCfg.lookupService == oldCfg.lookupService && newCfg.lookupServiceTimeout == oldCfg.lookupServiceTimeout {
return
}
// Use RPC timeout from new config, if different from existing one.
timeout := oldCfg.lookupServiceTimeout
if timeout != newCfg.lookupServiceTimeout {
timeout = newCfg.lookupServiceTimeout
}
if newCfg.lookupService == oldCfg.lookupService {
// This is the case where only the timeout has changed. We will continue
// to use the existing clientConn. but will create a new rlsClient with
// the new timeout.
lb.rlsC = newRLSClientFunc(lb.rlsCC, lb.opts.Target.Endpoint, timeout)
return
}
// This is the case where the RLS server name has changed. We need to create
// a new clientConn and close the old one.
var dopts []grpc.DialOption
if dialer := lb.opts.Dialer; dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(dialer))
}
dopts = append(dopts, dialCreds(lb.opts))
cc, err := grpc.Dial(newCfg.lookupService, dopts...)
if err != nil {
logger.Errorf("rls: dialRLS(%s, %v): %v", newCfg.lookupService, lb.opts, err)
// An error from a non-blocking dial indicates something serious. We
// should continue to use the old control channel if one exists, and
// return so that the rest of the config updates can be processes.
return
}
if lb.rlsCC != nil {
lb.rlsCC.Close()
}
lb.rlsCC = cc
lb.rlsC = newRLSClientFunc(cc, lb.opts.Target.Endpoint, timeout)
}
func dialCreds(opts balancer.BuildOptions) grpc.DialOption {
// The control channel should use the same authority as that of the parent
// channel. This ensures that the identify of the RLS server and that of the
// backend is the same, so if the RLS config is injected by an attacker, it
// cannot cause leakage of private information contained in headers set by
// the application.
server := opts.Target.Authority
switch {
case opts.DialCreds != nil:
if err := opts.DialCreds.OverrideServerName(server); err != nil {
logger.Warningf("rls: OverrideServerName(%s) = (%v), using Insecure", server, err)
return grpc.WithInsecure()
}
return grpc.WithTransportCredentials(opts.DialCreds)
case opts.CredsBundle != nil:
return grpc.WithTransportCredentials(opts.CredsBundle.TransportCredentials())
default:
logger.Warning("rls: no credentials available, using Insecure")
return grpc.WithInsecure()
}
}