xds/federation: e2e tests (#5103)

diff --git a/internal/xds/bootstrap.go b/internal/xds/bootstrap.go
index eeb709c..4905b78 100644
--- a/internal/xds/bootstrap.go
+++ b/internal/xds/bootstrap.go
@@ -55,6 +55,14 @@
 	ServerListenerResourceNameTemplate string
 	// CertificateProviders is the certificate providers configuration.
 	CertificateProviders map[string]json.RawMessage
+	// Authorities is a list of non-default authorities.
+	//
+	// In the config, an authority contains {ServerURI, xds-version, creds,
+	// features, etc}. Note that this fields only has ServerURI (it's a
+	// map[authority-name]ServerURI). The other fields (version, creds,
+	// features) are assumed to be the same as the default authority (they can
+	// be added later if needed).
+	Authorities map[string]string
 }
 
 // SetupBootstrapFile creates a temporary file with bootstrap contents, based on
@@ -94,12 +102,8 @@
 	cfg := &bootstrapConfig{
 		XdsServers: []server{
 			{
-				ServerURI: opts.ServerURI,
-				ChannelCreds: []creds{
-					{
-						Type: "insecure",
-					},
-				},
+				ServerURI:    opts.ServerURI,
+				ChannelCreds: []creds{{Type: "insecure"}},
 			},
 		},
 		Node: node{
@@ -117,6 +121,16 @@
 		return nil, fmt.Errorf("unsupported xDS transport protocol version: %v", opts.Version)
 	}
 
+	auths := make(map[string]authority)
+	for n, auURI := range opts.Authorities {
+		auths[n] = authority{XdsServers: []server{{
+			ServerURI:      auURI,
+			ChannelCreds:   []creds{{Type: "insecure"}},
+			ServerFeatures: cfg.XdsServers[0].ServerFeatures,
+		}}}
+	}
+	cfg.Authorities = auths
+
 	bootstrapContents, err := json.MarshalIndent(cfg, "", "  ")
 	if err != nil {
 		return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
@@ -129,6 +143,11 @@
 	Node                               node                       `json:"node,omitempty"`
 	CertificateProviders               map[string]json.RawMessage `json:"certificate_providers,omitempty"`
 	ServerListenerResourceNameTemplate string                     `json:"server_listener_resource_name_template,omitempty"`
+	Authorities                        map[string]authority       `json:"authorities,omitempty"`
+}
+
+type authority struct {
+	XdsServers []server `json:"xds_servers,omitempty"`
 }
 
 type server struct {
diff --git a/xds/internal/test/xds_client_federation_test.go b/xds/internal/test/xds_client_federation_test.go
new file mode 100644
index 0000000..09db314
--- /dev/null
+++ b/xds/internal/test/xds_client_federation_test.go
@@ -0,0 +1,142 @@
+//go:build !386
+// +build !386
+
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xds_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+	"github.com/google/uuid"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal/envconfig"
+	xdsinternal "google.golang.org/grpc/internal/xds"
+	testpb "google.golang.org/grpc/test/grpc_testing"
+	"google.golang.org/grpc/xds"
+	"google.golang.org/grpc/xds/internal/testutils"
+	"google.golang.org/grpc/xds/internal/testutils/e2e"
+	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
+)
+
+// TestClientSideFederation tests that federation is supported.
+//
+// In this test, some xDS responses contain resource names in another authority
+// (in the new resource name style):
+// - LDS: old style, no authority (default authority)
+// - RDS: new style, in a different authority
+// - CDS: old style, no authority (default authority)
+// - EDS: new style, in a different authority
+func (s) TestClientSideFederation(t *testing.T) {
+	oldXDSFederation := envconfig.XDSFederation
+	envconfig.XDSFederation = true
+	defer func() { envconfig.XDSFederation = oldXDSFederation }()
+
+	// Start a management server as the default authority.
+	serverDefaultAuth, err := e2e.StartManagementServer()
+	if err != nil {
+		t.Fatalf("Failed to spin up the xDS management server: %v", err)
+	}
+	t.Cleanup(serverDefaultAuth.Stop)
+
+	// Start another management server as the other authority.
+	const nonDefaultAuth = "non-default-auth"
+	serverAnotherAuth, err := e2e.StartManagementServer()
+	if err != nil {
+		t.Fatalf("Failed to spin up the xDS management server: %v", err)
+	}
+	t.Cleanup(serverAnotherAuth.Stop)
+
+	// Create a bootstrap file in a temporary directory.
+	nodeID := uuid.New().String()
+	bootstrapContents, err := xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{
+		Version:                            xdsinternal.TransportV3,
+		NodeID:                             nodeID,
+		ServerURI:                          serverDefaultAuth.Address,
+		ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
+		// Specify the address of the non-default authority.
+		Authorities: map[string]string{nonDefaultAuth: serverAnotherAuth.Address},
+	})
+	if err != nil {
+		t.Fatalf("Failed to create bootstrap file: %v", err)
+	}
+
+	resolver, err := xds.NewXDSResolverWithConfigForTesting(bootstrapContents)
+	if err != nil {
+		t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+	}
+	port, cleanup := clientSetup(t, &testService{})
+	defer cleanup()
+
+	const serviceName = "my-service-client-side-xds"
+	// LDS is old style name.
+	ldsName := serviceName
+	// RDS is new style, with the non default authority.
+	rdsName := testutils.BuildResourceName(xdsresource.RouteConfigResource, nonDefaultAuth, "route-"+serviceName, nil)
+	// CDS is old style name.
+	cdsName := "cluster-" + serviceName
+	// EDS is new style, with the non default authority.
+	edsName := testutils.BuildResourceName(xdsresource.EndpointsResource, nonDefaultAuth, "endpoints-"+serviceName, nil)
+
+	// Split resources, put LDS/CDS in the default authority, and put RDS/EDS in
+	// the other authority.
+	resourcesDefault := e2e.UpdateOptions{
+		NodeID: nodeID,
+		// This has only LDS and CDS.
+		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
+		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(cdsName, edsName, e2e.SecurityLevelNone)},
+		SkipValidation: true,
+	}
+	resourcesAnother := e2e.UpdateOptions{
+		NodeID: nodeID,
+		// This has only RDS and EDS.
+		Routes:         []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, cdsName)},
+		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsName, "localhost", []uint32{port})},
+		SkipValidation: true,
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	// This has only LDS and CDS.
+	if err := serverDefaultAuth.Update(ctx, resourcesDefault); err != nil {
+		t.Fatal(err)
+	}
+	// This has only RDS and EDS.
+	if err := serverAnotherAuth.Update(ctx, resourcesAnother); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create a ClientConn and make a successful RPC.
+	cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
+	if err != nil {
+		t.Fatalf("failed to dial local test server: %v", err)
+	}
+	defer cc.Close()
+
+	client := testpb.NewTestServiceClient(cc)
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+		t.Fatalf("rpc EmptyCall() failed: %v", err)
+	}
+}
diff --git a/xds/internal/testutils/testutils.go b/xds/internal/testutils/testutils.go
index a4c56f6..bab5871 100644
--- a/xds/internal/testutils/testutils.go
+++ b/xds/internal/testutils/testutils.go
@@ -17,3 +17,31 @@
 
 // Package testutils provides utility types, for use in xds tests.
 package testutils
