blob: d83530a157bc5fb24996e7c82ea0e30e18cdde2a [file] [log] [blame]
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package csds_test
import (
"context"
"fmt"
"io"
"sort"
"strings"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/csds"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter
)
const defaultTestTimeout = 5 * time.Second
var cmpOpts = cmp.Options{
cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig {
out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...)
sort.Slice(out, func(i, j int) bool {
a, b := out[i], out[j]
if a == nil {
return true
}
if b == nil {
return false
}
if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 {
return strings.Compare(a.Name, b.Name) < 0
}
return strings.Compare(a.TypeUrl, b.TypeUrl) < 0
})
return out
}),
protocmp.Transform(),
protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"),
protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"),
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func (s) TestCSDS(t *testing.T) {
// Spin up a xDS management server on a local port.
nodeID := uuid.New().String()
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()
// Create a bootstrap file in a temporary directory.
bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{
Version: bootstrap.TransportV3,
NodeID: nodeID,
ServerURI: mgmtServer.Address,
})
if err != nil {
t.Fatal(err)
}
defer bootstrapCleanup()
// Create an xDS client. This will end up using the same singleton as used
// by the CSDS service.
xdsC, close, err := xdsclient.New()
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer close()
// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
csdss, err := csds.NewClientStatusDiscoveryServer()
if err != nil {
t.Fatal(err)
}
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
defer func() {
server.Stop()
csdss.Close()
}()
// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
// Create a client to the CSDS server.
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
}
c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("Failed to create a stream for CSDS: %v", err)
}
defer conn.Close()
// Verify that the xDS client reports an empty config.
if err := checkClientStatusResponse(stream, nil); err != nil {
t.Fatal(err)
}
// Initialize the xDS resources to be used in this test.
ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
rdsTargets := []string{"route-config-0", "route-config-1"}
cdsTargets := []string{"cluster-0", "cluster-1"}
edsTargets := []string{"endpoints-0", "endpoints-1"}
listeners := make([]*v3listenerpb.Listener, len(ldsTargets))
listenerAnys := make([]*anypb.Any, len(ldsTargets))
for i := range ldsTargets {
listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
listenerAnys[i] = testutils.MarshalAny(listeners[i])
}
routes := make([]*v3routepb.RouteConfiguration, len(rdsTargets))
routeAnys := make([]*anypb.Any, len(rdsTargets))
for i := range rdsTargets {
routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
routeAnys[i] = testutils.MarshalAny(routes[i])
}
clusters := make([]*v3clusterpb.Cluster, len(cdsTargets))
clusterAnys := make([]*anypb.Any, len(cdsTargets))
for i := range cdsTargets {
clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone)
clusterAnys[i] = testutils.MarshalAny(clusters[i])
}
endpoints := make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
endpointAnys := make([]*anypb.Any, len(edsTargets))
ips := []string{"0.0.0.0", "1.1.1.1"}
ports := []uint32{123, 456}
for i := range edsTargets {
endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1])
endpointAnys[i] = testutils.MarshalAny(endpoints[i])
}
// Register watches on the xDS client for two resources of each type.
for _, target := range ldsTargets {
xdsC.WatchListener(target, func(xdsresource.ListenerUpdate, error) {})
}
for _, target := range rdsTargets {
xdsC.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {})
}
for _, target := range cdsTargets {
xdsC.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
xdsC.WatchEndpoints(target, func(xdsresource.EndpointsUpdate, error) {})
}
// Verify that the xDS client reports the resources as being in "Requested"
// state.
want := []*v3statuspb.ClientConfig_GenericXdsConfig{}
for i := range ldsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
}
for i := range rdsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
}
for i := range cdsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
}
for i := range edsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
}
for {
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout when waiting for resources in \"Requested\" state: %v", err)
}
if err := checkClientStatusResponse(stream, want); err == nil {
break
}
time.Sleep(time.Millisecond * 100)
}
// Configure the management server with two resources of each type,
// corresponding to the watches registered above.
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: listeners,
Routes: routes,
Clusters: clusters,
Endpoints: endpoints,
}); err != nil {
t.Fatal(err)
}
// Verify that the xDS client reports the resources as being in "ACKed"
// state, and in version "1".
want = nil
for i := range ldsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i]))
}
for i := range rdsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i]))
}
for i := range cdsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i]))
}
for i := range edsTargets {
want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i]))
}
for {
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout when waiting for resources in \"ACKed\" state: %v", err)
}
err := checkClientStatusResponse(stream, want)
if err == nil {
break
}
time.Sleep(time.Millisecond * 100)
}
// Update the first resource of each type in the management server to a
// value which is expected to be NACK'ed by the xDS client.
const nackResourceIdx = 0
listeners[nackResourceIdx].ApiListener = &v3listenerpb.ApiListener{}
routes[nackResourceIdx].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}}
clusters[nackResourceIdx].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}
endpoints[nackResourceIdx].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}}
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: listeners,
Routes: routes,
Clusters: clusters,
Endpoints: endpoints,
SkipValidation: true,
}); err != nil {
t.Fatal(err)
}
// Verify that the xDS client reports the first resource of each type as
// being in "NACKed" state, and the second resource of each type to be in
// "ACKed" state. The version for the ACKed resource would be "2", while
// that for the NACKed resource would be "1". In the NACKed resource, the
// version which is NACKed is stored in the ErrorState field.
want = nil
for i := range ldsTargets {
config := makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i])
if i == nackResourceIdx {
config.VersionInfo = "1"
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
}
want = append(want, config)
}
for i := range rdsTargets {
config := makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i])
if i == nackResourceIdx {
config.VersionInfo = "1"
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
}
want = append(want, config)
}
for i := range cdsTargets {
config := makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i])
if i == nackResourceIdx {
config.VersionInfo = "1"
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
}
want = append(want, config)
}
for i := range edsTargets {
config := makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i])
if i == nackResourceIdx {
config.VersionInfo = "1"
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
}
want = append(want, config)
}
for {
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout when waiting for resources in \"NACKed\" state: %v", err)
}
err := checkClientStatusResponse(stream, want)
if err == nil {
break
}
time.Sleep(time.Millisecond * 100)
}
}
func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any) *v3statuspb.ClientConfig_GenericXdsConfig {
return &v3statuspb.ClientConfig_GenericXdsConfig{
TypeUrl: typeURL,
Name: name,
VersionInfo: version,
ClientStatus: status,
XdsConfig: config,
}
}
func checkClientStatusResponse(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want []*v3statuspb.ClientConfig_GenericXdsConfig) error {
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
if err != io.EOF {
return fmt.Errorf("failed to send ClientStatusRequest: %v", err)
}
// If the stream has closed, we call Recv() until it returns a non-nil
// error to get the actual error on the stream.
for {
if _, err := stream.Recv(); err != nil {
return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
}
}
}
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
}
if n := len(resp.Config); n != 1 {
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(resp))
}
if diff := cmp.Diff(resp.Config[0].GenericXdsConfigs, want, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
return nil
}
func (s) TestCSDSNoXDSClient(t *testing.T) {
// Create a bootstrap file in a temporary directory. Since we pass empty
// options, it would end up creating a bootstrap file with an empty
// serverURI which will fail xDS client creation.
bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { bootstrapCleanup() })
// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
csdss, err := csds.NewClientStatusDiscoveryServer()
if err != nil {
t.Fatal(err)
}
defer csdss.Close()
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
defer server.Stop()
// Create a client to the CSDS server.
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
}
defer conn.Close()
c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("Failed to create a stream for CSDS: %v", err)
}
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
t.Fatalf("Failed to send ClientStatusRequest: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
t.Fatalf("Failed to recv ClientStatusResponse: %v", err)
}
if n := len(r.Config); n != 0 {
t.Fatalf("got %d configs, want 0: %v", n, proto.MarshalTextString(r))
}
}