xds/federation: e2e tests (#5103)
diff --git a/internal/xds/bootstrap.go b/internal/xds/bootstrap.go
index eeb709c..4905b78 100644
--- a/internal/xds/bootstrap.go
+++ b/internal/xds/bootstrap.go
@@ -55,6 +55,14 @@
ServerListenerResourceNameTemplate string
// CertificateProviders is the certificate providers configuration.
CertificateProviders map[string]json.RawMessage
+ // Authorities is a list of non-default authorities.
+ //
+ // In the config, an authority contains {ServerURI, xds-version, creds,
+ // features, etc}. Note that this fields only has ServerURI (it's a
+ // map[authority-name]ServerURI). The other fields (version, creds,
+ // features) are assumed to be the same as the default authority (they can
+ // be added later if needed).
+ Authorities map[string]string
}
// SetupBootstrapFile creates a temporary file with bootstrap contents, based on
@@ -94,12 +102,8 @@
cfg := &bootstrapConfig{
XdsServers: []server{
{
- ServerURI: opts.ServerURI,
- ChannelCreds: []creds{
- {
- Type: "insecure",
- },
- },
+ ServerURI: opts.ServerURI,
+ ChannelCreds: []creds{{Type: "insecure"}},
},
},
Node: node{
@@ -117,6 +121,16 @@
return nil, fmt.Errorf("unsupported xDS transport protocol version: %v", opts.Version)
}
+ auths := make(map[string]authority)
+ for n, auURI := range opts.Authorities {
+ auths[n] = authority{XdsServers: []server{{
+ ServerURI: auURI,
+ ChannelCreds: []creds{{Type: "insecure"}},
+ ServerFeatures: cfg.XdsServers[0].ServerFeatures,
+ }}}
+ }
+ cfg.Authorities = auths
+
bootstrapContents, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
@@ -129,6 +143,11 @@
Node node `json:"node,omitempty"`
CertificateProviders map[string]json.RawMessage `json:"certificate_providers,omitempty"`
ServerListenerResourceNameTemplate string `json:"server_listener_resource_name_template,omitempty"`
+ Authorities map[string]authority `json:"authorities,omitempty"`
+}
+
+type authority struct {
+ XdsServers []server `json:"xds_servers,omitempty"`
}
type server struct {
diff --git a/xds/internal/test/xds_client_federation_test.go b/xds/internal/test/xds_client_federation_test.go
new file mode 100644
index 0000000..09db314
--- /dev/null
+++ b/xds/internal/test/xds_client_federation_test.go
@@ -0,0 +1,142 @@
+//go:build !386
+// +build !386
+
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xds_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+ v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+ v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+ v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+ "github.com/google/uuid"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/internal/envconfig"
+ xdsinternal "google.golang.org/grpc/internal/xds"
+ testpb "google.golang.org/grpc/test/grpc_testing"
+ "google.golang.org/grpc/xds"
+ "google.golang.org/grpc/xds/internal/testutils"
+ "google.golang.org/grpc/xds/internal/testutils/e2e"
+ "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
+)
+
+// TestClientSideFederation tests that federation is supported.
+//
+// In this test, some xDS responses contain resource names in another authority
+// (in the new resource name style):
+// - LDS: old style, no authority (default authority)
+// - RDS: new style, in a different authority
+// - CDS: old style, no authority (default authority)
+// - EDS: new style, in a different authority
+func (s) TestClientSideFederation(t *testing.T) {
+ oldXDSFederation := envconfig.XDSFederation
+ envconfig.XDSFederation = true
+ defer func() { envconfig.XDSFederation = oldXDSFederation }()
+
+ // Start a management server as the default authority.
+ serverDefaultAuth, err := e2e.StartManagementServer()
+ if err != nil {
+ t.Fatalf("Failed to spin up the xDS management server: %v", err)
+ }
+ t.Cleanup(serverDefaultAuth.Stop)
+
+ // Start another management server as the other authority.
+ const nonDefaultAuth = "non-default-auth"
+ serverAnotherAuth, err := e2e.StartManagementServer()
+ if err != nil {
+ t.Fatalf("Failed to spin up the xDS management server: %v", err)
+ }
+ t.Cleanup(serverAnotherAuth.Stop)
+
+ // Create a bootstrap file in a temporary directory.
+ nodeID := uuid.New().String()
+ bootstrapContents, err := xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{
+ Version: xdsinternal.TransportV3,
+ NodeID: nodeID,
+ ServerURI: serverDefaultAuth.Address,
+ ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
+ // Specify the address of the non-default authority.
+ Authorities: map[string]string{nonDefaultAuth: serverAnotherAuth.Address},
+ })
+ if err != nil {
+ t.Fatalf("Failed to create bootstrap file: %v", err)
+ }
+
+ resolver, err := xds.NewXDSResolverWithConfigForTesting(bootstrapContents)
+ if err != nil {
+ t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+ }
+ port, cleanup := clientSetup(t, &testService{})
+ defer cleanup()
+
+ const serviceName = "my-service-client-side-xds"
+ // LDS is old style name.
+ ldsName := serviceName
+ // RDS is new style, with the non default authority.
+ rdsName := testutils.BuildResourceName(xdsresource.RouteConfigResource, nonDefaultAuth, "route-"+serviceName, nil)
+ // CDS is old style name.
+ cdsName := "cluster-" + serviceName
+ // EDS is new style, with the non default authority.
+ edsName := testutils.BuildResourceName(xdsresource.EndpointsResource, nonDefaultAuth, "endpoints-"+serviceName, nil)
+
+ // Split resources, put LDS/CDS in the default authority, and put RDS/EDS in
+ // the other authority.
+ resourcesDefault := e2e.UpdateOptions{
+ NodeID: nodeID,
+ // This has only LDS and CDS.
+ Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
+ Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(cdsName, edsName, e2e.SecurityLevelNone)},
+ SkipValidation: true,
+ }
+ resourcesAnother := e2e.UpdateOptions{
+ NodeID: nodeID,
+ // This has only RDS and EDS.
+ Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, cdsName)},
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsName, "localhost", []uint32{port})},
+ SkipValidation: true,
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ // This has only LDS and CDS.
+ if err := serverDefaultAuth.Update(ctx, resourcesDefault); err != nil {
+ t.Fatal(err)
+ }
+ // This has only RDS and EDS.
+ if err := serverAnotherAuth.Update(ctx, resourcesAnother); err != nil {
+ t.Fatal(err)
+ }
+
+ // Create a ClientConn and make a successful RPC.
+ cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
+ if err != nil {
+ t.Fatalf("failed to dial local test server: %v", err)
+ }
+ defer cc.Close()
+
+ client := testpb.NewTestServiceClient(cc)
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+ t.Fatalf("rpc EmptyCall() failed: %v", err)
+ }
+}
diff --git a/xds/internal/testutils/testutils.go b/xds/internal/testutils/testutils.go
index a4c56f6..bab5871 100644
--- a/xds/internal/testutils/testutils.go
+++ b/xds/internal/testutils/testutils.go
@@ -17,3 +17,31 @@
// Package testutils provides utility types, for use in xds tests.
package testutils
+
+import (
+ "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
+ "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
+)
+
+// BuildResourceName returns the resource name in the format of an xdstp://
+// resource.
+func BuildResourceName(typ xdsresource.ResourceType, auth, id string, ctxParams map[string]string) string {
+ var typS string
+ switch typ {
+ case xdsresource.ListenerResource:
+ typS = version.V3ListenerType
+ case xdsresource.RouteConfigResource:
+ typS = version.V3RouteConfigType
+ case xdsresource.ClusterResource:
+ typS = version.V3ClusterType
+ case xdsresource.EndpointsResource:
+ typS = version.V3EndpointsType
+ }
+ return (&xdsresource.Name{
+ Scheme: "xdstp",
+ Authority: auth,
+ Type: typS,
+ ID: id,
+ ContextParams: ctxParams,
+ }).String()
+}
diff --git a/xds/internal/xdsclient/authority_test.go b/xds/internal/xdsclient/authority_test.go
index 583594f..f55d076 100644
--- a/xds/internal/xdsclient/authority_test.go
+++ b/xds/internal/xdsclient/authority_test.go
@@ -150,7 +150,7 @@
}
t.Cleanup(client.Close)
- resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+ resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
ctrl, ok, _ := watchAndFetchNewController(t, client, resourceName, ctrlCh)
if !ok {
t.Fatalf("want a new controller to be built, got none")
@@ -182,7 +182,7 @@
}
t.Cleanup(client.Close)
- resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+ resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
ctrl1, ok1, _ := watchAndFetchNewController(t, client, resourceName, ctrlCh)
if !ok1 {
t.Fatalf("want a new controller to be built, got none")
@@ -195,7 +195,7 @@
// Call the watch with the same authority name. This shouldn't create a new
// controller.
- resourceNameSameAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
+ resourceNameSameAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameSameAuthority, ctrlCh)
if ok2 {
t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config)
@@ -203,7 +203,7 @@
// Call the watch with a different authority name, but the same server
// config. This shouldn't create a new controller.
- resourceNameSameConfig := buildResourceName(xdsresource.ClusterResource, testAuthority2, testCDSName+"1", nil)
+ resourceNameSameConfig := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority2, testCDSName+"1", nil)
if ctrl, ok, _ := watchAndFetchNewController(t, client, resourceNameSameConfig, ctrlCh); ok {
t.Fatalf("an unexpected controller is built with config: %v", ctrl.config)
}
@@ -230,7 +230,7 @@
}
t.Cleanup(client.Close)
- resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+ resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
ctrl1, ok1, cancelWatch1 := watchAndFetchNewController(t, client, resourceName, ctrlCh)
if !ok1 {
t.Fatalf("want a new controller to be built, got none")
@@ -239,7 +239,7 @@
var cancelWatch2 func()
// Call the watch with the same authority name. This shouldn't create a new
// controller.
- resourceNameSameAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
+ resourceNameSameAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
ctrl2, ok2, cancelWatch2 := watchAndFetchNewController(t, client, resourceNameSameAuthority, ctrlCh)
if ok2 {
t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config)
@@ -285,7 +285,7 @@
t.Fatalf("want a new controller to be built, got none")
}
- resourceNameWithAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+ resourceNameWithAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameWithAuthority, ctrlCh)
if !ok2 {
t.Fatalf("want a new controller to be built, got none")
@@ -329,7 +329,7 @@
// Start a watch on the authority, and cancel it. This puts the authority in
// the idle cache.
- resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+ resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
ctrl1, ok1, cancelWatch1 := watchAndFetchNewController(t, client, resourceName, ctrlCh)
if !ok1 {
t.Fatalf("want a new controller to be built, got none")
@@ -338,7 +338,7 @@
// Start another watch on this authority, it should retrieve the authority
// from the cache, instead of creating a new one.
- resourceNameWithAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
+ resourceNameWithAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameWithAuthority, ctrlCh)
if ok2 {
t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config)
diff --git a/xds/internal/xdsclient/watchers_federation_test.go b/xds/internal/xdsclient/watchers_federation_test.go
index 1567cf5..527999e 100644
--- a/xds/internal/xdsclient/watchers_federation_test.go
+++ b/xds/internal/xdsclient/watchers_federation_test.go
@@ -22,6 +22,7 @@
"testing"
"google.golang.org/grpc/internal/envconfig"
+ "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@@ -35,8 +36,8 @@
overrideFedEnvVar(t)
var (
// Two resource names only differ in context parameter __order__.
- resourceName1 = buildResourceName(typ, testAuthority, "test-resource-name", nil) + "?a=1&b=2"
- resourceName2 = buildResourceName(typ, testAuthority, "test-resource-name", nil) + "?b=2&a=1"
+ resourceName1 = testutils.BuildResourceName(typ, testAuthority, "test-resource-name", nil) + "?a=1&b=2"
+ resourceName2 = testutils.BuildResourceName(typ, testAuthority, "test-resource-name", nil) + "?b=2&a=1"
)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
diff --git a/xds/internal/xdsclient/watchers_test.go b/xds/internal/xdsclient/watchers_test.go
index 39be83d..2405bd6 100644
--- a/xds/internal/xdsclient/watchers_test.go
+++ b/xds/internal/xdsclient/watchers_test.go
@@ -23,10 +23,10 @@
"testing"
"google.golang.org/grpc/internal/testutils"
+ xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
- "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/anypb"
)
@@ -221,34 +221,13 @@
return
}
-func buildResourceName(typ xdsresource.ResourceType, auth, id string, ctxParams map[string]string) string {
- var typS string
- switch typ {
- case xdsresource.ListenerResource:
- typS = version.V3ListenerType
- case xdsresource.RouteConfigResource:
- typS = version.V3RouteConfigType
- case xdsresource.ClusterResource:
- typS = version.V3ClusterType
- case xdsresource.EndpointsResource:
- typS = version.V3EndpointsType
- }
- return (&xdsresource.Name{
- Scheme: "xdstp",
- Authority: auth,
- Type: typS,
- ID: id,
- ContextParams: ctxParams,
- }).String()
-}
-
// TestClusterWatch covers the cases:
// - an update is received after a watch()
// - an update for another resource name
// - an update is received after cancel()
func testWatch(t *testing.T, typ xdsresource.ResourceType, update interface{}, resourceName string) {
overrideFedEnvVar(t)
- for _, rName := range []string{resourceName, buildResourceName(typ, testAuthority, resourceName, nil)} {
+ for _, rName := range []string{resourceName, xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil)} {
t.Run(rName, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@@ -302,7 +281,7 @@
// received after two watch() for the same resource name.
func testTwoWatchSameResourceName(t *testing.T, typ xdsresource.ResourceType, update interface{}, resourceName string) {
overrideFedEnvVar(t)
- for _, rName := range []string{resourceName, buildResourceName(typ, testAuthority, resourceName, nil)} {
+ for _, rName := range []string{resourceName, xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil)} {
t.Run(rName, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@@ -375,7 +354,7 @@
overrideFedEnvVar(t)
for _, rName := range [][]string{
{resourceName1, resourceName2},
- {buildResourceName(typ, testAuthority, resourceName1, nil), buildResourceName(typ, testAuthority, resourceName2, nil)},
+ {xdstestutils.BuildResourceName(typ, testAuthority, resourceName1, nil), xdstestutils.BuildResourceName(typ, testAuthority, resourceName2, nil)},
} {
t.Run(rName[0], func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -417,7 +396,7 @@
// in cache.
func testWatchAfterCache(t *testing.T, typ xdsresource.ResourceType, update interface{}, resourceName string) {
overrideFedEnvVar(t)
- for _, rName := range []string{resourceName, buildResourceName(typ, testAuthority, resourceName, nil)} {
+ for _, rName := range []string{resourceName, xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil)} {
t.Run(rName, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@@ -455,7 +434,7 @@
overrideFedEnvVar(t)
for _, rName := range [][]string{
{resourceName1, resourceName2},
- {buildResourceName(typ, testAuthority, resourceName1, nil), buildResourceName(typ, testAuthority, resourceName2, nil)},
+ {xdstestutils.BuildResourceName(typ, testAuthority, resourceName1, nil), xdstestutils.BuildResourceName(typ, testAuthority, resourceName2, nil)},
} {
t.Run(rName[0], func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -559,7 +538,7 @@
for _, rName := range [][]string{
{resourceName, badResourceName},
- {buildResourceName(typ, testAuthority, resourceName, nil), buildResourceName(typ, testAuthority, badResourceName, nil)},
+ {xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil), xdstestutils.BuildResourceName(typ, testAuthority, badResourceName, nil)},
} {
t.Run(rName[0], func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)