v1.38.x: backport (#4453)
* interop/xds: support xds security on interop server (#4444)
* interop/xds: dockerfile for the xds interop client (#4443)
* xds/cds: add env var for aggregated and DNS cluster (#4440)
* xds: use same format while registering and watching resources (#4422)
* client: fix ForceCodec to set content-type header appropriately (#4401)
diff --git a/internal/xds/env/env.go b/internal/xds/env/env.go
index 1110722..db9ac93 100644
--- a/internal/xds/env/env.go
+++ b/internal/xds/env/env.go
@@ -37,11 +37,13 @@
// and kept in variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
- BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
+ BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
+
circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
+ aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
@@ -60,6 +62,7 @@
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)
+
// CircuitBreakingSupport indicates whether circuit breaking support is
// enabled, which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
@@ -71,10 +74,6 @@
// FaultInjectionSupport is used to control both fault injection and HTTP
// filter support.
FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false")
- // C2PResolverSupport indicates whether support for C2P resolver is enabled.
- // This can be enabled by setting the environment variable
- // "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
- C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
@@ -82,6 +81,17 @@
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
ClientSideSecuritySupport = strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "true")
+ // AggregateAndDNSSupportEnv indicates whether processing of aggregated
+ // cluster and DNS cluster is enabled, which can be enabled by setting the
+ // environment variable
+ // "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
+ // "true".
+ AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
+
+ // C2PResolverSupport indicates whether support for C2P resolver is enabled.
+ // This can be enabled by setting the environment variable
+ // "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
+ C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
)
diff --git a/interop/xds/client/Dockerfile b/interop/xds/client/Dockerfile
new file mode 100644
index 0000000..060f8a8
--- /dev/null
+++ b/interop/xds/client/Dockerfile
@@ -0,0 +1,34 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dockerfile for building the xDS interop client. To build the image, run the
+# following command from grpc-go directory:
+# docker build -t <TAG> -f interop/xds/client/Dockerfile .
+
+FROM golang:1.16-alpine as build
+
+# Make a grpc-go directory and copy the repo into it.
+WORKDIR /go/src/grpc-go
+COPY . .
+
+# Build a static binary without cgo so that we can copy just the binary in the
+# final image, and can get rid of Go compiler and gRPC-Go dependencies.
+RUN go build -tags osusergo,netgo interop/xds/client/client.go
+
+# Second stage of the build which copies over only the client binary and skips
+# the Go compiler and gRPC repo from the earlier stage. This significantly
+# reduces the docker image size.
+FROM alpine
+COPY --from=build /go/src/grpc-go/client .
+ENTRYPOINT ["./client"]
diff --git a/interop/xds/server/Dockerfile b/interop/xds/server/Dockerfile
new file mode 100644
index 0000000..259066e
--- /dev/null
+++ b/interop/xds/server/Dockerfile
@@ -0,0 +1,34 @@
+# Copyright 2021 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dockerfile for building the xDS interop server. To build the image, run the
+# following command from grpc-go directory:
+# docker build -t <TAG> -f interop/xds/server/Dockerfile .
+
+FROM golang:1.16-alpine as build
+
+# Make a grpc-go directory and copy the repo into it.
+WORKDIR /go/src/grpc-go
+COPY . .
+
+# Build a static binary without cgo so that we can copy just the binary in the
+# final image, and can get rid of the Go compiler and gRPC-Go dependencies.
+RUN go build -tags osusergo,netgo interop/xds/server/server.go
+
+# Second stage of the build which copies over only the client binary and skips
+# the Go compiler and gRPC repo from the earlier stage. This significantly
+# reduces the docker image size.
+FROM alpine
+COPY --from=build /go/src/grpc-go/server .
+ENTRYPOINT ["./server"]
diff --git a/interop/xds/server/server.go b/interop/xds/server/server.go
index 4989eb7..2f33799 100644
--- a/interop/xds/server/server.go
+++ b/interop/xds/server/server.go
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2020 gRPC authors.
+ * Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,29 +16,37 @@
*
*/
-// Binary server for xDS interop tests.
+// Binary server is the server used for xDS interop tests.
package main
import (
"context"
"flag"
+ "fmt"
"log"
"net"
"os"
- "strconv"
"google.golang.org/grpc"
+ "google.golang.org/grpc/admin"
+ "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/health"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/reflection"
+ "google.golang.org/grpc/xds"
+ xdscreds "google.golang.org/grpc/credentials/xds"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
var (
- port = flag.Int("port", 8080, "The server port")
- serverID = flag.String("server_id", "go_server", "Server ID included in response")
- hostname = getHostname()
+ port = flag.Int("port", 8080, "Listening port for test service")
+ maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port")
+ serverID = flag.String("server_id", "go_server", "Server ID included in response")
+ secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")
logger = grpclog.Component("interop")
)
@@ -51,28 +59,126 @@
return hostname
}
-type server struct {
+// testServiceImpl provides an implementation of the TestService defined in
+// grpc.testing package.
+type testServiceImpl struct {
testgrpc.UnimplementedTestServiceServer
+ hostname string
}
-func (s *server) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
- grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
+func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+ grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.Empty{}, nil
}
-func (s *server) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
- grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
- return &testpb.SimpleResponse{ServerId: *serverID, Hostname: hostname}, nil
+func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+ grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
+ return &testpb.SimpleResponse{ServerId: *serverID, Hostname: s.hostname}, nil
+}
+
+// xdsUpdateHealthServiceImpl provides an implementation of the
+// XdsUpdateHealthService defined in grpc.testing package.
+type xdsUpdateHealthServiceImpl struct {
+ testgrpc.UnimplementedXdsUpdateHealthServiceServer
+ healthServer *health.Server
+}
+
+func (x *xdsUpdateHealthServiceImpl) SetServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+ x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+ return &testpb.Empty{}, nil
+
+}
+
+func (x *xdsUpdateHealthServiceImpl) SetNotServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
+ x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
+ return &testpb.Empty{}, nil
+}
+
+func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
+ logger.Infof("Serving mode for xDS server at %s changed to %s", addr.String(), args.Mode)
+ if args.Err != nil {
+ logger.Infof("ServingModeCallback returned error: %v", args.Err)
+ }
}
func main() {
flag.Parse()
- p := strconv.Itoa(*port)
- lis, err := net.Listen("tcp", ":"+p)
- if err != nil {
- logger.Fatalf("failed to listen: %v", err)
+
+ if *secureMode && *port == *maintenancePort {
+ logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set")
}
- s := grpc.NewServer()
- testgrpc.RegisterTestServiceServer(s, &server{})
- s.Serve(lis)
+
+ testService := &testServiceImpl{hostname: getHostname()}
+ healthServer := health.NewServer()
+ updateHealthService := &xdsUpdateHealthServiceImpl{healthServer: healthServer}
+
+ // If -secure_mode is not set, expose all services on -port with a regular
+ // gRPC server.
+ if !*secureMode {
+ lis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
+ if err != nil {
+ logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
+ }
+
+ server := grpc.NewServer()
+ testgrpc.RegisterTestServiceServer(server, testService)
+ healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+ healthpb.RegisterHealthServer(server, healthServer)
+ testgrpc.RegisterXdsUpdateHealthServiceServer(server, updateHealthService)
+ reflection.Register(server)
+ cleanup, err := admin.Register(server)
+ if err != nil {
+ logger.Fatalf("Failed to register admin services: %v", err)
+ }
+ defer cleanup()
+ if err := server.Serve(lis); err != nil {
+ logger.Errorf("Serve() failed: %v", err)
+ }
+ return
+ }
+
+ // Create a listener on -port to expose the test service.
+ testLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
+ if err != nil {
+ logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
+ }
+
+ // Create server-side xDS credentials with a plaintext fallback.
+ creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
+ if err != nil {
+ logger.Fatalf("Failed to create xDS credentials: %v", err)
+ }
+
+ // Create an xDS enabled gRPC server, register the test service
+ // implementation and start serving.
+ testServer := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
+ testgrpc.RegisterTestServiceServer(testServer, testService)
+ go func() {
+ if err := testServer.Serve(testLis); err != nil {
+ logger.Errorf("test server Serve() failed: %v", err)
+ }
+ }()
+ defer testServer.Stop()
+
+ // Create a listener on -maintenance_port to expose other services.
+ maintenanceLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *maintenancePort))
+ if err != nil {
+ logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *maintenancePort), err)
+ }
+
+ // Create a regular gRPC server and register the maintenance services on
+ // it and start serving.
+ maintenanceServer := grpc.NewServer()
+ healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
+ healthpb.RegisterHealthServer(maintenanceServer, healthServer)
+ testgrpc.RegisterXdsUpdateHealthServiceServer(maintenanceServer, updateHealthService)
+ reflection.Register(maintenanceServer)
+ cleanup, err := admin.Register(maintenanceServer)
+ if err != nil {
+ logger.Fatalf("Failed to register admin services: %v", err)
+ }
+ defer cleanup()
+ if err := maintenanceServer.Serve(maintenanceLis); err != nil {
+ logger.Errorf("maintenance server Serve() failed: %v", err)
+ }
}
diff --git a/rpc_util.go b/rpc_util.go
index c8ae0e4..6db356f 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -429,9 +429,10 @@
}
func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}
-// ForceCodec returns a CallOption that will set codec to be
-// used for all request and response messages for a call. The result of calling
-// Name() will be used as the content-subtype in a case-insensitive manner.
+// ForceCodec returns a CallOption that will set codec to be used for all
+// request and response messages for a call. The result of calling Name() will
+// be used as the content-subtype after converting to lowercase, unless
+// CallContentSubtype is also used.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
@@ -853,7 +854,17 @@
// setCallInfoCodec should only be called after CallOptions have been applied.
func setCallInfoCodec(c *callInfo) error {
if c.codec != nil {
- // codec was already set by a CallOption; use it.
+ // codec was already set by a CallOption; use it, but set the content
+ // subtype if it is not set.
+ if c.contentSubtype == "" {
+ // c.codec is a baseCodec to hide the difference between grpc.Codec and
+ // encoding.Codec (Name vs. String method name). We only support
+ // setting content subtype from encoding.Codec to avoid a behavior
+ // change with the deprecated version.
+ if ec, ok := c.codec.(encoding.Codec); ok {
+ c.contentSubtype = strings.ToLower(ec.Name())
+ }
+ }
return nil
}
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 861a2f2..eb91d09 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -5293,7 +5293,7 @@
}
defer ss.Stop()
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
@@ -5305,6 +5305,55 @@
}
}
+// renameProtoCodec is an encoding.Codec wrapper that allows customizing the
+// Name() of another codec.
+type renameProtoCodec struct {
+ encoding.Codec
+ name string
+}
+
+func (r *renameProtoCodec) Name() string { return r.name }
+
+// TestForceCodecName confirms that the ForceCodec call option sets the subtype
+// in the content-type header according to the Name() of the codec provided.
+func (s) TestForceCodecName(t *testing.T) {
+ wantContentTypeCh := make(chan []string, 1)
+ defer close(wantContentTypeCh)
+
+ ss := &stubserver.StubServer{
+ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return nil, status.Errorf(codes.Internal, "no metadata in context")
+ }
+ if got, want := md["content-type"], <-wantContentTypeCh; !reflect.DeepEqual(got, want) {
+ return nil, status.Errorf(codes.Internal, "got content-type=%q; want [%q]", got, want)
+ }
+ return &testpb.Empty{}, nil
+ },
+ }
+ if err := ss.Start([]grpc.ServerOption{grpc.ForceServerCodec(encoding.GetCodec("proto"))}); err != nil {
+ t.Fatalf("Error starting endpoint server: %v", err)
+ }
+ defer ss.Stop()
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ codec := &renameProtoCodec{Codec: encoding.GetCodec("proto"), name: "some-test-name"}
+ wantContentTypeCh <- []string{"application/grpc+some-test-name"}
+ if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}, grpc.ForceCodec(codec)); err != nil {
+ t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
+ }
+
+ // Confirm the name is converted to lowercase before transmitting.
+ codec.name = "aNoTHeRNaME"
+ wantContentTypeCh <- []string{"application/grpc+anothername"}
+ if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}, grpc.ForceCodec(codec)); err != nil {
+ t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
+ }
+}
+
func (s) TestForceServerCodec(t *testing.T) {
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
@@ -5317,7 +5366,7 @@
}
defer ss.Stop()
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
diff --git a/xds/internal/client/cds_test.go b/xds/internal/client/cds_test.go
index 627229d..c7d9d28 100644
--- a/xds/internal/client/cds_test.go
+++ b/xds/internal/client/cds_test.go
@@ -123,6 +123,9 @@
},
}
+ oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
+ env.AggregateAndDNSSupportEnv = true
+ defer func() { env.AggregateAndDNSSupportEnv = oldAggregateAndDNSSupportEnv }()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if update, err := validateClusterAndConstructClusterUpdate(test.cluster); err == nil {
@@ -248,6 +251,9 @@
origCircuitBreakingSupport := env.CircuitBreakingSupport
env.CircuitBreakingSupport = true
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
+ oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
+ env.AggregateAndDNSSupportEnv = true
+ defer func() { env.AggregateAndDNSSupportEnv = oldAggregateAndDNSSupportEnv }()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
update, err := validateClusterAndConstructClusterUpdate(test.cluster)
diff --git a/xds/internal/client/xds.go b/xds/internal/client/xds.go
index 55bcc2e..0cd373d 100644
--- a/xds/internal/client/xds.go
+++ b/xds/internal/client/xds.go
@@ -595,6 +595,10 @@
return ClusterTypeEDS, cluster.GetEdsClusterConfig().GetServiceName(), nil, nil
}
+ if !env.AggregateAndDNSSupportEnv {
+ return 0, "", nil, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
+ }
+
if cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS {
return ClusterTypeLogicalDNS, cluster.GetName(), nil, nil
}
@@ -607,7 +611,7 @@
}
return ClusterTypeAggregate, cluster.GetName(), clusters.Clusters, nil
}
- return 0, "", nil, fmt.Errorf("unexpected cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
+ return 0, "", nil, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
}
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
diff --git a/xds/internal/testutils/e2e/clientresources.go b/xds/internal/testutils/e2e/clientresources.go
index b521db9..7c8311a 100644
--- a/xds/internal/testutils/e2e/clientresources.go
+++ b/xds/internal/testutils/e2e/clientresources.go
@@ -20,6 +20,8 @@
import (
"fmt"
+ "net"
+ "strconv"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/proto"
@@ -160,7 +162,7 @@
}
}
return &v3listenerpb.Listener{
- Name: fmt.Sprintf(ServerListenerResourceNameTemplate, fmt.Sprintf("%s:%d", host, port)),
+ Name: fmt.Sprintf(ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port)))),
Address: &v3corepb.Address{
Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{