+
+import (
+	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
+	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
+)
+
+// BuildResourceName returns the resource name in the format of an xdstp://
+// resource.
+func BuildResourceName(typ xdsresource.ResourceType, auth, id string, ctxParams map[string]string) string {
+	var typS string
+	switch typ {
+	case xdsresource.ListenerResource:
+		typS = version.V3ListenerType
+	case xdsresource.RouteConfigResource:
+		typS = version.V3RouteConfigType
+	case xdsresource.ClusterResource:
+		typS = version.V3ClusterType
+	case xdsresource.EndpointsResource:
+		typS = version.V3EndpointsType
+	}
+	return (&xdsresource.Name{
+		Scheme:        "xdstp",
+		Authority:     auth,
+		Type:          typS,
+		ID:            id,
+		ContextParams: ctxParams,
+	}).String()
+}
diff --git a/xds/internal/xdsclient/authority_test.go b/xds/internal/xdsclient/authority_test.go
index 583594f..f55d076 100644
--- a/xds/internal/xdsclient/authority_test.go
+++ b/xds/internal/xdsclient/authority_test.go
@@ -150,7 +150,7 @@
 	}
 	t.Cleanup(client.Close)
 
-	resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+	resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
 	ctrl, ok, _ := watchAndFetchNewController(t, client, resourceName, ctrlCh)
 	if !ok {
 		t.Fatalf("want a new controller to be built, got none")
@@ -182,7 +182,7 @@
 	}
 	t.Cleanup(client.Close)
 
