blob: b0310660fb29135c959d2d573bcee8adc836886f [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package weightedtarget implements the weighted_target balancer.
package weightedtarget
import (
"encoding/json"
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
)
const weightedTargetName = "weighted_target_experimental"
// newRandomWRR is the WRR constructor used to pick sub-pickers from
// sub-balancers. It's to be modified in tests.
var newRandomWRR = wrr.NewRandom
func init() {
balancer.Register(&weightedTargetBB{})
}
type weightedTargetBB struct{}
func (wt *weightedTargetBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &weightedTargetBalancer{}
b.logger = prefixLogger(b)
b.stateAggregator = weightedaggregator.New(cc, b.logger, newRandomWRR)
b.stateAggregator.Start()
b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b
}
func (wt *weightedTargetBB) Name() string {
return weightedTargetName
}
func (wt *weightedTargetBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
type weightedTargetBalancer struct {
logger *grpclog.PrefixLogger
// TODO: Make this package not dependent on any xds specific code.
// BalancerGroup uses xdsinternal.LocalityID as the key in the map of child
// policies that it maintains and reports load using LRS. Once these two
// dependencies are removed from the balancerGroup, this package will not
// have any dependencies on xds code.
bg *balancergroup.BalancerGroup
stateAggregator *weightedaggregator.Aggregator
targets map[string]target
}
// UpdateClientConnState takes the new targets in balancer group,
// creates/deletes sub-balancers and sends them update. Addresses are split into
// groups based on hierarchy path.
func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
var rebuildStateAndPicker bool
// Remove sub-pickers and sub-balancers that are not in the new config.
for name := range w.targets {
if _, ok := newConfig.Targets[name]; !ok {
w.stateAggregator.Remove(name)
w.bg.Remove(name)
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
}
}
// For sub-balancers in the new config
// - if it's new. add to balancer group,
// - if it's old, but has a new weight, update weight in balancer group.
//
// For all sub-balancers, forward the address/balancer config update.
for name, newT := range newConfig.Targets {
oldT, ok := w.targets[name]
if !ok {
// If this is a new sub-balancer, add weights to the picker map.
w.stateAggregator.Add(name, newT.Weight)
// Then add to the balancer group.
w.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
// Not trigger a state/picker update. Wait for the new sub-balancer
// to send its updates.
} else if newT.Weight != oldT.Weight {
// If this is an existing sub-balancer, update weight if necessary.
w.stateAggregator.UpdateWeight(name, newT.Weight)
// Trigger a state/picker update, because we don't want `ClientConn`
// should do picks with the new weights now.
rebuildStateAndPicker = true
}
// Forwards all the update:
// - Addresses are from the map after splitting with hierarchy path,
// - Top level service config and attributes are the same,
// - Balancer config comes from the targets map.
//
// TODO: handle error? How to aggregate errors and return?
_ = w.bg.UpdateClientConnState(name, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
},
BalancerConfig: newT.ChildPolicy.Config,
})
}
w.targets = newConfig.Targets
if rebuildStateAndPicker {
w.stateAggregator.BuildAndUpdate()
}
return nil
}
func (w *weightedTargetBalancer) ResolverError(err error) {
w.bg.ResolverError(err)
}
func (w *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
w.bg.UpdateSubConnState(sc, state)
}
func (w *weightedTargetBalancer) Close() {
w.stateAggregator.Stop()
w.bg.Close()
}