blob: 4af91c76498a04f812225e4414aa78e0f8b29f45 [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package lrs implements load reporting balancer for xds.
package lrs
import (
"encoding/json"
"fmt"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
)
func init() {
balancer.Register(&lrsBB{})
}
const lrsBalancerName = "lrs_experimental"
type lrsBB struct{}
func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &lrsBalancer{
cc: cc,
buildOpts: opts,
}
b.loadStore = NewStore()
b.client = newXDSClientWrapper(b.loadStore)
b.logger = prefixLogger(b)
b.logger.Infof("Created")
return b
}
func (l *lrsBB) Name() string {
return lrsBalancerName
}
func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
type lrsBalancer struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
loadStore Store
client *xdsClientWrapper
config *lbConfig
lb balancer.Balancer // The sub balancer.
}
func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
// If child policy is a different type, recreate the sub-balancer.
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}
if b.lb != nil {
b.lb.Close()
}
b.lb = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts)
}
// Update load reporting config or xds client.
b.client.update(newConfig, s.ResolverState.Attributes)
b.config = newConfig
// Addresses and sub-balancer config are sent to sub-balancer.
return b.lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.config.ChildPolicy.Config,
})
}
func (b *lrsBalancer) ResolverError(err error) {
if b.lb != nil {
b.lb.ResolverError(err)
}
}
func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if b.lb != nil {
b.lb.UpdateSubConnState(sc, s)
}
}
func (b *lrsBalancer) Close() {
if b.lb != nil {
b.lb.Close()
b.lb = nil
}
b.client.close()
}
type ccWrapper struct {
balancer.ClientConn
loadStore Store
localityID *internal.LocalityID
}
func newCCWrapper(cc balancer.ClientConn, loadStore Store, localityID *internal.LocalityID) *ccWrapper {
return &ccWrapper{
ClientConn: cc,
loadStore: loadStore,
localityID: localityID,
}
}
func (ccw *ccWrapper) UpdateState(s balancer.State) {
s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
ccw.ClientConn.UpdateState(s)
}
// xdsClientInterface contains only the xds_client methods needed by LRS
// balancer. It's defined so we can override xdsclient in tests.
type xdsClientInterface interface {
ReportLoad(server string, clusterName string, loadStore Store) (cancel func())
Close()
}
type xdsClientWrapper struct {
loadStore Store
c xdsClientInterface
cancelLoadReport func()
clusterName string
lrsServerName string
}
func newXDSClientWrapper(loadStore Store) *xdsClientWrapper {
return &xdsClientWrapper{
loadStore: loadStore,
}
}
// update checks the config and xdsclient, and decides whether it needs to
// restart the load reporting stream.
//
// TODO: refactor lrs to share one stream instead of one per EDS.
func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) {
var restartLoadReport bool
if attr != nil {
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
if w.c != clientFromAttr {
// xds client is different, restart.
restartLoadReport = true
w.c = clientFromAttr
}
}
}
// ClusterName is different, restart. ClusterName is from ClusterName and
// EdsServiceName.
//
// TODO: LRS request actually has separate fields from these two values.
// Update lrs.Store to set both.
newClusterName := newConfig.EdsServiceName
if newClusterName == "" {
newClusterName = newConfig.ClusterName
}
if w.clusterName != newClusterName {
restartLoadReport = true
w.clusterName = newClusterName
}
if w.lrsServerName != newConfig.LrsLoadReportingServerName {
// LrsLoadReportingServerName is different, load should be report to a
// different server, restart.
restartLoadReport = true
w.lrsServerName = newConfig.LrsLoadReportingServerName
}
if restartLoadReport {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
if w.c != nil {
w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName, w.loadStore)
}
}
}
func (w *xdsClientWrapper) close() {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
}