| /* |
| * |
| * Copyright 2019 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| // Package v2 provides xDS v2 transport protocol specific functionality. |
| package v2 |
| |
| import ( |
| "context" |
| "fmt" |
| "sync" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/internal/grpclog" |
| xdsclient "google.golang.org/grpc/xds/internal/client" |
| "google.golang.org/grpc/xds/internal/client/load" |
| "google.golang.org/grpc/xds/internal/version" |
| |
| v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" |
| v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" |
| v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" |
| ) |
| |
| func init() { |
| xdsclient.RegisterAPIClientBuilder(clientBuilder{}) |
| } |
| |
| var ( |
| resourceTypeToURL = map[xdsclient.ResourceType]string{ |
| xdsclient.ListenerResource: version.V2ListenerURL, |
| xdsclient.RouteConfigResource: version.V2RouteConfigURL, |
| xdsclient.ClusterResource: version.V2ClusterURL, |
| xdsclient.EndpointsResource: version.V2EndpointsURL, |
| } |
| ) |
| |
| type clientBuilder struct{} |
| |
| func (clientBuilder) Build(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) { |
| return newClient(cc, opts) |
| } |
| |
| func (clientBuilder) Version() version.TransportAPI { |
| return version.TransportV2 |
| } |
| |
| func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIClient, error) { |
| nodeProto, ok := opts.NodeProto.(*v2corepb.Node) |
| if !ok { |
| return nil, fmt.Errorf("xds: unsupported Node proto type: %T, want %T", opts.NodeProto, (*v2corepb.Node)(nil)) |
| } |
| v2c := &client{ |
| cc: cc, |
| parent: opts.Parent, |
| nodeProto: nodeProto, |
| loadStore: opts.LoadStore, |
| logger: opts.Logger, |
| } |
| v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) |
| v2c.TransportHelper = xdsclient.NewTransportHelper(v2c, opts.Logger, opts.Backoff) |
| return v2c, nil |
| } |
| |
| type adsStream v2adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient |
| |
| // client performs the actual xDS RPCs using the xDS v2 API. It creates a |
| // single ADS stream on which the different types of xDS requests and responses |
| // are multiplexed. |
| type client struct { |
| *xdsclient.TransportHelper |
| |
| ctx context.Context |
| cancelCtx context.CancelFunc |
| parent xdsclient.UpdateHandler |
| loadStore *load.Store |
| logger *grpclog.PrefixLogger |
| |
| // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. |
| cc *grpc.ClientConn |
| nodeProto *v2corepb.Node |
| |
| mu sync.Mutex |
| // ldsResourceName is the LDS resource_name to watch. It is set to the first |
| // LDS resource_name to watch, and removed when the LDS watch is canceled. |
| // |
| // It's from the dial target of the parent ClientConn. RDS resource |
| // processing needs this to do the host matching. |
| ldsResourceName string |
| ldsWatchCount int |
| |
| lastLoadReportAt time.Time |
| } |
| |
| // AddWatch overrides the transport helper's AddWatch to save the LDS |
| // resource_name. This is required when handling an RDS response to perform host |
| // matching. |
| func (v2c *client) AddWatch(rType xdsclient.ResourceType, rName string) { |
| v2c.mu.Lock() |
| // Special handling for LDS, because RDS needs the LDS resource_name for |
| // response host matching. |
| if rType == xdsclient.ListenerResource { |
| // Set hostname to the first LDS resource_name, and reset it when the |
| // last LDS watch is removed. The upper level Client isn't expected to |
| // watchLDS more than once. |
| v2c.ldsWatchCount++ |
| if v2c.ldsWatchCount == 1 { |
| v2c.ldsResourceName = rName |
| } |
| } |
| v2c.mu.Unlock() |
| v2c.TransportHelper.AddWatch(rType, rName) |
| } |
| |
| // RemoveWatch overrides the transport helper's RemoveWatch to clear the LDS |
| // resource_name when the last watch is removed. |
| func (v2c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) { |
| v2c.mu.Lock() |
| // Special handling for LDS, because RDS needs the LDS resource_name for |
| // response host matching. |
| if rType == xdsclient.ListenerResource { |
| // Set hostname to the first LDS resource_name, and reset it when the |
| // last LDS watch is removed. The upper level Client isn't expected to |
| // watchLDS more than once. |
| v2c.ldsWatchCount-- |
| if v2c.ldsWatchCount == 0 { |
| v2c.ldsResourceName = "" |
| } |
| } |
| v2c.mu.Unlock() |
| v2c.TransportHelper.RemoveWatch(rType, rName) |
| } |
| |
| func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { |
| return v2adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc).StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true)) |
| } |
| |
| // sendRequest sends out a DiscoveryRequest for the given resourceNames, of type |
| // rType, on the provided stream. |
| // |
| // version is the ack version to be sent with the request |
| // - If this is the new request (not an ack/nack), version will be empty. |
| // - If this is an ack, version will be the version from the response. |
| // - If this is a nack, version will be the previous acked version (from |
| // versionMap). If there was no ack before, it will be empty. |
| func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error { |
| stream, ok := s.(adsStream) |
| if !ok { |
| return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) |
| } |
| req := &v2xdspb.DiscoveryRequest{ |
| Node: v2c.nodeProto, |
| TypeUrl: resourceTypeToURL[rType], |
| ResourceNames: resourceNames, |
| VersionInfo: version, |
| ResponseNonce: nonce, |
| // TODO: populate ErrorDetails for nack. |
| } |
| if err := stream.Send(req); err != nil { |
| return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err) |
| } |
| v2c.logger.Debugf("ADS request sent: %v", req) |
| return nil |
| } |
| |
| // RecvResponse blocks on the receipt of one response message on the provided |
| // stream. |
| func (v2c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) { |
| stream, ok := s.(adsStream) |
| if !ok { |
| return nil, fmt.Errorf("xds: Attempt to receive response on unsupported stream type: %T", s) |
| } |
| |
| resp, err := stream.Recv() |
| if err != nil { |
| // TODO: call watch callbacks with error when stream is broken. |
| return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err) |
| } |
| v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl()) |
| v2c.logger.Debugf("ADS response received: %v", resp) |
| return resp, nil |
| } |
| |
| func (v2c *client) HandleResponse(r proto.Message) (xdsclient.ResourceType, string, string, error) { |
| rType := xdsclient.UnknownResource |
| resp, ok := r.(*v2xdspb.DiscoveryResponse) |
| if !ok { |
| return rType, "", "", fmt.Errorf("xds: unsupported message type: %T", resp) |
| } |
| |
| // Note that the xDS transport protocol is versioned independently of |
| // the resource types, and it is supported to transfer older versions |
| // of resource types using new versions of the transport protocol, or |
| // vice-versa. Hence we need to handle v3 type_urls as well here. |
| var err error |
| url := resp.GetTypeUrl() |
| switch { |
| case xdsclient.IsListenerResource(url): |
| err = v2c.handleLDSResponse(resp) |
| rType = xdsclient.ListenerResource |
| case xdsclient.IsRouteConfigResource(url): |
| err = v2c.handleRDSResponse(resp) |
| rType = xdsclient.RouteConfigResource |
| case xdsclient.IsClusterResource(url): |
| err = v2c.handleCDSResponse(resp) |
| rType = xdsclient.ClusterResource |
| case xdsclient.IsEndpointsResource(url): |
| err = v2c.handleEDSResponse(resp) |
| rType = xdsclient.EndpointsResource |
| default: |
| return rType, "", "", xdsclient.ErrResourceTypeUnsupported{ |
| ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()), |
| } |
| } |
| return rType, resp.GetVersionInfo(), resp.GetNonce(), err |
| } |
| |
| // handleLDSResponse processes an LDS response received from the xDS server. On |
| // receipt of a good response, it also invokes the registered watcher callback. |
| func (v2c *client) handleLDSResponse(resp *v2xdspb.DiscoveryResponse) error { |
| update, err := xdsclient.UnmarshalListener(resp.GetResources(), v2c.logger) |
| if err != nil { |
| return err |
| } |
| v2c.parent.NewListeners(update) |
| return nil |
| } |
| |
| // handleRDSResponse processes an RDS response received from the xDS server. On |
| // receipt of a good response, it caches validated resources and also invokes |
| // the registered watcher callback. |
| func (v2c *client) handleRDSResponse(resp *v2xdspb.DiscoveryResponse) error { |
| v2c.mu.Lock() |
| hostname := v2c.ldsResourceName |
| v2c.mu.Unlock() |
| |
| update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), hostname, v2c.logger) |
| if err != nil { |
| return err |
| } |
| v2c.parent.NewRouteConfigs(update) |
| return nil |
| } |
| |
| // handleCDSResponse processes an CDS response received from the xDS server. On |
| // receipt of a good response, it also invokes the registered watcher callback. |
| func (v2c *client) handleCDSResponse(resp *v2xdspb.DiscoveryResponse) error { |
| update, err := xdsclient.UnmarshalCluster(resp.GetResources(), v2c.logger) |
| if err != nil { |
| return err |
| } |
| v2c.parent.NewClusters(update) |
| return nil |
| } |
| |
| func (v2c *client) handleEDSResponse(resp *v2xdspb.DiscoveryResponse) error { |
| update, err := xdsclient.UnmarshalEndpoints(resp.GetResources(), v2c.logger) |
| if err != nil { |
| return err |
| } |
| v2c.parent.NewEndpoints(update) |
| return nil |
| } |