// blob: 667e7aea3cfeda93ce06a4da9c8f8935ac61d7f4
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package resolver implements the xds resolver, which performs LDS and RDS
// queries to find the cluster to use.
package resolver
import (
"fmt"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
// xdsScheme is the scheme name returned by xdsResolverBuilder.Scheme.
const xdsScheme = "xds"
// newXDSClient creates the xds client used by Build. It is a package-level
// variable, rather than a direct call to xdsclient.New, so that it can be
// overridden in unittests.
var (
newXDSClient = func() (xdsClientInterface, error) {
return xdsclient.New()
}
)
// init registers the xds resolver builder with the gRPC resolver package when
// this package is imported.
func init() {
resolver.Register(&xdsResolverBuilder{})
}
// xdsResolverBuilder implements the resolver.Builder interface for the "xds"
// scheme. It holds no state of its own; each Build call creates a new xds
// client for the resolver it returns.
type xdsResolverBuilder struct{}
// Build helps implement the resolver.Builder interface.
//
// Every call performs the xds bootstrap process and builds a new xds client.
// Once the client exists, a watch is registered for the dial target's endpoint
// and the run goroutine is started to forward the resulting updates to cc. An
// error is returned only when the xds client cannot be created.
func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	xr := &xdsResolver{
		target:   t,
		cc:       cc,
		closed:   grpcsync.NewEvent(),
		updateCh: make(chan suWithError, 1),
	}
	xr.logger = prefixLogger(xr)
	xr.logger.Infof("Creating resolver for target: %+v", t)

	xc, err := newXDSClient()
	if err != nil {
		return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
	}
	xr.client = xc

	// Watch the user's dial target on the xds client. The cancel function
	// returned by watchService is wrapped so that cancellation gets logged.
	stopWatch := watchService(xr.client, xr.target.Endpoint, xr.handleServiceUpdate, xr.logger)
	xr.logger.Infof("Watch started on resource name %v with xds-client %p", xr.target.Endpoint, xr.client)
	xr.cancelWatch = func() {
		stopWatch()
		xr.logger.Infof("Watch cancel on resource name %v with xds-client %p", xr.target.Endpoint, xr.client)
	}

	// Updates delivered to handleServiceUpdate are consumed by run.
	go xr.run()
	return xr, nil
}
// Scheme helps implement the resolver.Builder interface. It returns the scheme
// name ("xds") for this builder.
func (*xdsResolverBuilder) Scheme() string {
return xdsScheme
}
// xdsClientInterface contains methods from xdsClient.Client which are used by
// the resolver. This will be faked out in unittests.
type xdsClientInterface interface {
// WatchListener takes a service name and a callback receiving a
// ListenerUpdate or an error.
// NOTE(review): the returned func is presumably the watch cancel function —
// confirm against xdsclient.Client.
WatchListener(serviceName string, cb func(xdsclient.ListenerUpdate, error)) func()
// WatchRouteConfig takes a route name and a callback receiving a
// RouteConfigUpdate or an error.
// NOTE(review): the returned func is presumably the watch cancel function —
// confirm against xdsclient.Client.
WatchRouteConfig(routeName string, cb func(xdsclient.RouteConfigUpdate, error)) func()
// Close is invoked by xdsResolver.Close when the resolver is shut down.
Close()
}
// suWithError wraps the ServiceUpdate and error received through a watch API
// callback, so that it can be pushed onto the update channel as a single
// entity.
type suWithError struct {
// su is the service update passed to the watch callback.
su serviceUpdate
// err is the error passed to the watch callback. When it is non-nil, the
// run goroutine handles the error and does not look at su.
err error
}
// xdsResolver implements the resolver.Resolver interface.
//
// It registers a watcher for ServiceConfig updates with the xdsClient object
// (which performs LDS/RDS queries for the same), and passes the received
// updates to the ClientConn.
type xdsResolver struct {
// target is the dial target this resolver was built for; its Endpoint is
// the resource name that is watched.
target resolver.Target
// cc is the ClientConn that receives state updates and errors.
cc resolver.ClientConn
// closed is fired by Close. Once fired, the run goroutine exits and
// handleServiceUpdate drops incoming updates.
closed *grpcsync.Event
// logger is the prefix logger created for this resolver in Build.
logger *grpclog.PrefixLogger
// The underlying xdsClient which performs all xDS requests and responses.
client xdsClientInterface
// A channel for the watch API callback to write service updates on to. The
// updates are read by the run goroutine and passed on to the ClientConn.
updateCh chan suWithError
// cancelWatch is the function to cancel the watcher.
cancelWatch func()
// actions is a map from hash of weighted cluster, to the weighted cluster
// map, and its assigned name. E.g.
// "A40_B60_": {{A:40, B:60}, "A_B_", "A_B_0"}
// "A30_B70_": {{A:30, B:70}, "A_B_", "A_B_1"}
// "B90_C10_": {{B:90, C:10}, "B_C_", "B_C_0"}
actions map[string]actionWithAssignedName
// usedActionNameRandomNumber contains random numbers that have been used in
// assigned names, to avoid collision.
usedActionNameRandomNumber map[int64]bool
}
// run is the resolver's long running goroutine. It waits for service updates
// written to updateCh by the watch callback and relays each one to the
// ClientConn. It returns once the resolver has been closed.
func (r *xdsResolver) run() {
	for {
		// Block until either the resolver is closed or an update arrives.
		var next suWithError
		select {
		case <-r.closed.Done():
			return
		case next = <-r.updateCh:
		}

		if watchErr := next.err; watchErr != nil {
			r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.target.Endpoint, r.client, watchErr)
			if xdsclient.ErrType(watchErr) != xdsclient.ErrorTypeResourceNotFound {
				// Errors other than resource-not-found are sent to the
				// ClientConn, and balancers.
				r.cc.ReportError(watchErr)
				continue
			}
			// Resource-not-found means the LDS resource was removed. Send
			// an empty service config, which picks pick-first, with no
			// address, and puts the ClientConn into transient failure.
			emptyConfig := r.cc.ParseServiceConfig("{}")
			r.cc.UpdateState(resolver.State{ServiceConfig: emptyConfig})
			continue
		}

		scJSON, convErr := r.serviceUpdateToJSON(next.su)
		if convErr != nil {
			r.logger.Warningf("failed to convert update to service config: %v", convErr)
			r.cc.ReportError(convErr)
			continue
		}
		r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.target.Endpoint, r.client, scJSON)

		// The xds client is attached as an attribute under XDSClientID
		// alongside the parsed service config.
		state := resolver.State{
			ServiceConfig: r.cc.ParseServiceConfig(scJSON),
			Attributes:    attributes.New(xdsinternal.XDSClientID, r.client),
		}
		r.cc.UpdateState(state)
	}
}
// handleServiceUpdate is the callback which handles service updates. It writes
// the received update (or error) to the update channel, which is picked up by
// the run goroutine.
//
// Updates that arrive after the resolver has been closed are dropped. The send
// also gives up if the resolver is closed while it is waiting: updateCh has a
// buffer of one and the run goroutine stops reading from it on close, so an
// unconditional send could otherwise block the caller forever.
func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) {
	if r.closed.HasFired() {
		// Do not pass updates to the ClientConn once the resolver is closed.
		return
	}
	select {
	case r.updateCh <- suWithError{su: su, err: err}:
	case <-r.closed.Done():
		// The resolver was closed while this update was waiting for room in
		// the channel; nobody will read it, so drop it.
	}
}
// ResolveNow helps implement the resolver.Resolver interface. It is a no-op at
// this point; the options argument is ignored.
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
// Close closes the resolver, and also closes the underlying xdsClient.
//
// It cancels the service watch, closes the xds client, and then fires the
// closed event, in that order.
func (r *xdsResolver) Close() {
r.cancelWatch()
r.client.Close()
// Firing closed makes the run goroutine return and makes
// handleServiceUpdate drop any update that arrives afterwards.
r.closed.Fire()
r.logger.Infof("Shutdown")
}