| /* |
| * |
| * Copyright 2019 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| // Package edsbalancer contains EDS balancer implementation. |
| package edsbalancer |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| |
| "github.com/google/go-cmp/cmp" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/roundrobin" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/internal/buffer" |
| "google.golang.org/grpc/internal/grpclog" |
| "google.golang.org/grpc/internal/grpcsync" |
| "google.golang.org/grpc/serviceconfig" |
| xdsclient "google.golang.org/grpc/xds/internal/client" |
| ) |
| |
| const ( |
| edsName = "eds_experimental" |
| ) |
| |
| var ( |
| newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), xdsClient *xdsClientWrapper, logger *grpclog.PrefixLogger) edsBalancerImplInterface { |
| return newEDSBalancerImpl(cc, enqueueState, xdsClient, logger) |
| } |
| ) |
| |
| func init() { |
| balancer.Register(&edsBalancerBuilder{}) |
| } |
| |
| type edsBalancerBuilder struct{} |
| |
| // Build helps implement the balancer.Builder interface. |
| func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { |
| x := &edsBalancer{ |
| cc: cc, |
| closed: grpcsync.NewEvent(), |
| grpcUpdate: make(chan interface{}), |
| xdsClientUpdate: make(chan *edsUpdate), |
| childPolicyUpdate: buffer.NewUnbounded(), |
| } |
| x.logger = prefixLogger((x)) |
| x.client = newXDSClientWrapper(x.handleEDSUpdate, x.logger) |
| x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, x.client, x.logger) |
| x.logger.Infof("Created") |
| go x.run() |
| return x |
| } |
| |
| func (b *edsBalancerBuilder) Name() string { |
| return edsName |
| } |
| |
| func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
| var cfg EDSConfig |
| if err := json.Unmarshal(c, &cfg); err != nil { |
| return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err) |
| } |
| return &cfg, nil |
| } |
| |
| // edsBalancerImplInterface defines the interface that edsBalancerImpl must |
| // implement to communicate with edsBalancer. |
| // |
| // It's implemented by the real eds balancer and a fake testing eds balancer. |
| type edsBalancerImplInterface interface { |
| // handleEDSResponse passes the received EDS message from traffic director to eds balancer. |
| handleEDSResponse(edsResp xdsclient.EndpointsUpdate) |
| // handleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use. |
| handleChildPolicy(name string, config json.RawMessage) |
| // handleSubConnStateChange handles state change for SubConn. |
| handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) |
| // updateState handle a balancer state update from the priority. |
| updateState(priority priorityType, s balancer.State) |
| // close closes the eds balancer. |
| close() |
| } |
| |
| // edsBalancer manages xdsClient and the actual EDS balancer implementation that |
| // does load balancing. |
| // |
| // It currently has only an edsBalancer. Later, we may add fallback. |
| type edsBalancer struct { |
| cc balancer.ClientConn // *xdsClientConn |
| closed *grpcsync.Event |
| logger *grpclog.PrefixLogger |
| |
| // edsBalancer continuously monitor the channels below, and will handle events from them in sync. |
| grpcUpdate chan interface{} |
| xdsClientUpdate chan *edsUpdate |
| childPolicyUpdate *buffer.Unbounded |
| |
| client *xdsClientWrapper // may change when passed a different service config |
| config *EDSConfig // may change when passed a different service config |
| edsImpl edsBalancerImplInterface |
| } |
| |
| // run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc, |
| // xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It |
| // exits when edsBalancer is closed. |
| func (x *edsBalancer) run() { |
| for { |
| select { |
| case update := <-x.grpcUpdate: |
| x.handleGRPCUpdate(update) |
| case update := <-x.xdsClientUpdate: |
| x.handleXDSClientUpdate(update) |
| case update := <-x.childPolicyUpdate.Get(): |
| x.childPolicyUpdate.Load() |
| u := update.(*balancerStateWithPriority) |
| x.edsImpl.updateState(u.priority, u.s) |
| case <-x.closed.Done(): |
| x.client.close() |
| x.edsImpl.close() |
| return |
| } |
| } |
| } |
| |
| // handleErrorFromUpdate handles both the error from parent ClientConn (from CDS |
| // balancer) and the error from xds client (from the watcher). fromParent is |
| // true if error is from parent ClientConn. |
| // |
| // If the error is connection error, it should be handled for fallback purposes. |
| // |
| // If the error is resource-not-found: |
| // - If it's from CDS balancer (shows as a resolver error), it means LDS or CDS |
| // resources were removed. The EDS watch should be canceled. |
| // - If it's from xds client, it means EDS resource were removed. The EDS |
| // watcher should keep watching. |
| // In both cases, the sub-balancers will be closed, and the future picks will |
| // fail. |
| func (x *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) { |
| x.logger.Warningf("Received error: %v", err) |
| if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound { |
| if fromParent { |
| // This is an error from the parent ClientConn (can be the parent |
| // CDS balancer), and is a resource-not-found error. This means the |
| // resource (can be either LDS or CDS) was removed. Stop the EDS |
| // watch. |
| x.client.cancelWatch() |
| } |
| x.edsImpl.handleEDSResponse(xdsclient.EndpointsUpdate{}) |
| } |
| } |
| |
| func (x *edsBalancer) handleGRPCUpdate(update interface{}) { |
| switch u := update.(type) { |
| case *subConnStateUpdate: |
| x.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState) |
| case *balancer.ClientConnState: |
| x.logger.Infof("Receive update from resolver, balancer config: %+v", u.BalancerConfig) |
| cfg, _ := u.BalancerConfig.(*EDSConfig) |
| if cfg == nil { |
| // service config parsing failed. should never happen. |
| return |
| } |
| |
| if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil { |
| x.logger.Warningf("failed to update xds clients: %v", err) |
| } |
| |
| if x.config == nil { |
| x.config = cfg |
| return |
| } |
| |
| // We will update the edsImpl with the new child policy, if we got a |
| // different one. |
| if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) { |
| if cfg.ChildPolicy != nil { |
| x.edsImpl.handleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) |
| } else { |
| x.edsImpl.handleChildPolicy(roundrobin.Name, nil) |
| } |
| } |
| |
| x.config = cfg |
| case error: |
| x.handleErrorFromUpdate(u, true) |
| default: |
| // unreachable path |
| x.logger.Errorf("wrong update type: %T", update) |
| } |
| } |
| |
| func (x *edsBalancer) handleXDSClientUpdate(update *edsUpdate) { |
| if err := update.err; err != nil { |
| x.handleErrorFromUpdate(err, false) |
| return |
| } |
| x.edsImpl.handleEDSResponse(update.resp) |
| } |
| |
| type subConnStateUpdate struct { |
| sc balancer.SubConn |
| state balancer.SubConnState |
| } |
| |
| func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { |
| update := &subConnStateUpdate{ |
| sc: sc, |
| state: state, |
| } |
| select { |
| case x.grpcUpdate <- update: |
| case <-x.closed.Done(): |
| } |
| } |
| |
| func (x *edsBalancer) ResolverError(err error) { |
| select { |
| case x.grpcUpdate <- err: |
| case <-x.closed.Done(): |
| } |
| } |
| |
| func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { |
| select { |
| case x.grpcUpdate <- &s: |
| case <-x.closed.Done(): |
| } |
| return nil |
| } |
| |
| type edsUpdate struct { |
| resp xdsclient.EndpointsUpdate |
| err error |
| } |
| |
| func (x *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate, err error) { |
| select { |
| case x.xdsClientUpdate <- &edsUpdate{resp: resp, err: err}: |
| case <-x.closed.Done(): |
| } |
| } |
| |
| type balancerStateWithPriority struct { |
| priority priorityType |
| s balancer.State |
| } |
| |
| func (x *edsBalancer) enqueueChildBalancerState(p priorityType, s balancer.State) { |
| x.childPolicyUpdate.Put(&balancerStateWithPriority{ |
| priority: p, |
| s: s, |
| }) |
| } |
| |
| func (x *edsBalancer) Close() { |
| x.closed.Fire() |
| x.logger.Infof("Shutdown") |
| } |