diff --git a/internal/xds/env/env.go b/internal/xds/env/env.go
index 1110722..db9ac93 100644
--- a/internal/xds/env/env.go
+++ b/internal/xds/env/env.go
@@ -37,11 +37,13 @@
 	// and kept in variable BootstrapFileName.
 	//
 	// When both bootstrap FileName and FileContent are set, FileName is used.
-	BootstrapFileContentEnv      = "GRPC_XDS_BOOTSTRAP_CONFIG"
+	BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
+
 	circuitBreakingSupportEnv    = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
 	timeoutSupportEnv            = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
 	faultInjectionSupportEnv     = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
 	clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
+	aggregateAndDNSSupportEnv    = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
 
 	c2pResolverSupportEnv                    = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
 	c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
@@ -60,6 +62,7 @@
 	//
 	// When both bootstrap FileName and FileContent are set, FileName is used.
 	BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)
+
 	// CircuitBreakingSupport indicates whether circuit breaking support is
 	// enabled, which can be disabled by setting the environment variable
 	// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
@@ -71,10 +74,6 @@
 	// FaultInjectionSupport is used to control both fault injection and HTTP
 	// filter support.
 	FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false")
-	// C2PResolverSupport indicates whether support for C2P resolver is enabled.
-	// This can be enabled by setting the environment variable
-	// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
-	C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
 	// ClientSideSecuritySupport is used to control processing of security
 	// configuration on the client-side.
 	//
@@ -82,6 +81,17 @@
 	// have a brand new API on the server-side and users explicitly need to use
 	// the new API to get security integration on the server.
 	ClientSideSecuritySupport = strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "true")
+	// AggregateAndDNSSupportEnv indicates whether processing of aggregated
+	// cluster and DNS cluster is enabled, which can be enabled by setting the
+	// environment variable
+	// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
+	// "true".
+	AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
+
+	// C2PResolverSupport indicates whether support for C2P resolver is enabled.
+	// This can be enabled by setting the environment variable
+	// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
+	C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
 	// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
 	C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
 )
diff --git a/interop/xds/client/Dockerfile b/interop/xds/client/Dockerfile
new file mode 100644
index 0000000..060f8a8
--- /dev/null
+++ b/interop/xds/client/Dockerfile
@@ -0,0 +1,34 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dockerfile for building the xDS interop client. To build the image, run the
+# following command from grpc-go directory:
+# docker build -t <TAG> -f interop/xds/client/Dockerfile .
+
+FROM golang:1.16-alpine as build
+
+# Make a grpc-go directory and copy the repo into it.
+WORKDIR /go/src/grpc-go
+COPY . .
+
+# Build a static binary without cgo so that we can copy just the binary in the
+# final image, and can get rid of Go compiler and gRPC-Go dependencies.
+RUN go build -tags osusergo,netgo interop/xds/client/client.go
+
+# Second stage of the build which copies over only the client binary and skips
+# the Go compiler and gRPC repo from the earlier stage. This significantly
+# reduces the docker image size.
+FROM alpine
+COPY --from=build /go/src/grpc-go/client .
+ENTRYPOINT ["./client"]
diff --git a/interop/xds/server/Dockerfile b/interop/xds/server/Dockerfile
new file mode 100644
index 0000000..259066e
--- /dev/null
+++ b/interop/xds/server/Dockerfile
@@ -0,0 +1,34 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dockerfile for building the xDS interop server. To build the image, run the
+# following command from grpc-go directory:
+# docker build -t <TAG> -f interop/xds/server/Dockerfile .
+
+FROM golang:1.16-alpine as build
+
+# Make a grpc-go directory and copy the repo into it.
+WORKDIR /go/src/grpc-go
+COPY . .
+
+# Build a static binary without cgo so that we can copy just the binary in the
+# final image, and can get rid of the Go compiler and gRPC-Go dependencies.
+RUN go build -tags osusergo,netgo interop/xds/server/server.go
+
+# Second stage of the build which copies over only the client binary and skips
+# the Go compiler and gRPC repo from the earlier stage. This significantly
+# reduces the docker image size.
+FROM alpine
+COPY --from=build /go/src/grpc-go/server .
+ENTRYPOINT ["./server"]
diff --git a/interop/xds/server/server.go b/interop/xds/server/server.go
index 4989eb7..2f33799 100644
--- a/interop/xds/server/server.go
+++ b/interop/xds/server/server.go
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2020 gRPC authors.
+ * Copyright 2021 gRPC authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,29 +16,37 @@
  *
  */
 