-	resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+	resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
 	ctrl1, ok1, _ := watchAndFetchNewController(t, client, resourceName, ctrlCh)
 	if !ok1 {
 		t.Fatalf("want a new controller to be built, got none")
@@ -195,7 +195,7 @@
 
 	// Call the watch with the same authority name. This shouldn't create a new
 	// controller.
-	resourceNameSameAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
+	resourceNameSameAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
 	ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameSameAuthority, ctrlCh)
 	if ok2 {
 		t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config)
@@ -203,7 +203,7 @@
 
 	// Call the watch with a different authority name, but the same server
 	// config. This shouldn't create a new controller.
-	resourceNameSameConfig := buildResourceName(xdsresource.ClusterResource, testAuthority2, testCDSName+"1", nil)
+	resourceNameSameConfig := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority2, testCDSName+"1", nil)
 	if ctrl, ok, _ := watchAndFetchNewController(t, client, resourceNameSameConfig, ctrlCh); ok {
 		t.Fatalf("an unexpected controller is built with config: %v", ctrl.config)
 	}
@@ -230,7 +230,7 @@
 	}
 	t.Cleanup(client.Close)
 
-	resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+	resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
 	ctrl1, ok1, cancelWatch1 := watchAndFetchNewController(t, client, resourceName, ctrlCh)
 	if !ok1 {
 		t.Fatalf("want a new controller to be built, got none")
@@ -239,7 +239,7 @@
 	var cancelWatch2 func()
 	// Call the watch with the same authority name. This shouldn't create a new
 	// controller.
-	resourceNameSameAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
+	resourceNameSameAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
 	ctrl2, ok2, cancelWatch2 := watchAndFetchNewController(t, client, resourceNameSameAuthority, ctrlCh)
 	if ok2 {
 		t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config)
@@ -285,7 +285,7 @@
 		t.Fatalf("want a new controller to be built, got none")
 	}
 
-	resourceNameWithAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+	resourceNameWithAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
 	ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameWithAuthority, ctrlCh)
 	if !ok2 {
 		t.Fatalf("want a new controller to be built, got none")
@@ -329,7 +329,7 @@
 
 	// Start a watch on the authority, and cancel it. This puts the authority in
 	// the idle cache.
-	resourceName := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
+	resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil)
 	ctrl1, ok1, cancelWatch1 := watchAndFetchNewController(t, client, resourceName, ctrlCh)
 	if !ok1 {
 		t.Fatalf("want a new controller to be built, got none")
@@ -338,7 +338,7 @@
 
 	// Start another watch on this authority, it should retrieve the authority
 	// from the cache, instead of creating a new one.
-	resourceNameWithAuthority := buildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
+	resourceNameWithAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil)
 	ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameWithAuthority, ctrlCh)
 	if ok2 {
 		t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config)
diff --git a/xds/internal/xdsclient/watchers_federation_test.go b/xds/internal/xdsclient/watchers_federation_test.go
index 1567cf5..527999e 100644
--- a/xds/internal/xdsclient/watchers_federation_test.go
+++ b/xds/internal/xdsclient/watchers_federation_test.go
@@ -22,6 +22,7 @@
 	"testing"
 
 	"google.golang.org/grpc/internal/envconfig"
+	"google.golang.org/grpc/xds/internal/testutils"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 )
 
@@ -35,8 +36,8 @@
 	overrideFedEnvVar(t)
 	var (
 		// Two resource names only differ in context parameter __order__.
-		resourceName1 = buildResourceName(typ, testAuthority, "test-resource-name", nil) + "?a=1&b=2"
-		resourceName2 = buildResourceName(typ, testAuthority, "test-resource-name", nil) + "?b=2&a=1"
+		resourceName1 = testutils.BuildResourceName(typ, testAuthority, "test-resource-name", nil) + "?a=1&b=2"
+		resourceName2 = testutils.BuildResourceName(typ, testAuthority, "test-resource-name", nil) + "?b=2&a=1"
 	)
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
diff --git a/xds/internal/xdsclient/watchers_test.go b/xds/internal/xdsclient/watchers_test.go
index 39be83d..2405bd6 100644
--- a/xds/internal/xdsclient/watchers_test.go
+++ b/xds/internal/xdsclient/watchers_test.go
@@ -23,10 +23,10 @@
 	"testing"
 
 	"google.golang.org/grpc/internal/testutils"
