blob: 09729ec3e281811f2cef19e8499e32e2bb38a09d [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package client implements a full-fledged gRPC client for the xDS API, used
// by the xds resolver and balancer implementations.
package client
import (
"context"
"errors"
"fmt"
"sync"
"time"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/version"
)
// m is the registry of API client builders, keyed by the xDS transport API
// version each builder reports. It is written by RegisterAPIClientBuilder and
// read by getAPIClientBuilder.
var m = make(map[version.TransportAPI]APIClientBuilder)
// RegisterAPIClientBuilder stores b in the builder registry under the xDS
// transport protocol version returned by b.Version().
//
// NOTE: this function is not thread-safe and must only be called at
// initialization time (i.e. from an init() function). Registering a second
// builder for a version replaces the one registered earlier.
func RegisterAPIClientBuilder(b APIClientBuilder) {
	apiVersion := b.Version()
	m[apiVersion] = b
}
// getAPIClientBuilder looks up the client builder registered for the given xDS
// transport API version. It returns nil when nothing is registered for that
// version: indexing the registry with a missing key yields the zero value of
// the APIClientBuilder interface, which is nil.
func getAPIClientBuilder(apiVersion version.TransportAPI) APIClientBuilder {
	return m[apiVersion]
}
// BuildOptions contains options to be passed to client builders.
//
// A value of this type is handed to APIClientBuilder.Build together with the
// ClientConn to the management server.
type BuildOptions struct {
// Parent is a top-level xDS client or server which has the intelligence to
// take appropriate action based on xDS responses received from the
// management server.
Parent UpdateHandler
// NodeProto contains the Node proto to be used in xDS requests. The actual
// type depends on the transport protocol version used (newWithConfig
// requires a v2 core Node for TransportV2 and a v3 core Node for
// TransportV3).
NodeProto proto.Message
// Backoff returns the amount of time to backoff before retrying broken
// streams.
// NOTE(review): the int argument is presumably the number of consecutive
// retries so far — confirm against the backoff implementation in use.
Backoff func(int) time.Duration
// Logger provides enhanced logging capabilities.
Logger *grpclog.PrefixLogger
}
// APIClientBuilder creates an xDS client for a specific xDS transport protocol
// version.
//
// Builders are registered with RegisterAPIClientBuilder, which files them
// under the value returned by Version.
type APIClientBuilder interface {
// Build builds a transport protocol specific implementation of the xDS
// client based on the provided clientConn to the management server and the
// provided options.
Build(*grpc.ClientConn, BuildOptions) (APIClient, error)
// Version returns the xDS transport protocol version used by clients built
// using this builder.
Version() version.TransportAPI
}
// APIClient represents the functionality provided by transport protocol
// version specific implementations of the xDS client.
//
// TODO: unexport this interface and all its methods after the PR that makes
// xdsClient sharable by clients. AddWatch and RemoveWatch are exported for
// v2/v3 to override because they need to keep track of the LDS name for RDS to
// use. After the shared xdsClient change, that is no longer necessary. After
// that, we will still keep this interface for testing purposes.
type APIClient interface {
// AddWatch adds a watch for an xDS resource given its type and name.
AddWatch(ResourceType, string)
// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name.
RemoveWatch(ResourceType, string)
// reportLoad starts an LRS stream to periodically report load using the
// provided ClientConn, which represents a connection to the management
// server.
// NOTE(review): the stream's lifetime is presumably tied to ctx — confirm
// against the versioned implementations.
reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions)
// Close cleans up resources allocated by the API client.
Close()
}
// loadReportingOptions contains configuration knobs for reporting load data.
// It is the options argument of APIClient.reportLoad.
type loadReportingOptions struct {
// loadStore is the load.Store made available to the load reporting stream.
// NOTE(review): presumably the source of the load data that gets
// reported — confirm against the reportLoad implementations.
loadStore *load.Store
}
// UpdateHandler receives and processes (by taking appropriate actions) xDS
// resource updates from an APIClient for a specific version.
//
// Every method takes a map from string to the update type for one kind of
// resource.
// NOTE(review): the map keys are presumably resource names — confirm against
// the versioned API clients that call these methods.
type UpdateHandler interface {
// NewListeners handles updates to xDS listener resources.
NewListeners(map[string]ListenerUpdate)
// NewRouteConfigs handles updates to xDS RouteConfiguration resources.
NewRouteConfigs(map[string]RouteConfigUpdate)
// NewClusters handles updates to xDS Cluster resources.
NewClusters(map[string]ClusterUpdate)
// NewEndpoints handles updates to xDS ClusterLoadAssignment (or tersely
// referred to as Endpoints) resources.
NewEndpoints(map[string]EndpointsUpdate)
}
// ListenerUpdate contains information received in an LDS response, which is of
// interest to the registered LDS watcher.
type ListenerUpdate struct {
// RouteConfigName is the route configuration name corresponding to the
// target which is being watched through LDS.
// NOTE(review): presumably the name a subsequent RDS watch is registered
// for — confirm against the watcher code.
RouteConfigName string
}
// RouteConfigUpdate contains information received in an RDS response, which is
// of interest to the registered RDS watcher.
type RouteConfigUpdate struct {
// VirtualHosts holds the virtual hosts carried by the update; each entry
// pairs a list of domains with the routes that apply to them.
VirtualHosts []*VirtualHost
}
// VirtualHost contains the routes for a list of Domains.
//
// Note that the domains in this slice can be a wildcard, not an exact string.
// The consumer of this struct needs to find the best match for its hostname.
type VirtualHost struct {
// Domains lists the domain patterns (exact or wildcard) this virtual host
// applies to.
Domains []string
// Routes contains a list of routes, each containing matchers and
// corresponding action.
Routes []*Route
}
// Route is both a specification of how to match a request as well as an
// indication of the action to take upon match.
type Route struct {
// Path, Prefix and Regex are optional (pointer-valued) path matchers.
// NOTE(review): presumably at most one of them is non-nil for a given
// route — confirm against the code that populates this struct.
Path, Prefix, Regex *string
// Indicates if prefix/path matching should be case insensitive. The default
// is false (case sensitive).
CaseInsensitive bool
// Headers lists the header matchers attached to this route.
// NOTE(review): how multiple matchers combine is not visible here
// (presumably all must match) — confirm.
Headers []*HeaderMatcher
// Fraction is an optional (pointer-valued) number.
// NOTE(review): presumably the fraction of requests this route applies
// to, with its unit defined by the producer — confirm.
Fraction *uint32
// Action holds the weighted clusters for the route.
// NOTE(review): keys are presumably cluster names and values their
// weights — confirm.
Action map[string]uint32 // action is weighted clusters.
}
// HeaderMatcher represents header matchers.
//
// Name is the only field that is always present in the JSON form; every other
// field is a pointer tagged omitempty, so a nil field is left out when the
// struct is marshalled.
// NOTE(review): presumably exactly one of the *Match fields is set per
// matcher — confirm against the code that populates this struct.
type HeaderMatcher struct {
Name string `json:"name"`
InvertMatch *bool `json:"invertMatch,omitempty"`
ExactMatch *string `json:"exactMatch,omitempty"`
RegexMatch *string `json:"regexMatch,omitempty"`
PrefixMatch *string `json:"prefixMatch,omitempty"`
SuffixMatch *string `json:"suffixMatch,omitempty"`
RangeMatch *Int64Range `json:"rangeMatch,omitempty"`
PresentMatch *bool `json:"presentMatch,omitempty"`
}
// Int64Range is a range for header range match. It is the type of
// HeaderMatcher.RangeMatch and is marshalled to JSON with the keys "start" and
// "end".
// NOTE(review): whether End is inclusive is not visible here (presumably the
// range is half-open, [Start, End)) — confirm against the matcher code.
type Int64Range struct {
Start int64 `json:"start"`
End int64 `json:"end"`
}
// SecurityConfig contains the security configuration received as part of the
// Cluster resource. ClusterUpdate.SecurityCfg points to a value of this type.
type SecurityConfig struct {
// RootInstanceName identifies the certProvider plugin to be used to fetch
// root certificates. This instance name will be resolved to the plugin name
// and its associated configuration from the certificate_providers field of
// the bootstrap file.
RootInstanceName string
// RootCertName is the certificate name to be passed to the plugin (looked
// up from the bootstrap file) while fetching root certificates.
RootCertName string
// IdentityInstanceName identifies the certProvider plugin to be used to
// fetch identity certificates. This instance name will be resolved to the
// plugin name and its associated configuration from the
// certificate_providers field of the bootstrap file.
IdentityInstanceName string
// IdentityCertName is the certificate name to be passed to the plugin
// (looked up from the bootstrap file) while fetching identity certificates.
IdentityCertName string
// AcceptedSANs is a list of Subject Alternative Names. During the TLS
// handshake, the SAN present in the peer certificate is compared against
// this list, and the handshake succeeds only if a match is found.
AcceptedSANs []string
}
// ClusterUpdate contains information from a received CDS response, which is of
// interest to the registered CDS watcher.
type ClusterUpdate struct {
// ServiceName is the service name corresponding to the clusterName which
// is being watched for through CDS.
ServiceName string
// EnableLRS indicates whether or not load should be reported through LRS.
EnableLRS bool
// SecurityCfg contains security configuration sent by the xDS server.
// NOTE(review): presumably nil when the Cluster resource carries no
// security configuration — confirm against the CDS response handling.
SecurityCfg *SecurityConfig
}
// OverloadDropConfig contains the config to drop overloads. EndpointsUpdate
// carries a slice of these in its Drops field.
// NOTE(review): Numerator/Denominator presumably express the fraction of
// requests to drop for Category — confirm against the EDS response handling.
type OverloadDropConfig struct {
Category string
Numerator uint32
Denominator uint32
}
// EndpointHealthStatus represents the health status of an endpoint.
type EndpointHealthStatus int32
// The constants below are numbered with iota in declaration order, so
// EndpointHealthStatusUnknown is 0 (the zero value of the type) and
// EndpointHealthStatusDegraded is 5.
const (
// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
EndpointHealthStatusUnknown EndpointHealthStatus = iota
// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
EndpointHealthStatusHealthy
// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
EndpointHealthStatusUnhealthy
// EndpointHealthStatusDraining represents HealthStatus DRAINING.
EndpointHealthStatusDraining
// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
EndpointHealthStatusTimeout
// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
EndpointHealthStatusDegraded
)
// Endpoint contains information of an endpoint. Endpoints are grouped per
// locality in Locality.Endpoints.
type Endpoint struct {
// Address is the endpoint's address as a string.
// NOTE(review): presumably in "host:port" form — confirm against the EDS
// response handling.
Address string
// HealthStatus is the endpoint's health; its zero value is
// EndpointHealthStatusUnknown.
HealthStatus EndpointHealthStatus
// Weight is the endpoint's weight.
// NOTE(review): presumably a load balancing weight relative to the other
// endpoints in the same locality — confirm.
Weight uint32
}
// Locality contains information of a locality. An EndpointsUpdate carries a
// slice of these in its Localities field.
type Locality struct {
// Endpoints lists the endpoints that belong to this locality.
Endpoints []Endpoint
// ID identifies the locality.
ID internal.LocalityID
// Priority is the locality's priority.
// NOTE(review): presumably lower values are preferred, as in xDS —
// confirm against the consumers of this struct.
Priority uint32
// Weight is the locality's weight.
// NOTE(review): presumably a load balancing weight relative to other
// localities of the same priority — confirm.
Weight uint32
}
// EndpointsUpdate contains an EDS update. It is the value type handed to
// UpdateHandler.NewEndpoints.
type EndpointsUpdate struct {
// Drops lists the overload drop configurations carried by the update.
Drops []OverloadDropConfig
// Localities lists the localities, with their endpoints, carried by the
// update.
Localities []Locality
}
// newAPIClient builds the version-specific APIClient for apiVersion on top of
// cc, using the builder registered for that version. It returns an error when
// no builder has been registered for apiVersion, and otherwise whatever the
// builder's Build method returns.
//
// It is a variable so that it can be overridden in tests.
var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
	if builder := getAPIClientBuilder(apiVersion); builder != nil {
		return builder.Build(cc, opts)
	}
	return nil, fmt.Errorf("no client builder for xDS API version: %v", apiVersion)
}
// clientImpl is the real implementation of the xds client. The exported Client
// is a wrapper of this struct with a ref count.
//
// Implements UpdateHandler interface.
// TODO(easwars): Make a wrapper struct which implements this interface in the
// style of ccBalancerWrapper so that the Client type does not implement these
// exported methods.
type clientImpl struct {
// done is fired by Close; the run goroutine exits once it has fired.
done *grpcsync.Event
// config is the bootstrap configuration the client was created with.
config *bootstrap.Config
cc *grpc.ClientConn // Connection to the management server.
// apiClient is the transport-version specific client built on top of cc
// for config.TransportAPI.
apiClient APIClient
// watchExpiryTimeout is the timeout value passed to newWithConfig.
// NOTE(review): presumably how long a watch waits for its resource
// before it is considered expired — confirm against the watch code.
watchExpiryTimeout time.Duration
logger *grpclog.PrefixLogger
// updateCh queues *watcherInfoWithUpdate values; run reads them one at a
// time and passes each to callCallback.
updateCh *buffer.Unbounded // chan *watcherInfoWithUpdate
// NOTE(review): mu presumably guards the watcher and cache maps declared
// below — confirm against the methods that access them.
mu sync.Mutex
// For each resource kind (LDS, RDS, CDS, EDS) there is a set of watchers
// and a cache of updates, both keyed by string.
// NOTE(review): the keys are presumably resource names — confirm.
ldsWatchers map[string]map[*watchInfo]bool
ldsCache map[string]ListenerUpdate
rdsWatchers map[string]map[*watchInfo]bool
rdsCache map[string]RouteConfigUpdate
cdsWatchers map[string]map[*watchInfo]bool
cdsCache map[string]ClusterUpdate
edsWatchers map[string]map[*watchInfo]bool
edsCache map[string]EndpointsUpdate
// Changes to map lrsClients and the lrsClient inside the map need to be
// protected by lrsMu.
lrsMu sync.Mutex
lrsClients map[string]*lrsClient
}
// newWithConfig returns a new xdsClient with the given config.
//
// It validates config (a management server name, credentials and a node proto
// are required, and the node proto's concrete type must match
// config.TransportAPI for the v2 and v3 transports), dials the management
// server, builds the transport-version specific API client on top of that
// connection, and starts the run goroutine that delivers watch callbacks.
//
// watchExpiryTimeout is stored on the returned client as is.
//
// On failure a nil client and a non-nil error are returned. If the connection
// was already dialed when the failure occurred, it is closed before returning
// so that it does not leak.
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration) (*clientImpl, error) {
	switch {
	case config.BalancerName == "":
		return nil, errors.New("xds: no xds_server name provided in options")
	case config.Creds == nil:
		return nil, errors.New("xds: no credentials provided in options")
	case config.NodeProto == nil:
		return nil, errors.New("xds: no node_proto provided in options")
	}

	// The node proto must be of the type that goes with the selected transport
	// version. Other transport versions are not checked here; newAPIClient
	// rejects versions without a registered builder.
	switch config.TransportAPI {
	case version.TransportV2:
		if _, ok := config.NodeProto.(*v2corepb.Node); !ok {
			return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
		}
	case version.TransportV3:
		if _, ok := config.NodeProto.(*v3corepb.Node); !ok {
			return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
		}
	}

	dopts := []grpc.DialOption{
		config.Creds,
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    5 * time.Minute,
			Timeout: 20 * time.Second,
		}),
	}

	c := &clientImpl{
		done:               grpcsync.NewEvent(),
		config:             config,
		watchExpiryTimeout: watchExpiryTimeout,

		updateCh:    buffer.NewUnbounded(),
		ldsWatchers: make(map[string]map[*watchInfo]bool),
		ldsCache:    make(map[string]ListenerUpdate),
		rdsWatchers: make(map[string]map[*watchInfo]bool),
		rdsCache:    make(map[string]RouteConfigUpdate),
		cdsWatchers: make(map[string]map[*watchInfo]bool),
		cdsCache:    make(map[string]ClusterUpdate),
		edsWatchers: make(map[string]map[*watchInfo]bool),
		edsCache:    make(map[string]EndpointsUpdate),
		lrsClients:  make(map[string]*lrsClient),
	}

	cc, err := grpc.Dial(config.BalancerName, dopts...)
	if err != nil {
		// An error from a non-blocking dial indicates something serious.
		return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", config.BalancerName, err)
	}
	c.cc = cc
	c.logger = prefixLogger(c)
	c.logger.Infof("Created ClientConn to xDS management server: %s", config.BalancerName)

	apiClient, err := newAPIClient(config.TransportAPI, cc, BuildOptions{
		Parent:    c,
		NodeProto: config.NodeProto,
		Backoff:   backoff.DefaultExponential.Backoff,
		Logger:    c.logger,
	})
	if err != nil {
		// The client is never handed to the caller on this path, so nobody
		// else can close the connection dialed above. Close it here
		// (best-effort) instead of leaking it.
		cc.Close()
		return nil, err
	}
	c.apiClient = apiClient
	c.logger.Infof("Created")

	// The callback goroutine runs until Close fires c.done.
	go c.run()
	return c, nil
}
// BootstrapConfig exposes the bootstrap configuration this client holds, i.e.
// the configuration read from the bootstrap file.
//
// The returned pointer is the client's own config, not a copy, so callers must
// treat it as read-only.
func (c *Client) BootstrapConfig() *bootstrap.Config {
	cfg := c.config
	return cfg
}
// run is the goroutine that invokes all watch callbacks. It reads queued
// updates from updateCh and hands each one to callCallback, and it returns as
// soon as the done event has fired.
//
// A callback can be triggered from watch() when the resource is already in the
// cache. Calling it inline there might deadlock the user's code, and a plain
// `go callback()` per update is not an option because the order of callbacks
// matters. Hence every callback is funnelled through this single goroutine.
func (c *clientImpl) run() {
	for {
		select {
		case <-c.done.Done():
			return
		case update := <-c.updateCh.Get():
			// Tell the buffer the item was taken; presumably this lets it
			// expose the next queued item (see buffer.Unbounded).
			c.updateCh.Load()
			if c.done.HasFired() {
				// Closed while the update was queued: drop it.
				return
			}
			c.callCallback(update.(*watcherInfoWithUpdate))
		}
	}
}
// Close shuts the client down: it fires the done event (which makes the run
// goroutine exit), closes the version-specific API client, and closes the gRPC
// connection to the management server.
//
// Close is idempotent and safe to call concurrently. Fire reports whether this
// particular call fired the event, so exactly one caller performs the
// teardown. Checking HasFired first and calling Fire afterwards would let two
// racing callers both pass the check and close the API client and the
// connection twice.
func (c *clientImpl) Close() {
	if !c.done.Fire() {
		// Already closed by an earlier or concurrent call.
		return
	}
	// TODO: Should we invoke the registered callbacks here with an error that
	// the client is closed?
	c.apiClient.Close()
	c.cc.Close()
	c.logger.Infof("Shutdown")
}
// ResourceType names an xDS resource kind independently of the transport
// protocol version. Version agnostic code works with these values, and the
// versioned API clients translate them to the matching version-specific URLs.
type ResourceType int