-// Binary server for xDS interop tests.
+// Binary server is the server used for xDS interop tests.
 package main
 
 import (
 	"context"
 	"flag"
+	"fmt"
 	"log"
 	"net"
 	"os"
-	"strconv"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/admin"
+	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/health"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/reflection"
+	"google.golang.org/grpc/xds"
 
+	xdscreds "google.golang.org/grpc/credentials/xds"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 	testgrpc "google.golang.org/grpc/interop/grpc_testing"
 	testpb "google.golang.org/grpc/interop/grpc_testing"
 )
 
 var (
-	port     = flag.Int("port", 8080, "The server port")
-	serverID = flag.String("server_id", "go_server", "Server ID included in response")
-	hostname = getHostname()
+	port            = flag.Int("port", 8080, "Listening port for test service")
+	maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port")
+	serverID        = flag.String("server_id", "go_server", "Server ID included in response")
+	secureMode      = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")
 
 	logger = grpclog.Component("interop")
 )
@@ -51,28 +59,126 @@
 	return hostname
 }
 
-type server struct {
+// testServiceImpl provides an implementation of the TestService defined in
+// grpc.testing package.
+type testServiceImpl struct {
 	testgrpc.UnimplementedTestServiceServer
+	hostname string
 }
 
-func (s *server) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
-	grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
+func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+	grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
 	return &testpb.Empty{}, nil
 }
 
-func (s *server) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
-	grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
-	return &testpb.SimpleResponse{ServerId: *serverID, Hostname: hostname}, nil
+func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+	grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
+	return &testpb.SimpleResponse{ServerId: *serverID, Hostname: s.hostname}, nil
+}
+
+// xdsUpdateHealthServiceImpl provides an implementation of the
+// XdsUpdateHealthService defined in grpc.testing package.
+type xdsUpdateHealthServiceImpl struct {
+	testgrpc.UnimplementedXdsUpdateHealthServiceServer
+	healthServer *health.Server
+}
+
+func (x *xdsUpdateHealthServiceImpl) SetServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+	x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+	return &testpb.Empty{}, nil
+
+}
+
+func (x *xdsUpdateHealthServiceImpl) SetNotServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+	x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
+	return &testpb.Empty{}, nil
+}
+
+func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
+	logger.Infof("Serving mode for xDS server at %s changed to %s", addr.String(), args.Mode)
+	if args.Err != nil {
+		logger.Infof("ServingModeCallback returned error: %v", args.Err)
+	}
 }
 
 func main() {
 	flag.Parse()
-	p := strconv.Itoa(*port)
-	lis, err := net.Listen("tcp", ":"+p)
-	if err != nil {
-		logger.Fatalf("failed to listen: %v", err)
+
+	if *secureMode && *port == *maintenancePort {
+		logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set")
 	}
-	s := grpc.NewServer()
-	testgrpc.RegisterTestServiceServer(s, &server{})
-	s.Serve(lis)
+
+	testService := &testServiceImpl{hostname: getHostname()}
+	healthServer := health.NewServer()
+	updateHealthService := &xdsUpdateHealthServiceImpl{healthServer: healthServer}
+
+	// If -secure_mode is not set, expose all services on -port with a regular
+	// gRPC server.
+	if !*secureMode {
+		lis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
+		if err != nil {
+			logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
+		}
+
+		server := grpc.NewServer()
+		testgrpc.RegisterTestServiceServer(server, testService)
+		healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+		healthpb.RegisterHealthServer(server, healthServer)
+		testgrpc.RegisterXdsUpdateHealthServiceServer(server, updateHealthService)
+		reflection.Register(server)
+		cleanup, err := admin.Register(server)
+		if err != nil {
+			logger.Fatalf("Failed to register admin services: %v", err)
+		}
+		defer cleanup()
+		if err := server.Serve(lis); err != nil {
+			logger.Errorf("Serve() failed: %v", err)
+		}
+		return
+	}
+
+	// Create a listener on -port to expose the test service.
+	testLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
+	if err != nil {
+		logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
+	}
+
+	// Create server-side xDS credentials with a plaintext fallback.
+	creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
+	if err != nil {
+		logger.Fatalf("Failed to create xDS credentials: %v", err)
+	}
+
+	// Create an xDS enabled gRPC server, register the test service
+	// implementation and start serving.
+	testServer := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
+	testgrpc.RegisterTestServiceServer(testServer, testService)
+	go func() {
+		if err := testServer.Serve(testLis); err != nil {
+			logger.Errorf("test server Serve() failed: %v", err)
+		}
+	}()
+	defer testServer.Stop()
+
+	// Create a listener on -maintenance_port to expose other services.
+	maintenanceLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *maintenancePort))
+	if err != nil {
+		logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *maintenancePort), err)
+	}
+
+	// Create a regular gRPC server and register the maintenance services on
+	// it and start serving.
+	maintenanceServer := grpc.NewServer()
+	healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+	healthpb.RegisterHealthServer(maintenanceServer, healthServer)
+	testgrpc.RegisterXdsUpdateHealthServiceServer(maintenanceServer, updateHealthService)
+	reflection.Register(maintenanceServer)
+	cleanup, err := admin.Register(maintenanceServer)
+	if err != nil {
+		logger.Fatalf("Failed to register admin services: %v", err)
+	}
+	defer cleanup()
+	if err := maintenanceServer.Serve(maintenanceLis); err != nil {
+		logger.Errorf("maintenance server Serve() failed: %v", err)
+	}
 }
