/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package weightedaggregator implements a state aggregator for the
// weighted_target balancer.
//
// This is a separate package so it can be shared by weighted_target and eds.
// The eds balancer will be refactored to use weighted_target directly. After
// that, all functions and structs in this package can be moved to package
// weightedtarget and unexported.
package weightedaggregator

import (
	"fmt"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/wrr"
)

type weightedPickerState struct {
	weight uint32
	state  balancer.State
	// stateToAggregate is the connectivity state used only for state
	// aggregation. It could be different from state.ConnectivityState. For
	// example, when a sub-balancer transitions from TransientFailure to
	// Connecting, state.ConnectivityState is Connecting, but stateToAggregate
	// is still TransientFailure.
	stateToAggregate connectivity.State
}

func (s *weightedPickerState) String() string {
	return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}

// Aggregator is the weighted balancer state aggregator.
type Aggregator struct {
	cc      balancer.ClientConn
	logger  *grpclog.PrefixLogger
	newWRR  func() wrr.WRR
	csEvltr *balancer.ConnectivityStateEvaluator

	mu sync.Mutex
	// If started is false, no updates should be sent to the parent cc. A
	// closed sub-balancer could still send pickers to this aggregator. This
	// makes sure that no updates will be forwarded to the parent when the
	// whole balancer group and state aggregator are closed.
	started bool
	// All balancer IDs exist as keys in this map, even if the balancer group
	// is not started.
	//
	// If an ID is not in the map, it was either removed or never added.
	idToPickerState map[string]*weightedPickerState
	// Set when UpdateState call propagation is paused.
	pauseUpdateState bool
	// Set when UpdateState call propagation is paused and an UpdateState call
	// is suppressed.
	needUpdateStateOnResume bool
}

// New creates a new weighted balancer state aggregator.
func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr.WRR) *Aggregator {
	return &Aggregator{
		cc:              cc,
		logger:          logger,
		newWRR:          newWRR,
		csEvltr:         &balancer.ConnectivityStateEvaluator{},
		idToPickerState: make(map[string]*weightedPickerState),
	}
}
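
// A minimal usage sketch (hypothetical caller; assumes wrr.NewRandom from
// internal/wrr as the WRR factory, which is what weighted_target uses):
//
//	wbsa := weightedaggregator.New(cc, logger, wrr.NewRandom)
//	wbsa.Start()
//	wbsa.Add("child-a", 2) // placeholder in Connecting until a real update
//	wbsa.Add("child-b", 1)
//	// ... sub-balancers report states via wbsa.UpdateState(id, state) ...
//	wbsa.Stop()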

// Start starts the aggregator. It can be called after Stop to restart the
// aggregator.
func (wbsa *Aggregator) Start() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.started = true
}

// Stop stops the aggregator. When the aggregator is stopped, it won't call
// the parent ClientConn to update the balancer state.
func (wbsa *Aggregator) Stop() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.started = false
	wbsa.clearStates()
}

// Add adds a sub-balancer state with a weight. It adds a placeholder, and
// waits for the real sub-balancer to update its state.
func (wbsa *Aggregator) Add(id string, weight uint32) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.idToPickerState[id] = &weightedPickerState{
		weight: weight,
		// Start everything in CONNECTING, so if one of the sub-balancers
		// reports TransientFailure, the RPCs will still wait for the other
		// sub-balancers.
		state: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		stateToAggregate: connectivity.Connecting,
	}
	wbsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting)

	wbsa.buildAndUpdateLocked()
}
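
// For example, with two children where child-a has reported TransientFailure
// and child-b was just added (and so is still in the Connecting placeholder
// state), the evaluator aggregates to Connecting rather than TransientFailure,
// so RPCs queue instead of failing while child-b starts up.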

// Remove removes the sub-balancer state. Future updates from this
// sub-balancer, if any, will be ignored.
func (wbsa *Aggregator) Remove(id string) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	if _, ok := wbsa.idToPickerState[id]; !ok {
		return
	}
	// Setting the state of the deleted sub-balancer to Shutdown gets csEvltr
	// to remove the previous state from any aggregated state evaluations.
	// Transitions to and from connectivity.Shutdown are ignored by csEvltr.
	wbsa.csEvltr.RecordTransition(wbsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
	// Remove id and picker from the picker map. This also causes future
	// updates for this ID to be ignored.
	delete(wbsa.idToPickerState, id)
	wbsa.buildAndUpdateLocked()
}

// UpdateWeight updates the weight for the given id. Note that this doesn't
// trigger an update to the parent ClientConn; the new weight takes effect the
// next time the aggregated state is rebuilt.
func (wbsa *Aggregator) UpdateWeight(id string, newWeight uint32) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	pState, ok := wbsa.idToPickerState[id]
	if !ok {
		return
	}
	pState.weight = newWeight
}

// PauseStateUpdates causes UpdateState calls to not propagate to the parent
// ClientConn. The last state will be remembered and propagated when
// ResumeStateUpdates is called.
func (wbsa *Aggregator) PauseStateUpdates() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.pauseUpdateState = true
	wbsa.needUpdateStateOnResume = false
}

// ResumeStateUpdates will resume propagating UpdateState calls to the parent,
// and call UpdateState on the parent if any UpdateState call was suppressed.
func (wbsa *Aggregator) ResumeStateUpdates() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.pauseUpdateState = false
	if wbsa.needUpdateStateOnResume {
		wbsa.cc.UpdateState(wbsa.build())
	}
}
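
// A typical use of pause/resume (hypothetical caller) is to coalesce a batch
// of child changes into a single update to the parent:
//
//	wbsa.PauseStateUpdates()
//	wbsa.UpdateWeight("child-a", 3)
//	wbsa.Remove("child-b")
//	wbsa.ResumeStateUpdates() // at most one UpdateState reaches the parent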

// UpdateState is called to report a balancer state change from a
// sub-balancer. It's usually called by the balancer group.
//
// It calls the parent ClientConn's UpdateState with the new aggregated state.
func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	state, ok := wbsa.idToPickerState[id]
	if !ok {
		// All states start with an entry in idToPickerState. If the ID is not
		// in the map, it was either removed or never existed.
		return
	}

	if !(state.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) {
		// If the old state is TransientFailure and the new state is
		// Connecting, don't update stateToAggregate, to prevent the
		// aggregated state from always being CONNECTING. Otherwise,
		// stateToAggregate is the same as state.ConnectivityState.
		wbsa.csEvltr.RecordTransition(state.stateToAggregate, newState.ConnectivityState)
		state.stateToAggregate = newState.ConnectivityState
	}
	state.state = newState

	wbsa.buildAndUpdateLocked()
}
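
// For example, if a child in TransientFailure starts reconnecting and reports
// Connecting, state.state becomes Connecting (so its picker is replaced), but
// stateToAggregate stays TransientFailure; the child keeps counting as failed
// for aggregation until it reports something other than Connecting.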

// clearStates resets everything to the init state (Connecting) but keeps the
// entries in the map (to keep the weights).
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) clearStates() {
	for _, pState := range wbsa.idToPickerState {
		pState.state = balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		}
		pState.stateToAggregate = connectivity.Connecting
	}
}

// buildAndUpdateLocked aggregates the connectivity states of the
// sub-balancers, builds a new picker, and sends an update to the parent
// ClientConn.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) buildAndUpdateLocked() {
	if !wbsa.started {
		return
	}
	if wbsa.pauseUpdateState {
		// If updates are paused, do not call UpdateState, but remember that
		// we need to call it when they are resumed.
		wbsa.needUpdateStateOnResume = true
		return
	}

	wbsa.cc.UpdateState(wbsa.build())
}

// build combines sub-states into one.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) build() balancer.State {
	wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)

	// Make sure the picker's return error is consistent with the aggregated
	// state.
	pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState))

	switch aggState := wbsa.csEvltr.CurrentState(); aggState {
	case connectivity.Connecting:
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable)}
	case connectivity.TransientFailure:
		// This case means that all sub-balancers are now in TransientFailure.
		for _, ps := range wbsa.idToPickerState {
			pickers = append(pickers, *ps)
		}
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
	default:
		for _, ps := range wbsa.idToPickerState {
			if ps.stateToAggregate == connectivity.Ready {
				pickers = append(pickers, *ps)
			}
		}
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
	}
}
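
// As an aggregation example: if child-a (weight 2) is Ready and child-b
// (weight 1) is Connecting, the aggregated state is Ready and the returned
// picker group contains only child-a's picker, so all picks go to child-a
// until child-b also becomes Ready.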

type weightedPickerGroup struct {
	w wrr.WRR
}

// newWeightedPickerGroup takes pickers with weights, and groups them into one
// picker.
//
// Note it only takes ready pickers. The slice shouldn't contain non-ready
// pickers.
func newWeightedPickerGroup(readyWeightedPickers []weightedPickerState, newWRR func() wrr.WRR) *weightedPickerGroup {
	w := newWRR()
	for _, ps := range readyWeightedPickers {
		w.Add(ps.state.Picker, int64(ps.weight))
	}
	return &weightedPickerGroup{
		w: w,
	}
}
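
// Pick delegates to the WRR schedule built in newWeightedPickerGroup: with
// child pickers A (weight 3) and B (weight 1), roughly three quarters of
// picks are forwarded to A's picker. If the group is empty and Next returns
// nil, the type assertion below fails and the pick fails with
// ErrNoSubConnAvailable, which makes the channel queue the RPC until a new
// picker is sent.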
func (pg *weightedPickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	p, ok := pg.w.Next().(balancer.Picker)
	if !ok {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	return p.Pick(info)
}