xdsclient: populate error details for NACK (#3975)

diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go
index e2a7b13..0acfd69 100644
--- a/xds/internal/client/client_xds.go
+++ b/xds/internal/client/client_xds.go
@@ -47,7 +47,7 @@
 	update := make(map[string]ListenerUpdate)
 	for _, r := range resources {
 		if !IsListenerResource(r.GetTypeUrl()) {
-			return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl())
+			return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl())
 		}
 		lis := &v3listenerpb.Listener{}
 		if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
@@ -71,7 +71,7 @@
 	}
 	apiLisAny := lis.GetApiListener().GetApiListener()
 	if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
-		return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl())
+		return "", fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
 	}
 	apiLis := &v3httppb.HttpConnectionManager{}
 	if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
@@ -108,7 +108,7 @@
 	update := make(map[string]RouteConfigUpdate)
 	for _, r := range resources {
 		if !IsRouteConfigResource(r.GetTypeUrl()) {
-			return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl())
+			return nil, fmt.Errorf("xds: unexpected resource type: %q in RDS response", r.GetTypeUrl())
 		}
 		rc := &v3routepb.RouteConfiguration{}
 		if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
@@ -361,7 +361,7 @@
 	update := make(map[string]ClusterUpdate)
 	for _, r := range resources {
 		if !IsClusterResource(r.GetTypeUrl()) {
-			return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl())
+			return nil, fmt.Errorf("xds: unexpected resource type: %q in CDS response", r.GetTypeUrl())
 		}
 
 		cluster := &v3clusterpb.Cluster{}
@@ -477,7 +477,7 @@
 	update := make(map[string]EndpointsUpdate)
 	for _, r := range resources {
 		if !IsEndpointsResource(r.GetTypeUrl()) {
-			return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl())
+			return nil, fmt.Errorf("xds: unexpected resource type: %q in EDS response", r.GetTypeUrl())
 		}
 
 		cla := &v3endpointpb.ClusterLoadAssignment{}
diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go
index 3ce1f87..607f26f 100644
--- a/xds/internal/client/transport_helper.go
+++ b/xds/internal/client/transport_helper.go
@@ -51,7 +51,7 @@
 
 	// SendRequest constructs and sends out a DiscoveryRequest message specific
 	// to the underlying transport protocol version.
-	SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error
+	SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error
 
 	// RecvResponse uses the provided stream to receive a response specific to
 	// the underlying transport protocol version.
@@ -246,10 +246,10 @@
 			t.sendCh.Load()
 
 			var (
-				target         []string
-				rType          ResourceType
-				version, nonce string
-				send           bool
+				target                 []string
+				rType                  ResourceType
+				version, nonce, errMsg string
+				send                   bool
 			)
 			switch update := u.(type) {
 			case *watchAction:
@@ -259,6 +259,7 @@
 				if !send {
 					continue
 				}
+				errMsg = update.errMsg
 			}
 			if stream == nil {
 				// There's no stream yet. Skip the request. This request
@@ -267,7 +268,7 @@
 				// sending response back).
 				continue
 			}
-			if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil {
+			if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
 				t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
 				// send failed, clear the current stream.
 				stream = nil
@@ -292,7 +293,7 @@
 	t.nonceMap = make(map[ResourceType]string)
 
 	for rType, s := range t.watchMap {
-		if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil {
+		if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
 			t.logger.Errorf("ADS request failed: %v", err)
 			return false
 		}
@@ -321,6 +322,7 @@
 				rType:   rType,
 				version: "",
 				nonce:   nonce,
+				errMsg:  err.Error(),
 				stream:  stream,
 			})
 			t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
@@ -387,6 +389,7 @@
 	rType   ResourceType
 	version string // NACK if version is an empty string.
 	nonce   string
+	errMsg  string // Empty unless it's a NACK.
 	// ACK/NACK are tagged with the stream it's for. When the stream is down,
 	// all the ACK/NACK for this stream will be dropped, and the version/nonce
 	// won't be updated.
diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go
index 674bba4..7b063ad 100644
--- a/xds/internal/client/v2/client.go
+++ b/xds/internal/client/v2/client.go
@@ -25,6 +25,7 @@
 
 	"github.com/golang/protobuf/proto"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/internal/grpclog"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
 	"google.golang.org/grpc/xds/internal/client/load"