diff --git a/rpc_util.go b/rpc_util.go
index c8ae0e4..6db356f 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -429,9 +429,10 @@
 }
 func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}
 
-// ForceCodec returns a CallOption that will set codec to be
-// used for all request and response messages for a call. The result of calling
-// Name() will be used as the content-subtype in a case-insensitive manner.
+// ForceCodec returns a CallOption that will set codec to be used for all
+// request and response messages for a call. The result of calling Name() will
+// be used as the content-subtype after converting to lowercase, unless
+// CallContentSubtype is also used.
 //
 // See Content-Type on
 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
@@ -853,7 +854,17 @@
 // setCallInfoCodec should only be called after CallOptions have been applied.
 func setCallInfoCodec(c *callInfo) error {
 	if c.codec != nil {
-		// codec was already set by a CallOption; use it.
+		// codec was already set by a CallOption; use it, but set the content
+		// subtype if it is not set.
+		if c.contentSubtype == "" {
+			// c.codec is a baseCodec to hide the difference between grpc.Codec and
+			// encoding.Codec (Name vs. String method name).  We only support
+			// setting content subtype from encoding.Codec to avoid a behavior
+			// change with the deprecated version.
+			if ec, ok := c.codec.(encoding.Codec); ok {
+				c.contentSubtype = strings.ToLower(ec.Name())
+			}
+		}
 		return nil
 	}
 
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 861a2f2..eb91d09 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -5293,7 +5293,7 @@
 	}
 	defer ss.Stop()
 
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
 
 	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
@@ -5305,6 +5305,55 @@
 	}
 }
 