// Version agnostic resource type constants. UnknownResource is the zero value;
// the remaining constants are numbered consecutively after it.
const (
	UnknownResource ResourceType = iota
	ListenerResource
	HTTPConnManagerResource
	RouteConfigResource
	ClusterResource
	EndpointsResource
)

// String returns the name of the constant corresponding to r. Any value that
// is not one of the named resource types, including UnknownResource itself,
// yields "UnknownResource".
func (r ResourceType) String() string {
	// Indexed by resource type. Slot 0 (UnknownResource) is left empty on
	// purpose and is handled by the fallback below.
	names := [...]string{
		ListenerResource:        "ListenerResource",
		HTTPConnManagerResource: "HTTPConnManagerResource",
		RouteConfigResource:     "RouteConfigResource",
		ClusterResource:         "ClusterResource",
		EndpointsResource:       "EndpointsResource",
	}
	if r > UnknownResource && int(r) < len(names) {
		return names[r]
	}
	return "UnknownResource"
}
// IsListenerResource reports whether the provided URL is the v2 or the v3 type
// URL of the xDS Listener resource.
func IsListenerResource(url string) bool {
	switch url {
	case version.V2ListenerURL, version.V3ListenerURL:
		return true
	}
	return false
}
// IsHTTPConnManagerResource reports whether the provided URL is the v2 or the
// v3 type URL of the xDS HTTPConnManager resource.
func IsHTTPConnManagerResource(url string) bool {
	switch url {
	case version.V2HTTPConnManagerURL, version.V3HTTPConnManagerURL:
		return true
	}
	return false
}
// IsRouteConfigResource reports whether the provided URL is the v2 or the v3
// type URL of the xDS RouteConfig resource.
func IsRouteConfigResource(url string) bool {
	switch url {
	case version.V2RouteConfigURL, version.V3RouteConfigURL:
		return true
	}
	return false
}
// IsClusterResource reports whether the provided URL is the v2 or the v3 type
// URL of the xDS Cluster resource.
func IsClusterResource(url string) bool {
	switch url {
	case version.V2ClusterURL, version.V3ClusterURL:
		return true
	}
	return false
}
// IsEndpointsResource reports whether the provided URL is the v2 or the v3
// type URL of the xDS Endpoints resource.
func IsEndpointsResource(url string) bool {
	switch url {
	case version.V2EndpointsURL, version.V3EndpointsURL:
		return true
	}
	return false
}