| /* |
| * |
| * Copyright 2019 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package edsbalancer |
| |
| import ( |
| "errors" |
| "fmt" |
| "time" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/base" |
| "google.golang.org/grpc/connectivity" |
| ) |
| |
// errAllPrioritiesRemoved is carried by the TransientFailure picker that is
// sent to the parent ClientConn when an EDS response leaves no priority at
// all.
var (
	errAllPrioritiesRemoved = errors.New("eds: no locality is provided, all priorities are removed")
)
| |
| // handlePriorityChange handles priority after EDS adds/removes a |
| // priority. |
| // |
| // - If all priorities were deleted, unset priorityInUse, and set parent |
| // ClientConn to TransientFailure |
| // - If priorityInUse wasn't set, this is either the first EDS resp, or the |
| // previous EDS resp deleted everything. Set priorityInUse to 0, and start 0. |
| // - If priorityInUse was deleted, send the picker from the new lowest priority |
| // to parent ClientConn, and set priorityInUse to the new lowest. |
| // - If priorityInUse has a non-Ready state, and also there's a priority lower |
| // than priorityInUse (which means a lower priority was added), set the next |
| // priority as new priorityInUse, and start the bg. |
| func (edsImpl *edsBalancerImpl) handlePriorityChange() { |
| edsImpl.priorityMu.Lock() |
| defer edsImpl.priorityMu.Unlock() |
| |
| // Everything was removed by EDS. |
| if !edsImpl.priorityLowest.isSet() { |
| edsImpl.priorityInUse = newPriorityTypeUnset() |
| // Stop the init timer. This can happen if the only priority is removed |
| // shortly after it's added. |
| if timer := edsImpl.priorityInitTimer; timer != nil { |
| timer.Stop() |
| edsImpl.priorityInitTimer = nil |
| } |
| edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)}) |
| return |
| } |
| |
| // priorityInUse wasn't set, use 0. |
| if !edsImpl.priorityInUse.isSet() { |
| edsImpl.logger.Infof("Switching priority from unset to %v", 0) |
| edsImpl.startPriority(newPriorityType(0)) |
| return |
| } |
| |
| // priorityInUse was deleted, use the new lowest. |
| if _, ok := edsImpl.priorityToLocalities[edsImpl.priorityInUse]; !ok { |
| oldP := edsImpl.priorityInUse |
| edsImpl.priorityInUse = edsImpl.priorityLowest |
| edsImpl.logger.Infof("Switching priority from %v to %v, because former was deleted", oldP, edsImpl.priorityInUse) |
| if s, ok := edsImpl.priorityToState[edsImpl.priorityLowest]; ok { |
| edsImpl.cc.UpdateState(*s) |
| } else { |
| // If state for priorityLowest is not found, this means priorityLowest was |
| // started, but never sent any update. The init timer fired and |
| // triggered the next priority. The old_priorityInUse (that was just |
| // deleted EDS) was picked later. |
| // |
| // We don't have an old state to send to parent, but we also don't |
| // want parent to keep using picker from old_priorityInUse. Send an |
| // update to trigger block picks until a new picker is ready. |
| edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)}) |
| } |
| return |
| } |
| |
| // priorityInUse is not ready, look for next priority, and use if found. |
| if s, ok := edsImpl.priorityToState[edsImpl.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready { |
| pNext := edsImpl.priorityInUse.nextLower() |
| if _, ok := edsImpl.priorityToLocalities[pNext]; ok { |
| edsImpl.logger.Infof("Switching priority from %v to %v, because latter was added, and former wasn't Ready") |
| edsImpl.startPriority(pNext) |
| } |
| } |
| } |
| |
| // startPriority sets priorityInUse to p, and starts the balancer group for p. |
| // It also starts a timer to fall to next priority after timeout. |
| // |
| // Caller must hold priorityMu, priority must exist, and edsImpl.priorityInUse |
| // must be non-nil. |
| func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) { |
| edsImpl.priorityInUse = priority |
| p := edsImpl.priorityToLocalities[priority] |
| // NOTE: this will eventually send addresses to sub-balancers. If the |
| // sub-balancer tries to update picker, it will result in a deadlock on |
| // priorityMu in the update is handled synchronously. The deadlock is |
| // currently avoided by handling balancer update in a goroutine (the run |
| // goroutine in the parent eds balancer). When priority balancer is split |
| // into its own, this asynchronous state handling needs to be copied. |
| p.stateAggregator.Start() |
| p.bg.Start() |
| // startPriority can be called when |
| // 1. first EDS resp, start p0 |
| // 2. a high priority goes Failure, start next |
| // 3. a high priority init timeout, start next |
| // |
| // In all the cases, the existing init timer is either closed, also already |
| // expired. There's no need to close the old timer. |
| edsImpl.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() { |
| edsImpl.priorityMu.Lock() |
| defer edsImpl.priorityMu.Unlock() |
| if !edsImpl.priorityInUse.isSet() || !edsImpl.priorityInUse.equal(priority) { |
| return |
| } |
| edsImpl.priorityInitTimer = nil |
| pNext := priority.nextLower() |
| if _, ok := edsImpl.priorityToLocalities[pNext]; ok { |
| edsImpl.startPriority(pNext) |
| } |
| }) |
| } |
| |
| // handlePriorityWithNewState start/close priorities based on the connectivity |
| // state. It returns whether the state should be forwarded to parent ClientConn. |
| func (edsImpl *edsBalancerImpl) handlePriorityWithNewState(priority priorityType, s balancer.State) bool { |
| edsImpl.priorityMu.Lock() |
| defer edsImpl.priorityMu.Unlock() |
| |
| if !edsImpl.priorityInUse.isSet() { |
| edsImpl.logger.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)") |
| return false |
| } |
| |
| if edsImpl.priorityInUse.higherThan(priority) { |
| // Lower priorities should all be closed, this is an unexpected update. |
| edsImpl.logger.Infof("eds: received picker update from priority lower then priorityInUse") |
| return false |
| } |
| |
| bState, ok := edsImpl.priorityToState[priority] |
| if !ok { |
| bState = &balancer.State{} |
| edsImpl.priorityToState[priority] = bState |
| } |
| oldState := bState.ConnectivityState |
| *bState = s |
| |
| switch s.ConnectivityState { |
| case connectivity.Ready: |
| return edsImpl.handlePriorityWithNewStateReady(priority) |
| case connectivity.TransientFailure: |
| return edsImpl.handlePriorityWithNewStateTransientFailure(priority) |
| case connectivity.Connecting: |
| return edsImpl.handlePriorityWithNewStateConnecting(priority, oldState) |
| default: |
| // New state is Idle, should never happen. Don't forward. |
| return false |
| } |
| } |
| |
| // handlePriorityWithNewStateReady handles state Ready and decides whether to |
| // forward update or not. |
| // |
| // An update with state Ready: |
| // - If it's from higher priority: |
| // - Forward the update |
| // - Set the priority as priorityInUse |
| // - Close all priorities lower than this one |
| // - If it's from priorityInUse: |
| // - Forward and do nothing else |
| // |
| // Caller must make sure priorityInUse is not higher than priority. |
| // |
| // Caller must hold priorityMu. |
| func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateReady(priority priorityType) bool { |
| // If one priority higher or equal to priorityInUse goes Ready, stop the |
| // init timer. If update is from higher than priorityInUse, |
| // priorityInUse will be closed, and the init timer will become useless. |
| if timer := edsImpl.priorityInitTimer; timer != nil { |
| timer.Stop() |
| edsImpl.priorityInitTimer = nil |
| } |
| |
| if edsImpl.priorityInUse.lowerThan(priority) { |
| edsImpl.logger.Infof("Switching priority from %v to %v, because latter became Ready", edsImpl.priorityInUse, priority) |
| edsImpl.priorityInUse = priority |
| for i := priority.nextLower(); !i.lowerThan(edsImpl.priorityLowest); i = i.nextLower() { |
| bgwc := edsImpl.priorityToLocalities[i] |
| bgwc.stateAggregator.Stop() |
| bgwc.bg.Close() |
| } |
| return true |
| } |
| return true |
| } |
| |
| // handlePriorityWithNewStateTransientFailure handles state TransientFailure and |
| // decides whether to forward update or not. |
| // |
| // An update with state Failure: |
| // - If it's from a higher priority: |
| // - Do not forward, and do nothing |
| // - If it's from priorityInUse: |
| // - If there's no lower: |
| // - Forward and do nothing else |
| // - If there's a lower priority: |
| // - Forward |
| // - Set lower as priorityInUse |
| // - Start lower |
| // |
| // Caller must make sure priorityInUse is not higher than priority. |
| // |
| // Caller must hold priorityMu. |
| func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateTransientFailure(priority priorityType) bool { |
| if edsImpl.priorityInUse.lowerThan(priority) { |
| return false |
| } |
| // priorityInUse sends a failure. Stop its init timer. |
| if timer := edsImpl.priorityInitTimer; timer != nil { |
| timer.Stop() |
| edsImpl.priorityInitTimer = nil |
| } |
| pNext := priority.nextLower() |
| if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext { |
| return true |
| } |
| edsImpl.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, pNext) |
| edsImpl.startPriority(pNext) |
| return true |
| } |
| |
| // handlePriorityWithNewStateConnecting handles state Connecting and decides |
| // whether to forward update or not. |
| // |
| // An update with state Connecting: |
| // - If it's from a higher priority |
| // - Do nothing |
| // - If it's from priorityInUse, the behavior depends on previous state. |
| // |
| // When new state is Connecting, the behavior depends on previous state. If the |
| // previous state was Ready, this is a transition out from Ready to Connecting. |
| // Assuming there are multiple backends in the same priority, this mean we are |
| // in a bad situation and we should failover to the next priority (Side note: |
| // the current connectivity state aggregating algorhtim (e.g. round-robin) is |
| // not handling this right, because if many backends all go from Ready to |
| // Connecting, the overall situation is more like TransientFailure, not |
| // Connecting). |
| // |
| // If the previous state was Idle, we don't do anything special with failure, |
| // and simply forward the update. The init timer should be in process, will |
| // handle failover if it timeouts. If the previous state was TransientFailure, |
| // we do not forward, because the lower priority is in use. |
| // |
| // Caller must make sure priorityInUse is not higher than priority. |
| // |
| // Caller must hold priorityMu. |
| func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool { |
| if edsImpl.priorityInUse.lowerThan(priority) { |
| return false |
| } |
| |
| switch oldState { |
| case connectivity.Ready: |
| pNext := priority.nextLower() |
| if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext { |
| return true |
| } |
| edsImpl.logger.Infof("Switching priority from %v to %v, because former became Connecting from Ready", priority, pNext) |
| edsImpl.startPriority(pNext) |
| return true |
| case connectivity.Idle: |
| return true |
| case connectivity.TransientFailure: |
| return false |
| default: |
| // Old state is Connecting or Shutdown. Don't forward. |
| return false |
| } |
| } |
| |
// priorityType represents the priority from EDS response.
//
// 0 is the highest priority; a bigger number means a lower priority. The zero
// value of priorityType is "unset", which is distinct from priority 0.
type priorityType struct {
	set bool
	p   uint32
}

// newPriorityType returns the set priority with value p.
func newPriorityType(p uint32) priorityType {
	return priorityType{p: p, set: true}
}

// newPriorityTypeUnset returns the unset priority (the zero value).
func newPriorityTypeUnset() priorityType {
	var unset priorityType
	return unset
}

// isSet reports whether p holds a priority value.
func (p priorityType) isSet() bool { return p.set }

// equal reports whether p and p2 are the same priority. Two unset priorities
// are equal; an unset priority never equals a set one.
func (p priorityType) equal(p2 priorityType) bool {
	switch {
	case p.isSet() != p2.isSet():
		return false
	case !p.isSet():
		return true
	default:
		return p == p2
	}
}

// higherThan reports whether p is a higher priority (smaller number) than p2.
// It panics if either priority is unset.
func (p priorityType) higherThan(p2 priorityType) bool {
	if !p.isSet() || !p2.isSet() {
		// TODO(menghanl): return an appropriate value instead of panic.
		panic("priority unset")
	}
	return p.p < p2.p
}

// lowerThan reports whether p is a lower priority (bigger number) than p2.
// It panics if either priority is unset.
func (p priorityType) lowerThan(p2 priorityType) bool {
	// "p is lower than p2" is exactly "p2 is higher than p"; higherThan also
	// performs the unset check and panic.
	return p2.higherThan(p)
}

// nextLower returns the priority directly below p (p+1). It panics if p is
// unset.
func (p priorityType) nextLower() priorityType {
	if !p.isSet() {
		panic("priority unset")
	}
	return newPriorityType(p.p + 1)
}

// String returns the decimal priority, or "Nil" when p is unset.
func (p priorityType) String() string {
	if p.set {
		return fmt.Sprint(p.p)
	}
	return "Nil"
}