+// renameProtoCodec is an encoding.Codec wrapper that allows customizing the
+// Name() of another codec.
+type renameProtoCodec struct {
+	encoding.Codec
+	name string
+}
+
+func (r *renameProtoCodec) Name() string { return r.name }
+
+// TestForceCodecName confirms that the ForceCodec call option sets the subtype
+// in the content-type header according to the Name() of the codec provided.
+func (s) TestForceCodecName(t *testing.T) {
+	wantContentTypeCh := make(chan []string, 1)
+	defer close(wantContentTypeCh)
+
+	ss := &stubserver.StubServer{
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			md, ok := metadata.FromIncomingContext(ctx)
+			if !ok {
+				return nil, status.Errorf(codes.Internal, "no metadata in context")
+			}
+			if got, want := md["content-type"], <-wantContentTypeCh; !reflect.DeepEqual(got, want) {
+				return nil, status.Errorf(codes.Internal, "got content-type=%q; want [%q]", got, want)
+			}
+			return &testpb.Empty{}, nil
+		},
+	}
+	if err := ss.Start([]grpc.ServerOption{grpc.ForceServerCodec(encoding.GetCodec("proto"))}); err != nil {
+		t.Fatalf("Error starting endpoint server: %v", err)
+	}
+	defer ss.Stop()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	codec := &renameProtoCodec{Codec: encoding.GetCodec("proto"), name: "some-test-name"}
+	wantContentTypeCh <- []string{"application/grpc+some-test-name"}
+	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}, grpc.ForceCodec(codec)); err != nil {
+		t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
+	}
+
+	// Confirm the name is converted to lowercase before transmitting.
+	codec.name = "aNoTHeRNaME"
+	wantContentTypeCh <- []string{"application/grpc+anothername"}
+	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}, grpc.ForceCodec(codec)); err != nil {
+		t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
+	}
+}
+
 func (s) TestForceServerCodec(t *testing.T) {
 	ss := &stubserver.StubServer{
 		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
@@ -5317,7 +5366,7 @@
 	}
 	defer ss.Stop()
 
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
 
 	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
diff --git a/xds/internal/client/cds_test.go b/xds/internal/client/cds_test.go
index 627229d..c7d9d28 100644
--- a/xds/internal/client/cds_test.go
+++ b/xds/internal/client/cds_test.go
@@ -123,6 +123,9 @@
 		},
 	}
 
+	oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
+	env.AggregateAndDNSSupportEnv = true
+	defer func() { env.AggregateAndDNSSupportEnv = oldAggregateAndDNSSupportEnv }()
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			if update, err := validateClusterAndConstructClusterUpdate(test.cluster); err == nil {
@@ -248,6 +251,9 @@
 	origCircuitBreakingSupport := env.CircuitBreakingSupport
 	env.CircuitBreakingSupport = true
 	defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
+	oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
+	env.AggregateAndDNSSupportEnv = true
+	defer func() { env.AggregateAndDNSSupportEnv = oldAggregateAndDNSSupportEnv }()
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			update, err := validateClusterAndConstructClusterUpdate(test.cluster)
diff --git a/xds/internal/client/xds.go b/xds/internal/client/xds.go
index 55bcc2e..0cd373d 100644
--- a/xds/internal/client/xds.go
+++ b/xds/internal/client/xds.go
@@ -595,6 +595,10 @@
 		return ClusterTypeEDS, cluster.GetEdsClusterConfig().GetServiceName(), nil, nil
 	}
 
+	if !env.AggregateAndDNSSupportEnv {
+		return 0, "", nil, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
+	}
+
 	if cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS {
 		return ClusterTypeLogicalDNS, cluster.GetName(), nil, nil
 	}
@@ -607,7 +611,7 @@
 		}
 		return ClusterTypeAggregate, cluster.GetName(), clusters.Clusters, nil
 	}
-	return 0, "", nil, fmt.Errorf("unexpected cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
+	return 0, "", nil, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
 }
 
 func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
diff --git a/xds/internal/testutils/e2e/clientresources.go b/xds/internal/testutils/e2e/clientresources.go
index b521db9..7c8311a 100644
--- a/xds/internal/testutils/e2e/clientresources.go
+++ b/xds/internal/testutils/e2e/clientresources.go
@@ -20,6 +20,8 @@
 
 import (
 	"fmt"
+	"net"
+	"strconv"
 
 	"github.com/envoyproxy/go-control-plane/pkg/wellknown"
 	"github.com/golang/protobuf/proto"
@@ -160,7 +162,7 @@
 		}
 	}
 	return &v3listenerpb.Listener{
-		Name: fmt.Sprintf(ServerListenerResourceNameTemplate, fmt.Sprintf("%s:%d", host, port)),
+		Name: fmt.Sprintf(ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port)))),
 		Address: &v3corepb.Address{
 			Address: &v3corepb.Address_SocketAddress{
 				SocketAddress: &v3corepb.SocketAddress{