+	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
 	"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
 	"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
-	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
 	"google.golang.org/protobuf/types/known/anypb"
 )
 
@@ -221,34 +221,13 @@
 	return
 }
 
-func buildResourceName(typ xdsresource.ResourceType, auth, id string, ctxParams map[string]string) string {
-	var typS string
-	switch typ {
-	case xdsresource.ListenerResource:
-		typS = version.V3ListenerType
-	case xdsresource.RouteConfigResource:
-		typS = version.V3RouteConfigType
-	case xdsresource.ClusterResource:
-		typS = version.V3ClusterType
-	case xdsresource.EndpointsResource:
-		typS = version.V3EndpointsType
-	}
-	return (&xdsresource.Name{
-		Scheme:        "xdstp",
-		Authority:     auth,
-		Type:          typS,
-		ID:            id,
-		ContextParams: ctxParams,
-	}).String()
-}
-
 // TestClusterWatch covers the cases:
 // - an update is received after a watch()
 // - an update for another resource name
 // - an update is received after cancel()
 func testWatch(t *testing.T, typ xdsresource.ResourceType, update interface{}, resourceName string) {
 	overrideFedEnvVar(t)
-	for _, rName := range []string{resourceName, buildResourceName(typ, testAuthority, resourceName, nil)} {
+	for _, rName := range []string{resourceName, xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil)} {
 		t.Run(rName, func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 			defer cancel()
@@ -302,7 +281,7 @@
 // received after two watch() for the same resource name.
 func testTwoWatchSameResourceName(t *testing.T, typ xdsresource.ResourceType, update interface{}, resourceName string) {
 	overrideFedEnvVar(t)
-	for _, rName := range []string{resourceName, buildResourceName(typ, testAuthority, resourceName, nil)} {
+	for _, rName := range []string{resourceName, xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil)} {
 		t.Run(rName, func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 			defer cancel()
@@ -375,7 +354,7 @@
 	overrideFedEnvVar(t)
 	for _, rName := range [][]string{
 		{resourceName1, resourceName2},
-		{buildResourceName(typ, testAuthority, resourceName1, nil), buildResourceName(typ, testAuthority, resourceName2, nil)},
+		{xdstestutils.BuildResourceName(typ, testAuthority, resourceName1, nil), xdstestutils.BuildResourceName(typ, testAuthority, resourceName2, nil)},
 	} {
 		t.Run(rName[0], func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -417,7 +396,7 @@
 // in cache.
 func testWatchAfterCache(t *testing.T, typ xdsresource.ResourceType, update interface{}, resourceName string) {
 	overrideFedEnvVar(t)
-	for _, rName := range []string{resourceName, buildResourceName(typ, testAuthority, resourceName, nil)} {
+	for _, rName := range []string{resourceName, xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil)} {
 		t.Run(rName, func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 			defer cancel()
@@ -455,7 +434,7 @@
 	overrideFedEnvVar(t)
 	for _, rName := range [][]string{
 		{resourceName1, resourceName2},
-		{buildResourceName(typ, testAuthority, resourceName1, nil), buildResourceName(typ, testAuthority, resourceName2, nil)},
+		{xdstestutils.BuildResourceName(typ, testAuthority, resourceName1, nil), xdstestutils.BuildResourceName(typ, testAuthority, resourceName2, nil)},
 	} {
 		t.Run(rName[0], func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -559,7 +538,7 @@
 
 	for _, rName := range [][]string{
 		{resourceName, badResourceName},
-		{buildResourceName(typ, testAuthority, resourceName, nil), buildResourceName(typ, testAuthority, badResourceName, nil)},
+		{xdstestutils.BuildResourceName(typ, testAuthority, resourceName, nil), xdstestutils.BuildResourceName(typ, testAuthority, badResourceName, nil)},
 	} {
 		t.Run(rName[0], func(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)