blob: ad5373a2f773e6d8929ca436eeaa0baedbb6dacb [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsrouting
import (
"encoding/json"
"fmt"
"regexp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
)
const xdsRoutingName = "xds_routing_experimental"
func init() {
balancer.Register(&routingBB{})
}
type routingBB struct{}
func (rbb *routingBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &routingBalancer{}
b.logger = prefixLogger(b)
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
b.stateAggregator.start()
b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b
}
func (rbb *routingBB) Name() string {
return xdsRoutingName
}
func (rbb *routingBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
type route struct {
action string
m *compositeMatcher
}
func (r route) String() string {
return r.m.String() + "->" + r.action
}
type routingBalancer struct {
logger *grpclog.PrefixLogger
// TODO: make this package not dependent on xds specific code. Same as for
// weighted target balancer.
bg *balancergroup.BalancerGroup
stateAggregator *balancerStateAggregator
actions map[string]actionConfig
routes []route
}
func (rb *routingBalancer) updateActions(s balancer.ClientConnState, newConfig *lbConfig) (needRebuild bool) {
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
var rebuildStateAndPicker bool
// Remove sub-pickers and sub-balancers that are not in the new action list.
for name := range rb.actions {
if _, ok := newConfig.actions[name]; !ok {
rb.stateAggregator.remove(name)
rb.bg.Remove(name)
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
}
}
// For sub-balancers in the new action list,
// - add to balancer group if it's new,
// - forward the address/balancer config update.
for name, newT := range newConfig.actions {
if _, ok := rb.actions[name]; !ok {
// If this is a new sub-balancer, add weights to the picker map.
rb.stateAggregator.add(name)
// Then add to the balancer group.
rb.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
// Not trigger a state/picker update. Wait for the new sub-balancer
// to send its updates.
}
// Forwards all the update:
// - Addresses are from the map after splitting with hierarchy path,
// - Top level service config and attributes are the same,
// - Balancer config comes from the targets map.
//
// TODO: handle error? How to aggregate errors and return?
_ = rb.bg.UpdateClientConnState(name, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
},
BalancerConfig: newT.ChildPolicy.Config,
})
}
rb.actions = newConfig.actions
return rebuildStateAndPicker
}
func routeToMatcher(r routeConfig) (*compositeMatcher, error) {
var pathMatcher pathMatcherInterface
switch {
case r.regex != "":
re, err := regexp.Compile(r.regex)
if err != nil {
return nil, fmt.Errorf("failed to compile regex %q", r.regex)
}
pathMatcher = newPathRegexMatcher(re)
case r.path != "":
pathMatcher = newPathExactMatcher(r.path)
default:
pathMatcher = newPathPrefixMatcher(r.prefix)
}
var headerMatchers []headerMatcherInterface
for _, h := range r.headers {
var matcherT headerMatcherInterface
switch {
case h.exactMatch != "":
matcherT = newHeaderExactMatcher(h.name, h.exactMatch)
case h.regexMatch != "":
re, err := regexp.Compile(h.regexMatch)
if err != nil {
return nil, fmt.Errorf("failed to compile regex %q, skipping this matcher", h.regexMatch)
}
matcherT = newHeaderRegexMatcher(h.name, re)
case h.prefixMatch != "":
matcherT = newHeaderPrefixMatcher(h.name, h.prefixMatch)
case h.suffixMatch != "":
matcherT = newHeaderSuffixMatcher(h.name, h.suffixMatch)
case h.rangeMatch != nil:
matcherT = newHeaderRangeMatcher(h.name, h.rangeMatch.start, h.rangeMatch.end)
default:
matcherT = newHeaderPresentMatcher(h.name, h.presentMatch)
}
if h.invertMatch {
matcherT = newInvertMatcher(matcherT)
}
headerMatchers = append(headerMatchers, matcherT)
}
var fractionMatcher *fractionMatcher
if r.fraction != nil {
fractionMatcher = newFractionMatcher(*r.fraction)
}
return newCompositeMatcher(pathMatcher, headerMatchers, fractionMatcher), nil
}
func routesEqual(a, b []route) bool {
if len(a) != len(b) {
return false
}
for i := range a {
aa := a[i]
bb := b[i]
if aa.action != bb.action {
return false
}
if !aa.m.equal(bb.m) {
return false
}
}
return true
}
func (rb *routingBalancer) updateRoutes(newConfig *lbConfig) (needRebuild bool, _ error) {
var newRoutes []route
for _, rt := range newConfig.routes {
newMatcher, err := routeToMatcher(rt)
if err != nil {
return false, err
}
newRoutes = append(newRoutes, route{action: rt.action, m: newMatcher})
}
rebuildStateAndPicker := !routesEqual(newRoutes, rb.routes)
rb.routes = newRoutes
if rebuildStateAndPicker {
rb.stateAggregator.updateRoutes(rb.routes)
}
return rebuildStateAndPicker, nil
}
func (rb *routingBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
rb.logger.Infof("update with config %+v, resolver state %+v", s.BalancerConfig, s.ResolverState)
rebuildForActions := rb.updateActions(s, newConfig)
rebuildForRoutes, err := rb.updateRoutes(newConfig)
if err != nil {
return fmt.Errorf("xds_routing balancer: failed to update routes: %v", err)
}
if rebuildForActions || rebuildForRoutes {
rb.stateAggregator.buildAndUpdate()
}
return nil
}
func (rb *routingBalancer) ResolverError(err error) {
rb.bg.ResolverError(err)
}
func (rb *routingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
rb.bg.UpdateSubConnState(sc, state)
}
func (rb *routingBalancer) Close() {
rb.stateAggregator.close()
rb.bg.Close()
}