blob: 6cf88f6d394279def92b26299d5aa6c3752d142c [file] [log] [blame]
// +build go1.12
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package csds
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/xds/internal/client"
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
xtestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)
const (
defaultTestTimeout = 10 * time.Second
)
type xdsClientInterfaceWithWatch interface {
WatchListener(string, func(client.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(client.RouteConfigUpdate, error)) func()
WatchCluster(string, func(client.ClusterUpdate, error)) func()
WatchEndpoints(string, func(client.EndpointsUpdate, error)) func()
}
var cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmp.Comparer(func(a, b *timestamppb.Timestamp) bool { return true }),
protocmp.IgnoreFields(&v3adminpb.UpdateFailureState{}, "last_update_attempt", "details"),
protocmp.SortRepeated(func(a, b *v3adminpb.ListenersConfigDump_DynamicListener) bool {
return strings.Compare(a.Name, b.Name) < 0
}),
protocmp.SortRepeated(func(a, b *v3adminpb.RoutesConfigDump_DynamicRouteConfig) bool {
if a.RouteConfig == nil {
return false
}
if b.RouteConfig == nil {
return true
}
var at, bt v3routepb.RouteConfiguration
if err := ptypes.UnmarshalAny(a.RouteConfig, &at); err != nil {
panic("failed to unmarshal RouteConfig" + err.Error())
}
if err := ptypes.UnmarshalAny(b.RouteConfig, &bt); err != nil {
panic("failed to unmarshal RouteConfig" + err.Error())
}
return strings.Compare(at.Name, bt.Name) < 0
}),
protocmp.SortRepeated(func(a, b *v3adminpb.ClustersConfigDump_DynamicCluster) bool {
if a.Cluster == nil {
return false
}
if b.Cluster == nil {
return true
}
var at, bt v3clusterpb.Cluster
if err := ptypes.UnmarshalAny(a.Cluster, &at); err != nil {
panic("failed to unmarshal Cluster" + err.Error())
}
if err := ptypes.UnmarshalAny(b.Cluster, &bt); err != nil {
panic("failed to unmarshal Cluster" + err.Error())
}
return strings.Compare(at.Name, bt.Name) < 0
}),
protocmp.SortRepeated(func(a, b *v3adminpb.EndpointsConfigDump_DynamicEndpointConfig) bool {
if a.EndpointConfig == nil {
return false
}
if b.EndpointConfig == nil {
return true
}
var at, bt v3endpointpb.ClusterLoadAssignment
if err := ptypes.UnmarshalAny(a.EndpointConfig, &at); err != nil {
panic("failed to unmarshal Endpoints" + err.Error())
}
if err := ptypes.UnmarshalAny(b.EndpointConfig, &bt); err != nil {
panic("failed to unmarshal Endpoints" + err.Error())
}
return strings.Compare(at.ClusterName, bt.ClusterName) < 0
}),
protocmp.IgnoreFields(&v3adminpb.ListenersConfigDump_DynamicListenerState{}, "last_updated"),
protocmp.IgnoreFields(&v3adminpb.RoutesConfigDump_DynamicRouteConfig{}, "last_updated"),
protocmp.IgnoreFields(&v3adminpb.ClustersConfigDump_DynamicCluster{}, "last_updated"),
protocmp.IgnoreFields(&v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{}, "last_updated"),
protocmp.Transform(),
}
var (
ldsTargets = []string{"lds.target.good:0000", "lds.target.good:1111"}
listeners = make([]*v3listenerpb.Listener, len(ldsTargets))
listenerAnys = make([]*anypb.Any, len(ldsTargets))
rdsTargets = []string{"route-config-0", "route-config-1"}
routes = make([]*v3routepb.RouteConfiguration, len(rdsTargets))
routeAnys = make([]*anypb.Any, len(rdsTargets))
cdsTargets = []string{"cluster-0", "cluster-1"}
clusters = make([]*v3clusterpb.Cluster, len(cdsTargets))
clusterAnys = make([]*anypb.Any, len(cdsTargets))
edsTargets = []string{"endpoints-0", "endpoints-1"}
endpoints = make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
endpointAnys = make([]*anypb.Any, len(edsTargets))
ips = []string{"0.0.0.0", "1.1.1.1"}
ports = []uint32{123, 456}
)
func init() {
for i := range ldsTargets {
listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
listenerAnys[i] = testutils.MarshalAny(listeners[i])
}
for i := range rdsTargets {
routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
routeAnys[i] = testutils.MarshalAny(routes[i])
}
for i := range cdsTargets {
clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone)
clusterAnys[i] = testutils.MarshalAny(clusters[i])
}
for i := range edsTargets {
endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i])
endpointAnys[i] = testutils.MarshalAny(endpoints[i])
}
}
func TestCSDS(t *testing.T) {
const retryCount = 10
xdsC, mgmServer, nodeID, stream, cleanup := commonSetup(t)
defer cleanup()
for _, target := range ldsTargets {
xdsC.WatchListener(target, func(client.ListenerUpdate, error) {})
}
for _, target := range rdsTargets {
xdsC.WatchRouteConfig(target, func(client.RouteConfigUpdate, error) {})
}
for _, target := range cdsTargets {
xdsC.WatchCluster(target, func(client.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
xdsC.WatchEndpoints(target, func(client.EndpointsUpdate, error) {})
}
for i := 0; i < retryCount; i++ {
err := checkForRequested(stream)
if err == nil {
break
}
if i == retryCount-1 {
t.Fatalf("%v", err)
}
time.Sleep(time.Millisecond * 100)
}
if err := mgmServer.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: listeners,
Routes: routes,
Clusters: clusters,
Endpoints: endpoints,
}); err != nil {
t.Fatal(err)
}
for i := 0; i < retryCount; i++ {
err := checkForACKed(stream)
if err == nil {
break
}
if i == retryCount-1 {
t.Fatalf("%v", err)
}
time.Sleep(time.Millisecond * 100)
}
const nackResourceIdx = 0
if err := mgmServer.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{
{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}}, // 0 will be nacked. 1 will stay the same.
},
Routes: []*v3routepb.RouteConfiguration{
{Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{
Routes: []*v3routepb.Route{{}},
}}},
},
Clusters: []*v3clusterpb.Cluster{
{Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}},
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{
{ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}}},
},
SkipValidation: true,
}); err != nil {
t.Fatal(err)
}
for i := 0; i < retryCount; i++ {
err := checkForNACKed(nackResourceIdx, stream)
if err == nil {
break
}
if i == retryCount-1 {
t.Fatalf("%v", err)
}
time.Sleep(time.Millisecond * 100)
}
}
func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
t.Helper()
// Spin up a xDS management server on a local port.
nodeID := uuid.New().String()
fs, err := e2e.StartManagementServer()
if err != nil {
t.Fatal(err)
}
// Create a bootstrap file in a temporary directory.
bootstrapCleanup, err := xds.SetupBootstrapFile(xds.BootstrapOptions{
Version: xds.TransportV3,
NodeID: nodeID,
ServerURI: fs.Address,
})
if err != nil {
t.Fatal(err)
}
// Create xds_client.
xdsC, err := client.New()
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) {
return xdsC, nil
}
// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
csdss, err := NewClientStatusDiscoveryServer()
if err != nil {
t.Fatal(err)
}
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
// Create a local listener and pass it to Serve().
lis, err := xtestutils.LocalTCPListener()
if err != nil {
t.Fatalf("xtestutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
// Create CSDS client.
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("cannot connect to server: %v", err)
}
c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
}
return xdsC, fs, nodeID, stream, func() {
fs.Stop()
cancel()
conn.Close()
server.Stop()
csdss.Close()
newXDSClient = oldNewXDSClient
xdsC.Close()
bootstrapCleanup()
}
}
func checkForRequested(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient) error {
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send request: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
return fmt.Errorf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 1 {
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
}
if n := len(r.Config[0].XdsConfig); n != 4 {
return fmt.Errorf("got %d xds configs (one for each type), want 4: %v", n, proto.MarshalTextString(r))
}
for _, cfg := range r.Config[0].XdsConfig {
switch config := cfg.PerXdsConfig.(type) {
case *v3statuspb.PerXdsConfig_ListenerConfig:
var wantLis []*v3adminpb.ListenersConfigDump_DynamicListener
for i := range ldsTargets {
wantLis = append(wantLis, &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.ListenersConfigDump{
DynamicListeners: wantLis,
}
if diff := cmp.Diff(config.ListenerConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_RouteConfig:
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for range rdsTargets {
wantRoutes = append(wantRoutes, &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: wantRoutes,
}
if diff := cmp.Diff(config.RouteConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_ClusterConfig:
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for range cdsTargets {
wantCluster = append(wantCluster, &v3adminpb.ClustersConfigDump_DynamicCluster{
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.ClustersConfigDump{
DynamicActiveClusters: wantCluster,
}
if diff := cmp.Diff(config.ClusterConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_EndpointConfig:
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for range cdsTargets {
wantEndpoint = append(wantEndpoint, &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: wantEndpoint,
}
if diff := cmp.Diff(config.EndpointConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
default:
return fmt.Errorf("unexpected PerXdsConfig: %+v; %v", cfg.PerXdsConfig, protoToJSON(r))
}
}
return nil
}
func checkForACKed(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient) error {
const wantVersion = "1"
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
return fmt.Errorf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 1 {
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
}
if n := len(r.Config[0].XdsConfig); n != 4 {
return fmt.Errorf("got %d xds configs (one for each type), want 4: %v", n, proto.MarshalTextString(r))
}
for _, cfg := range r.Config[0].XdsConfig {
switch config := cfg.PerXdsConfig.(type) {
case *v3statuspb.PerXdsConfig_ListenerConfig:
var wantLis []*v3adminpb.ListenersConfigDump_DynamicListener
for i := range ldsTargets {
wantLis = append(wantLis, &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: wantVersion,
Listener: listenerAnys[i],
LastUpdated: nil,
},
ErrorState: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.ListenersConfigDump{
VersionInfo: wantVersion,
DynamicListeners: wantLis,
}
if diff := cmp.Diff(config.ListenerConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_RouteConfig:
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for i := range rdsTargets {
wantRoutes = append(wantRoutes, &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: wantVersion,
RouteConfig: routeAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: wantRoutes,
}
if diff := cmp.Diff(config.RouteConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_ClusterConfig:
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for i := range cdsTargets {
wantCluster = append(wantCluster, &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: wantVersion,
Cluster: clusterAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.ClustersConfigDump{
VersionInfo: wantVersion,
DynamicActiveClusters: wantCluster,
}
if diff := cmp.Diff(config.ClusterConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_EndpointConfig:
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for i := range cdsTargets {
wantEndpoint = append(wantEndpoint, &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: wantVersion,
EndpointConfig: endpointAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: wantEndpoint,
}
if diff := cmp.Diff(config.EndpointConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
default:
return fmt.Errorf("unexpected PerXdsConfig: %+v; %v", cfg.PerXdsConfig, protoToJSON(r))
}
}
return nil
}
func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient) error {
const (
ackVersion = "1"
nackVersion = "2"
)
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
return fmt.Errorf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 1 {
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
}
if n := len(r.Config[0].XdsConfig); n != 4 {
return fmt.Errorf("got %d xds configs (one for each type), want 4: %v", n, proto.MarshalTextString(r))
}
for _, cfg := range r.Config[0].XdsConfig {
switch config := cfg.PerXdsConfig.(type) {
case *v3statuspb.PerXdsConfig_ListenerConfig:
var wantLis []*v3adminpb.ListenersConfigDump_DynamicListener
for i := range ldsTargets {
configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: ackVersion,
Listener: listenerAnys[i],
LastUpdated: nil,
},
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantLis = append(wantLis, configDump)
}
wantDump := &v3adminpb.ListenersConfigDump{
VersionInfo: nackVersion,
DynamicListeners: wantLis,
}
if diff := cmp.Diff(config.ListenerConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_RouteConfig:
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for i := range rdsTargets {
configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: ackVersion,
RouteConfig: routeAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantRoutes = append(wantRoutes, configDump)
}
wantDump := &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: wantRoutes,
}
if diff := cmp.Diff(config.RouteConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_ClusterConfig:
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for i := range cdsTargets {
configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: ackVersion,
Cluster: clusterAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantCluster = append(wantCluster, configDump)
}
wantDump := &v3adminpb.ClustersConfigDump{
VersionInfo: nackVersion,
DynamicActiveClusters: wantCluster,
}
if diff := cmp.Diff(config.ClusterConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_EndpointConfig:
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for i := range cdsTargets {
configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: ackVersion,
EndpointConfig: endpointAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantEndpoint = append(wantEndpoint, configDump)
}
wantDump := &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: wantEndpoint,
}
if diff := cmp.Diff(config.EndpointConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
default:
return fmt.Errorf("unexpected PerXdsConfig: %+v; %v", cfg.PerXdsConfig, protoToJSON(r))
}
}
return nil
}
func protoToJSON(p proto.Message) string {
mm := jsonpb.Marshaler{
Indent: " ",
}
ret, _ := mm.MarshalToString(p)
return ret
}
func Test_nodeProtoToV3(t *testing.T) {
const (
testID = "test-id"
testCluster = "test-cluster"
testZone = "test-zone"
)
tests := []struct {
name string
n proto.Message
want *v3corepb.Node
}{
{
name: "v3",
n: &v3corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v3corepb.Locality{Zone: testZone},
},
want: &v3corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v3corepb.Locality{Zone: testZone},
},
},
{
name: "v2",
n: &v2corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v2corepb.Locality{Zone: testZone},
},
want: &v3corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v3corepb.Locality{Zone: testZone},
},
},
{
name: "not node",
n: &v2corepb.Locality{Zone: testZone},
want: nil, // Input is not a node, should return nil.
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := nodeProtoToV3(tt.n)
if diff := cmp.Diff(got, tt.want, protocmp.Transform()); diff != "" {
t.Errorf("nodeProtoToV3() got unexpected result, diff (-got, +want): %v", diff)
}
})
}
}