xdsclient: populate error details for NACK (#3975)
diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go
index e2a7b13..0acfd69 100644
--- a/xds/internal/client/client_xds.go
+++ b/xds/internal/client/client_xds.go
@@ -47,7 +47,7 @@
update := make(map[string]ListenerUpdate)
for _, r := range resources {
if !IsListenerResource(r.GetTypeUrl()) {
- return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl())
+ return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl())
}
lis := &v3listenerpb.Listener{}
if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
@@ -71,7 +71,7 @@
}
apiLisAny := lis.GetApiListener().GetApiListener()
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
- return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl())
+ return "", fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
}
apiLis := &v3httppb.HttpConnectionManager{}
if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
@@ -108,7 +108,7 @@
update := make(map[string]RouteConfigUpdate)
for _, r := range resources {
if !IsRouteConfigResource(r.GetTypeUrl()) {
- return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl())
+ return nil, fmt.Errorf("xds: unexpected resource type: %q in RDS response", r.GetTypeUrl())
}
rc := &v3routepb.RouteConfiguration{}
if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
@@ -361,7 +361,7 @@
update := make(map[string]ClusterUpdate)
for _, r := range resources {
if !IsClusterResource(r.GetTypeUrl()) {
- return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl())
+ return nil, fmt.Errorf("xds: unexpected resource type: %q in CDS response", r.GetTypeUrl())
}
cluster := &v3clusterpb.Cluster{}
@@ -477,7 +477,7 @@
update := make(map[string]EndpointsUpdate)
for _, r := range resources {
if !IsEndpointsResource(r.GetTypeUrl()) {
- return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl())
+ return nil, fmt.Errorf("xds: unexpected resource type: %q in EDS response", r.GetTypeUrl())
}
cla := &v3endpointpb.ClusterLoadAssignment{}
diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go
index 3ce1f87..607f26f 100644
--- a/xds/internal/client/transport_helper.go
+++ b/xds/internal/client/transport_helper.go
@@ -51,7 +51,7 @@
// SendRequest constructs and sends out a DiscoveryRequest message specific
// to the underlying transport protocol version.
- SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error
+ SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error
// RecvResponse uses the provided stream to receive a response specific to
// the underlying transport protocol version.
@@ -246,10 +246,10 @@
t.sendCh.Load()
var (
- target []string
- rType ResourceType
- version, nonce string
- send bool
+ target []string
+ rType ResourceType
+ version, nonce, errMsg string
+ send bool
)
switch update := u.(type) {
case *watchAction:
@@ -259,6 +259,7 @@
if !send {
continue
}
+ errMsg = update.errMsg
}
if stream == nil {
// There's no stream yet. Skip the request. This request
@@ -267,7 +268,7 @@
// sending response back).
continue
}
- if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil {
+ if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
// send failed, clear the current stream.
stream = nil
@@ -292,7 +293,7 @@
t.nonceMap = make(map[ResourceType]string)
for rType, s := range t.watchMap {
- if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil {
+ if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
t.logger.Errorf("ADS request failed: %v", err)
return false
}
@@ -321,6 +322,7 @@
rType: rType,
version: "",
nonce: nonce,
+ errMsg: err.Error(),
stream: stream,
})
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
@@ -387,6 +389,7 @@
rType ResourceType
version string // NACK if version is an empty string.
nonce string
+ errMsg string // Empty unless it's a NACK.
// ACK/NACK are tagged with the stream it's for. When the stream is down,
// all the ACK/NACK for this stream will be dropped, and the version/nonce
// won't be updated.
diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go
index 674bba4..7b063ad 100644
--- a/xds/internal/client/v2/client.go
+++ b/xds/internal/client/v2/client.go
@@ -25,6 +25,7 @@
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
@@ -33,6 +34,7 @@
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+ statuspb "google.golang.org/genproto/googleapis/rpc/status"
)
func init() {
@@ -106,7 +108,7 @@
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
-func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
+func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
@@ -117,7 +119,11 @@
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
- // TODO: populate ErrorDetails for nack.
+ }
+ if errMsg != "" {
+ req.ErrorDetail = &statuspb.Status{
+ Code: int32(codes.InvalidArgument), Message: errMsg,
+ }
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
diff --git a/xds/internal/client/v2/client_ack_test.go b/xds/internal/client/v2/client_ack_test.go
index 2b38d3a..87437aa 100644
--- a/xds/internal/client/v2/client_ack_test.go
+++ b/xds/internal/client/v2/client_ack_test.go
@@ -29,6 +29,7 @@
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/testutils"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
@@ -73,7 +74,7 @@
}
// compareXDSRequest reads requests from channel, compare it with want.
-func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string) error {
+func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string, wantErr bool) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := ch.Receive(ctx)
@@ -84,11 +85,22 @@
if req.Err != nil {
return fmt.Errorf("unexpected error from request: %v", req.Err)
}
+
+ xdsReq := req.Req.(*xdspb.DiscoveryRequest)
+ if (xdsReq.ErrorDetail != nil) != wantErr {
+ return fmt.Errorf("received request with error details: %v, wantErr: %v", xdsReq.ErrorDetail, wantErr)
+ }
+ // All NACK request.ErrorDetails have hardcoded status code InvalidArguments.
+ if xdsReq.ErrorDetail != nil && xdsReq.ErrorDetail.Code != int32(codes.InvalidArgument) {
+ return fmt.Errorf("received request with error details: %v, want status with code: %v", xdsReq.ErrorDetail, codes.InvalidArgument)
+ }
+
+ xdsReq.ErrorDetail = nil // Clear the error details field before comparing.
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
wantClone.VersionInfo = ver
wantClone.ResponseNonce = nonce
- if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
- return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
+ if !cmp.Equal(xdsReq, wantClone, cmp.Comparer(proto.Equal)) {
+ return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone, cmp.Comparer(proto.Equal)))
}
return nil
}
@@ -118,7 +130,7 @@
}
v2c.AddWatch(rType, nameToWatch)
- if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil {
+ if err := compareXDSRequest(reqChan, req, preVersion, preNonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", rType, err)
}
t.Logf("FakeServer received %v request...", rType)
@@ -133,7 +145,7 @@
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver)
t.Logf("Good %v response pushed to fakeServer...", rType)
- if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce, false); err != nil {
return "", fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Good %v response acked", rType)
@@ -168,7 +180,7 @@
TypeUrl: typeURL,
}, ver)
t.Logf("Bad %v response pushed to fakeServer...", rType)
- if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce, true); err != nil {
return fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Bad %v response nacked", rType)
@@ -274,7 +286,7 @@
// The expected version string is an empty string, because this is the first
// response, and it's nacked (so there's no previous ack version).
- if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce, true); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
@@ -314,7 +326,7 @@
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is the previous acked version.
- if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce, true); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
@@ -339,7 +351,7 @@
// Start a CDS watch.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
- if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
t.Fatal(err)
}
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
@@ -356,12 +368,12 @@
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
- if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
// Wait for a request with correct resource names and version.
- if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
@@ -394,7 +406,7 @@
// Start a CDS watch.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
- if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
@@ -410,7 +422,7 @@
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
- if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
@@ -440,7 +452,7 @@
// Start a new watch. The new watch should have the nonce from the response
// above, and version from the first good response.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
- if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
+ if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go
index 328cd8b..5d8d719 100644
--- a/xds/internal/client/v3/client.go
+++ b/xds/internal/client/v3/client.go
@@ -24,7 +24,9 @@
"fmt"
"github.com/golang/protobuf/proto"
+ statuspb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
@@ -106,7 +108,7 @@
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
-func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
+func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
@@ -117,7 +119,11 @@
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
- // TODO: populate ErrorDetails for nack.
+ }
+ if errMsg != "" {
+ req.ErrorDetail = &statuspb.Status{
+ Code: int32(codes.InvalidArgument), Message: errMsg,
+ }
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)