@@ -33,6 +34,7 @@
 	v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
 	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
 	v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+	statuspb "google.golang.org/genproto/googleapis/rpc/status"
 )
 
 func init() {
@@ -106,7 +108,7 @@
 // - If this is an ack, version will be the version from the response.
 // - If this is a nack, version will be the previous acked version (from
 //   versionMap). If there was no ack before, it will be empty.
-func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
+func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
 	stream, ok := s.(adsStream)
 	if !ok {
 		return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
@@ -117,7 +119,11 @@
 		ResourceNames: resourceNames,
 		VersionInfo:   version,
 		ResponseNonce: nonce,
-		// TODO: populate ErrorDetails for nack.
+	}
+	if errMsg != "" {
+		req.ErrorDetail = &statuspb.Status{
+			Code: int32(codes.InvalidArgument), Message: errMsg,
+		}
 	}
 	if err := stream.Send(req); err != nil {
 		return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
diff --git a/xds/internal/client/v2/client_ack_test.go b/xds/internal/client/v2/client_ack_test.go
index 2b38d3a..87437aa 100644
--- a/xds/internal/client/v2/client_ack_test.go
+++ b/xds/internal/client/v2/client_ack_test.go
@@ -29,6 +29,7 @@
 	anypb "github.com/golang/protobuf/ptypes/any"
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/internal/testutils"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
 	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
@@ -73,7 +74,7 @@
 }
 
 // compareXDSRequest reads requests from channel, compare it with want.
-func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string) error {
+func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string, wantErr bool) error {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
 	val, err := ch.Receive(ctx)
@@ -84,11 +85,22 @@
 	if req.Err != nil {
 		return fmt.Errorf("unexpected error from request: %v", req.Err)
 	}
+
+	xdsReq := req.Req.(*xdspb.DiscoveryRequest)
+	if (xdsReq.ErrorDetail != nil) != wantErr {
+		return fmt.Errorf("received request with error details: %v, wantErr: %v", xdsReq.ErrorDetail, wantErr)
+	}
+	// All NACK request.ErrorDetails have hardcoded status code InvalidArguments.
+	if xdsReq.ErrorDetail != nil && xdsReq.ErrorDetail.Code != int32(codes.InvalidArgument) {
+		return fmt.Errorf("received request with error details: %v, want status with code: %v", xdsReq.ErrorDetail, codes.InvalidArgument)
+	}
+
+	xdsReq.ErrorDetail = nil // Clear the error details field before comparing.
 	wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
 	wantClone.VersionInfo = ver
 	wantClone.ResponseNonce = nonce
-	if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
-		return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
+	if !cmp.Equal(xdsReq, wantClone, cmp.Comparer(proto.Equal)) {
+		return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone, cmp.Comparer(proto.Equal)))
 	}
 	return nil
 }
@@ -118,7 +130,7 @@
 	}
 	v2c.AddWatch(rType, nameToWatch)
 
-	if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil {
+	if err := compareXDSRequest(reqChan, req, preVersion, preNonce, false); err != nil {
 		t.Fatalf("Failed to receive %v request: %v", rType, err)
 	}
 	t.Logf("FakeServer received %v request...", rType)
@@ -133,7 +145,7 @@
 	nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver)
 	t.Logf("Good %v response pushed to fakeServer...", rType)
 
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce, false); err != nil {
 		return "", fmt.Errorf("failed to receive %v request: %v", rType, err)
 	}
 	t.Logf("Good %v response acked", rType)
@@ -168,7 +180,7 @@
 		TypeUrl:   typeURL,
 	}, ver)
 	t.Logf("Bad %v response pushed to fakeServer...", rType)
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce, true); err != nil {
 		return fmt.Errorf("failed to receive %v request: %v", rType, err)
 	}
 	t.Logf("Bad %v response nacked", rType)
@@ -274,7 +286,7 @@
 
 	// The expected version string is an empty string, because this is the first
 	// response, and it's nacked (so there's no previous ack version).
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce, true); err != nil {
 		t.Errorf("Failed to receive request: %v", err)
 	}
 	t.Logf("Bad response nacked")
@@ -314,7 +326,7 @@
 	t.Logf("Bad response pushed to fakeServer...")
 
 	// The expected version string is the previous acked version.
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce, true); err != nil {
 		t.Errorf("Failed to receive request: %v", err)
 	}
 	t.Logf("Bad response nacked")
@@ -339,7 +351,7 @@
 
 	// Start a CDS watch.
 	v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
 		t.Fatal(err)
 	}
 	t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
@@ -356,12 +368,12 @@
 	// Wait for a request with no resource names, because the only watch was
 	// removed.
 	emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
 		t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
 	}
 	v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
 	// Wait for a request with correct resource names and version.
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce, false); err != nil {
 		t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
 	}
 	versionCDS++
@@ -394,7 +406,7 @@
 
 	// Start a CDS watch.
 	v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
 		t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
 	}
 	t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
@@ -410,7 +422,7 @@
 	// Wait for a request with no resource names, because the only watch was
 	// removed.
 	emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
 		t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
 	}
 	versionCDS++
@@ -440,7 +452,7 @@
 	// Start a new watch. The new watch should have the nonce from the response
 	// above, and version from the first good response.
 	v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
-	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce, false); err != nil {
 		t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
 	}
 
diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go
index 328cd8b..5d8d719 100644
--- a/xds/internal/client/v3/client.go
+++ b/xds/internal/client/v3/client.go
@@ -24,7 +24,9 @@
 	"fmt"
 
 	"github.com/golang/protobuf/proto"
+	statuspb "google.golang.org/genproto/googleapis/rpc/status"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/internal/grpclog"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
 	"google.golang.org/grpc/xds/internal/client/load"
@@ -106,7 +108,7 @@
 // - If this is an ack, version will be the version from the response.
 // - If this is a nack, version will be the previous acked version (from
 //   versionMap). If there was no ack before, it will be empty.
-func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
+func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
 	stream, ok := s.(adsStream)
 	if !ok {
 		return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
@@ -117,7 +119,11 @@
 		ResourceNames: resourceNames,
 		VersionInfo:   version,
 		ResponseNonce: nonce,
-		// TODO: populate ErrorDetails for nack.
+	}
+	if errMsg != "" {
+		req.ErrorDetail = &statuspb.Status{
+			Code: int32(codes.InvalidArgument), Message: errMsg,
+		}
 	}
 	if err := stream.Send(req); err != nil {
 		return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)