xds: Refactor/cleanup xds client tests. (#3920)
diff --git a/xds/internal/client/client_cds_test.go b/xds/internal/client/client_cds_test.go
index d45baa0..578d612 100644
--- a/xds/internal/client/client_cds_test.go
+++ b/xds/internal/client/client_cds_test.go
@@ -32,15 +32,14 @@
"google.golang.org/grpc/xds/internal/version"
)
-func (s) TestValidateCluster(t *testing.T) {
- const (
- clusterName = "clusterName"
- serviceName = "service"
- )
- var (
- emptyUpdate = ClusterUpdate{ServiceName: "", EnableLRS: false}
- )
+const (
+ clusterName = "clusterName"
+ serviceName = "service"
+)
+var emptyUpdate = ClusterUpdate{ServiceName: "", EnableLRS: false}
+
+func (s) TestValidateCluster_Failure(t *testing.T) {
tests := []struct {
name string
cluster *v3clusterpb.Cluster
@@ -98,6 +97,23 @@
wantUpdate: emptyUpdate,
wantErr: true,
},
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ if update, err := validateCluster(test.cluster); err == nil {
+ t.Errorf("validateCluster(%+v) = %v, wanted error", test.cluster, update)
+ }
+ })
+ }
+}
+
+func (s) TestValidateCluster_Success(t *testing.T) {
+ tests := []struct {
+ name string
+ cluster *v3clusterpb.Cluster
+ wantUpdate ClusterUpdate
+ }{
{
name: "happy-case-no-service-name-no-lrs",
cluster: &v3clusterpb.Cluster{
@@ -156,8 +172,11 @@
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
update, err := validateCluster(test.cluster)
- if ((err != nil) != test.wantErr) || !cmp.Equal(update, test.wantUpdate, cmpopts.EquateEmpty()) {
- t.Errorf("validateCluster(%+v) = (%v, %v), wantErr: (%v, %v)", test.cluster, update, err, test.wantUpdate, test.wantErr)
+ if err != nil {
+ t.Errorf("validateCluster(%+v) failed: %v", test.cluster, err)
+ }
+ if !cmp.Equal(update, test.wantUpdate, cmpopts.EquateEmpty()) {
+ t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, update, test.wantUpdate)
}
})
}
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index b36a5af..4a53138 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -20,9 +20,13 @@
import (
"context"
+ "fmt"
"testing"
"time"
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
+
"google.golang.org/grpc"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
@@ -49,7 +53,8 @@
testEDSName = "test-eds"
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
- defaultTestTimeout = 1 * time.Second
+ defaultTestTimeout = 5 * time.Second
+ defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)
func clientOpts(balancerName string, overrideWatchExpiryTImeout bool) Options {
@@ -68,24 +73,22 @@
}
type testAPIClient struct {
- r UpdateHandler
-
addWatches map[ResourceType]*testutils.Channel
removeWatches map[ResourceType]*testutils.Channel
}
-func overrideNewAPIClient() (<-chan *testAPIClient, func()) {
+func overrideNewAPIClient() (*testutils.Channel, func()) {
origNewAPIClient := newAPIClient
- ch := make(chan *testAPIClient, 1)
+ ch := testutils.NewChannel()
newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
- ret := newTestAPIClient(opts.Parent)
- ch <- ret
+ ret := newTestAPIClient()
+ ch.Send(ret)
return ret, nil
}
return ch, func() { newAPIClient = origNewAPIClient }
}
-func newTestAPIClient(r UpdateHandler) *testAPIClient {
+func newTestAPIClient() *testAPIClient {
addWatches := map[ResourceType]*testutils.Channel{
ListenerResource: testutils.NewChannel(),
RouteConfigResource: testutils.NewChannel(),
@@ -99,7 +102,6 @@
EndpointsResource: testutils.NewChannel(),
}
return &testAPIClient{
- r: r,
addWatches: addWatches,
removeWatches: removeWatches,
}
@@ -121,53 +123,108 @@
// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
// callback. It makes sure it doesn't cause a deadlock.
func (s) TestWatchCallAnotherWatch(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
clusterUpdateCh := testutils.NewChannel()
firstTime := true
- c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+ client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
// Calls another watch inline, to ensure there's deadlock.
- c.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
+ client.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); firstTime && err != nil {
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
-
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := ClusterUpdate{ServiceName: testEDSName}
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName: wantUpdate,
- })
-
- if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName: wantUpdate2,
- })
-
- if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate2})
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
+
+func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ListenerUpdate) error {
+ u, err := updateCh.Receive(ctx)
+ if err != nil {
+ return fmt.Errorf("timeout when waiting for listener update: %v", err)
+ }
+ gotUpdate := u.(ldsUpdateErr)
+ if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
+ return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+ }
+ return nil
+}
+
+func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate RouteConfigUpdate) error {
+ u, err := updateCh.Receive(ctx)
+ if err != nil {
+ return fmt.Errorf("timeout when waiting for route configuration update: %v", err)
+ }
+ gotUpdate := u.(rdsUpdateErr)
+ if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
+ return fmt.Errorf("unexpected route config update: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+ }
+ return nil
+}
+
+func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ServiceUpdate) error {
+ u, err := updateCh.Receive(ctx)
+ if err != nil {
+ return fmt.Errorf("timeout when waiting for service update: %v", err)
+ }
+ gotUpdate := u.(serviceUpdateErr)
+ if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
+ return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+ }
+ return nil
+}
+
+func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ClusterUpdate) error {
+ u, err := updateCh.Receive(ctx)
+ if err != nil {
+ return fmt.Errorf("timeout when waiting for cluster update: %v", err)
+ }
+ gotUpdate := u.(clusterUpdateErr)
+ if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
+ return fmt.Errorf("unexpected clusterUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+ }
+ return nil
+}
+
+func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate EndpointsUpdate) error {
+ u, err := updateCh.Receive(ctx)
+ if err != nil {
+ return fmt.Errorf("timeout when waiting for endpoints update: %v", err)
+ }
+ gotUpdate := u.(endpointsUpdateErr)
+ if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
+ return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+ }
+ return nil
+}
diff --git a/xds/internal/client/client_watchers_cluster_test.go b/xds/internal/client/client_watchers_cluster_test.go
index 3c816f9..c512675 100644
--- a/xds/internal/client/client_watchers_cluster_test.go
+++ b/xds/internal/client/client_watchers_cluster_test.go
@@ -22,6 +22,8 @@
"context"
"testing"
+ "github.com/google/go-cmp/cmp"
+
"google.golang.org/grpc/internal/testutils"
)
@@ -35,63 +37,52 @@
// - an update for another resource name
// - an update is received after cancel()
func (s) TestClusterWatch(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- // TODO: add a timeout to this recv.
- // Note that this won't be necessary if we finish the TODO below to call
- // Client directly instead of v2Client.r.
- v2Client := <-v2ClientCh
-
- clusterUpdateCh := testutils.NewChannel()
- cancelWatch := c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
- clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ clusterUpdateCh := testutils.NewChannel()
+ cancelWatch := client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+ clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := ClusterUpdate{ServiceName: testEDSName}
- // This is calling v2Client.r to send the update, but r is set to Client, so
- // this is same as calling Client to update. The one thing this covers is
- // that `NewXDSV2Client` is called with the right parent.
- //
- // TODO: in a future cleanup, this (and the same thing in other tests) can
- // be changed call Client directly.
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName: wantUpdate,
- })
-
- if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another update, with an extra resource for a different resource name.
- v2Client.r.NewClusters(map[string]ClusterUpdate{
+ client.NewClusters(map[string]ClusterUpdate{
testCDSName: wantUpdate,
"randomName": {},
})
-
- if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected clusterUpdate: %+v, %v, want channel recv timeout", u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Cancel watch, and send update again.
cancelWatch()
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName: wantUpdate,
- })
-
- if u, err := clusterUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -99,64 +90,62 @@
// TestClusterTwoWatchSameResourceName covers the case where an update is received
// after two watch() for the same resource name.
func (s) TestClusterTwoWatchSameResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- var clusterUpdateChs []*testutils.Channel
- var cancelLastWatch func()
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+ var clusterUpdateChs []*testutils.Channel
+ var cancelLastWatch func()
const count = 2
for i := 0; i < count; i++ {
clusterUpdateCh := testutils.NewChannel()
clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh)
- cancelLastWatch = c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+ cancelLastWatch = client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
}
wantUpdate := ClusterUpdate{ServiceName: testEDSName}
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName: wantUpdate,
- })
-
+ client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
for i := 0; i < count; i++ {
- if u, err := clusterUpdateChs[i].Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
// Cancel the last watch, and send update again.
cancelLastWatch()
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName: wantUpdate,
- })
-
+ client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
for i := 0; i < count-1; i++ {
- if u, err := clusterUpdateChs[i].Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
- if u, err := clusterUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := clusterUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -164,34 +153,37 @@
// TestClusterThreeWatchDifferentResourceName covers the case where an update is
// received after three watch() for different resource names.
func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
-
- var clusterUpdateChs []*testutils.Channel
- const count = 2
-
- // Two watches for the same name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ // Two watches for the same name.
+ var clusterUpdateChs []*testutils.Channel
+ const count = 2
for i := 0; i < count; i++ {
clusterUpdateCh := testutils.NewChannel()
clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh)
- c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
+ client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@@ -199,83 +191,86 @@
// Third watch for a different name.
clusterUpdateCh2 := testutils.NewChannel()
- c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
+ client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
- v2Client.r.NewClusters(map[string]ClusterUpdate{
+ client.NewClusters(map[string]ClusterUpdate{
testCDSName + "1": wantUpdate1,
testCDSName + "2": wantUpdate2,
})
for i := 0; i < count; i++ {
- if u, err := clusterUpdateChs[i].Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) {
- t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate1); err != nil {
+ t.Fatal(err)
}
}
-
- if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
// TestClusterWatchAfterCache covers the case where watch is called after the update
// is in cache.
func (s) TestClusterWatchAfterCache(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- clusterUpdateCh := testutils.NewChannel()
- c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
- clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ clusterUpdateCh := testutils.NewChannel()
+ client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+ clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := ClusterUpdate{ServiceName: testEDSName}
- v2Client.r.NewClusters(map[string]ClusterUpdate{
+ client.NewClusters(map[string]ClusterUpdate{
testCDSName: wantUpdate,
})
-
- if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another watch for the resource in cache.
clusterUpdateCh2 := testutils.NewChannel()
- c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+ client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
})
- if n, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if n, err := apiClient.addWatches[ClusterResource].Receive(sCtx); err != context.DeadlineExceeded {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}
// New watch should receives the update.
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Old watch should see nothing.
- if u, err := clusterUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -284,38 +279,38 @@
// an CDS response for the request that it sends out. We want the watch callback
// to be invoked with an error once the watchExpiryTimer fires.
func (s) TestClusterWatchExpiryTimer(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, true))
+ client, err := New(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- clusterUpdateCh := testutils.NewChannel()
- c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
- clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ clusterUpdateCh := testutils.NewChannel()
+ client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
+ clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
+ })
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
u, err := clusterUpdateCh.Receive(ctx)
if err != nil {
- t.Fatalf("failed to get clusterUpdate: %v", err)
+ t.Fatalf("timeout when waiting for cluster update: %v", err)
}
- uu := u.(clusterUpdateErr)
- if uu.u != (ClusterUpdate{}) {
- t.Errorf("unexpected clusterUpdate: %v, want %v", uu.u, ClusterUpdate{})
- }
- if uu.err == nil {
- t.Errorf("unexpected clusterError: <nil>, want error watcher timeout")
+ gotUpdate := u.(clusterUpdateErr)
+ if gotUpdate.err == nil || !cmp.Equal(gotUpdate.u, ClusterUpdate{}) {
+ t.Fatalf("unexpected clusterUpdate: (%v, %v), want: (ClusterUpdate{}, nil)", gotUpdate.u, gotUpdate.err)
}
}
@@ -323,41 +318,44 @@
// an CDS response for the request that it sends out. We want no error even
// after expiry timeout.
func (s) TestClusterWatchExpiryTimerStop(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, true))
+ client, err := New(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- clusterUpdateCh := testutils.NewChannel()
- c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
- clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ clusterUpdateCh := testutils.NewChannel()
+ client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
+ clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
+ })
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := ClusterUpdate{ServiceName: testEDSName}
- v2Client.r.NewClusters(map[string]ClusterUpdate{
+ client.NewClusters(map[string]ClusterUpdate{
testCDSName: wantUpdate,
})
-
- if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Wait for an error, the error should never happen.
- u, err := clusterUpdateCh.Receive(ctx)
- if err != context.DeadlineExceeded {
- t.Fatalf("got unexpected: %v, %v, want recv timeout", u.(clusterUpdateErr).u, u.(clusterUpdateErr).err)
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestWatchExpiryTimeout)
+ defer sCancel()
+ if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
+ t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -368,80 +366,78 @@
// - one more update without the removed resource
// - the callback (above) shouldn't receive any update
func (s) TestClusterResourceRemoved(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- clusterUpdateCh1 := testutils.NewChannel()
- c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
- clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ clusterUpdateCh1 := testutils.NewChannel()
+ client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
+ clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
+
// Another watch for a different name.
clusterUpdateCh2 := testutils.NewChannel()
- c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
+ client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
- v2Client.r.NewClusters(map[string]ClusterUpdate{
+ client.NewClusters(map[string]ClusterUpdate{
testCDSName + "1": wantUpdate1,
testCDSName + "2": wantUpdate2,
})
-
- if u, err := clusterUpdateCh1.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh1, wantUpdate1); err != nil {
+ t.Fatal(err)
}
-
- if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
// Send another update to remove resource 1.
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName + "2": wantUpdate2,
- })
+ client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2})
- // watcher 1 should get an error.
+ // Watcher 1 should get an error.
if u, err := clusterUpdateCh1.Receive(ctx); err != nil || ErrType(u.(clusterUpdateErr).err) != ErrorTypeResourceNotFound {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
}
- // watcher 2 should get the same update again.
- if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ // Watcher 2 should get the same update again.
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
// Send one more update without resource 1.
- v2Client.r.NewClusters(map[string]ClusterUpdate{
- testCDSName + "2": wantUpdate2,
- })
+ client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2})
- // watcher 1 should get an error.
- if u, err := clusterUpdateCh1.Receive(ctx); err != context.DeadlineExceeded {
- t.Errorf("unexpected clusterUpdate: %v, want receiving from channel timeout", u)
+ // Watcher 1 should not see an update.
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := clusterUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded {
+ t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
}
- // watcher 2 should get the same update again.
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+ // Watcher 2 should get the same update again.
+ if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
diff --git a/xds/internal/client/client_watchers_endpoints_test.go b/xds/internal/client/client_watchers_endpoints_test.go
index c414e3a..822ee59 100644
--- a/xds/internal/client/client_watchers_endpoints_test.go
+++ b/xds/internal/client/client_watchers_endpoints_test.go
@@ -23,7 +23,6 @@
"testing"
"github.com/google/go-cmp/cmp"
- "github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/xds/internal"
@@ -44,7 +43,6 @@
Weight: 1,
},
}
- endpointsCmpOpts = []cmp.Option{cmp.AllowUnexported(endpointsUpdateErr{}), cmpopts.EquateEmpty()}
)
type endpointsUpdateErr struct {
@@ -57,55 +55,51 @@
// - an update for another resource name (which doesn't trigger callback)
// - an update is received after cancel()
func (s) TestEndpointsWatch(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- endpointsUpdateCh := testutils.NewChannel()
- cancelWatch := c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
- endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ endpointsUpdateCh := testutils.NewChannel()
+ cancelWatch := client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+ endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
- v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
- testCDSName: wantUpdate,
- })
-
- if u, err := endpointsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
- t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
+ if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another update for a different resource name.
- v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
- "randomName": {},
- })
-
- if u, err := endpointsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ client.NewEndpoints(map[string]EndpointsUpdate{"randomName": {}})
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
}
// Cancel watch, and send update again.
cancelWatch()
- v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
- testCDSName: wantUpdate,
- })
-
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := endpointsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
+ sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -113,64 +107,64 @@
// TestEndpointsTwoWatchSameResourceName covers the case where an update is received
// after two watch() for the same resource name.
func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- var endpointsUpdateChs []*testutils.Channel
- const count = 2
-
- var cancelLastWatch func()
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ const count = 2
+ var (
+ endpointsUpdateChs []*testutils.Channel
+ cancelLastWatch func()
+ )
for i := 0; i < count; i++ {
endpointsUpdateCh := testutils.NewChannel()
endpointsUpdateChs = append(endpointsUpdateChs, endpointsUpdateCh)
- cancelLastWatch = c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+ cancelLastWatch = client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
}
wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
- v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
- testCDSName: wantUpdate,
- })
-
+ client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
for i := 0; i < count; i++ {
- if u, err := endpointsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
- t.Errorf("i=%v, unexpected endpointsUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
// Cancel the last watch, and send update again.
cancelLastWatch()
- v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
- testCDSName: wantUpdate,
- })
-
+ client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
for i := 0; i < count-1; i++ {
- if u, err := endpointsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
- t.Errorf("i=%v, unexpected endpointsUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
- if u, err := endpointsUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := endpointsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -178,34 +172,37 @@
// TestEndpointsThreeWatchDifferentResourceName covers the case where an update is
// received after three watch() for different resource names.
func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
-
- var endpointsUpdateChs []*testutils.Channel
- const count = 2
-
- // Two watches for the same name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ // Two watches for the same name.
+ var endpointsUpdateChs []*testutils.Channel
+ const count = 2
for i := 0; i < count; i++ {
endpointsUpdateCh := testutils.NewChannel()
endpointsUpdateChs = append(endpointsUpdateChs, endpointsUpdateCh)
- c.WatchEndpoints(testCDSName+"1", func(update EndpointsUpdate, err error) {
+ client.WatchEndpoints(testCDSName+"1", func(update EndpointsUpdate, err error) {
endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@@ -213,83 +210,84 @@
// Third watch for a different name.
endpointsUpdateCh2 := testutils.NewChannel()
- c.WatchEndpoints(testCDSName+"2", func(update EndpointsUpdate, err error) {
+ client.WatchEndpoints(testCDSName+"2", func(update EndpointsUpdate, err error) {
endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate1 := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
wantUpdate2 := EndpointsUpdate{Localities: []Locality{testLocalities[1]}}
- v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
+ client.NewEndpoints(map[string]EndpointsUpdate{
testCDSName + "1": wantUpdate1,
testCDSName + "2": wantUpdate2,
})
for i := 0; i < count; i++ {
- if u, err := endpointsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate1, nil}, endpointsCmpOpts...) {
- t.Errorf("i=%v, unexpected endpointsUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate1); err != nil {
+ t.Fatal(err)
}
}
-
- if u, err := endpointsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate2, nil}, endpointsCmpOpts...) {
- t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
// TestEndpointsWatchAfterCache covers the case where watch is called after the update
// is in cache.
func (s) TestEndpointsWatchAfterCache(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- endpointsUpdateCh := testutils.NewChannel()
- c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
- endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ endpointsUpdateCh := testutils.NewChannel()
+ client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+ endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
- v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
- testCDSName: wantUpdate,
- })
-
- if u, err := endpointsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
- t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
+ if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another watch for the resource in cache.
endpointsUpdateCh2 := testutils.NewChannel()
- c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+ client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
})
- if n, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if n, err := apiClient.addWatches[EndpointsResource].Receive(sCtx); err != context.DeadlineExceeded {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}
// New watch should receives the update.
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := endpointsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
- t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Old watch should see nothing.
- if u, err := endpointsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -298,36 +296,37 @@
// an CDS response for the request that it sends out. We want the watch callback
// to be invoked with an error once the watchExpiryTimer fires.
func (s) TestEndpointsWatchExpiryTimer(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, true))
+ client, err := New(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
-
- endpointsUpdateCh := testutils.NewChannel()
- c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
- endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
- })
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ endpointsUpdateCh := testutils.NewChannel()
+ client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+ endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
u, err := endpointsUpdateCh.Receive(ctx)
if err != nil {
- t.Fatalf("failed to get endpointsUpdate: %v", err)
+ t.Fatalf("timeout when waiting for endpoints update: %v", err)
}
- uu := u.(endpointsUpdateErr)
- if !cmp.Equal(uu.u, EndpointsUpdate{}, endpointsCmpOpts...) {
- t.Errorf("unexpected endpointsUpdate: %v, want %v", uu.u, EndpointsUpdate{})
- }
- if uu.err == nil {
- t.Errorf("unexpected endpointsError: <nil>, want error watcher timeout")
+ gotUpdate := u.(endpointsUpdateErr)
+ if gotUpdate.err == nil || !cmp.Equal(gotUpdate.u, EndpointsUpdate{}) {
+ t.Fatalf("unexpected endpointsUpdate: (%v, %v), want: (EndpointsUpdate{}, nil)", gotUpdate.u, gotUpdate.err)
}
}
diff --git a/xds/internal/client/client_watchers_lds_test.go b/xds/internal/client/client_watchers_lds_test.go
index b3a25be..e922654 100644
--- a/xds/internal/client/client_watchers_lds_test.go
+++ b/xds/internal/client/client_watchers_lds_test.go
@@ -35,54 +35,52 @@
// - an update for another resource name
// - an update is received after cancel()
func (s) TestLDSWatch(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- ldsUpdateCh := testutils.NewChannel()
- cancelWatch := c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
- ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ ldsUpdateCh := testutils.NewChannel()
+ cancelWatch := client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+ ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := ListenerUpdate{RouteConfigName: testRDSName}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: wantUpdate,
- })
-
- if u, err := ldsUpdateCh.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another update, with an extra resource for a different resource name.
- v2Client.r.NewListeners(map[string]ListenerUpdate{
+ client.NewListeners(map[string]ListenerUpdate{
testLDSName: wantUpdate,
"randomName": {},
})
-
- if u, err := ldsUpdateCh.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Cancel watch, and send update again.
cancelWatch()
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: wantUpdate,
- })
-
- if u, err := ldsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := ldsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -90,64 +88,65 @@
// TestLDSTwoWatchSameResourceName covers the case where an update is received
// after two watch() for the same resource name.
func (s) TestLDSTwoWatchSameResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- var ldsUpdateChs []*testutils.Channel
- const count = 2
-
- var cancelLastWatch func()
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ const count = 2
+ var (
+ ldsUpdateChs []*testutils.Channel
+ cancelLastWatch func()
+ )
+
for i := 0; i < count; i++ {
ldsUpdateCh := testutils.NewChannel()
ldsUpdateChs = append(ldsUpdateChs, ldsUpdateCh)
- cancelLastWatch = c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+ cancelLastWatch = client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
}
wantUpdate := ListenerUpdate{RouteConfigName: testRDSName}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: wantUpdate,
- })
-
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
for i := 0; i < count; i++ {
- if u, err := ldsUpdateChs[i].Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
- t.Errorf("i=%v, unexpected ListenerUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
// Cancel the last watch, and send update again.
cancelLastWatch()
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: wantUpdate,
- })
-
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
for i := 0; i < count-1; i++ {
- if u, err := ldsUpdateChs[i].Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
- t.Errorf("i=%v, unexpected ListenerUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
- if u, err := ldsUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := ldsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -155,34 +154,38 @@
// TestLDSThreeWatchDifferentResourceName covers the case where an update is
// received after three watch() for different resource names.
func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
var ldsUpdateChs []*testutils.Channel
const count = 2
// Two watches for the same name.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
for i := 0; i < count; i++ {
ldsUpdateCh := testutils.NewChannel()
ldsUpdateChs = append(ldsUpdateChs, ldsUpdateCh)
- c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
+ client.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@@ -190,83 +193,84 @@
// Third watch for a different name.
ldsUpdateCh2 := testutils.NewChannel()
- c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
+ client.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate1 := ListenerUpdate{RouteConfigName: testRDSName + "1"}
wantUpdate2 := ListenerUpdate{RouteConfigName: testRDSName + "2"}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
+ client.NewListeners(map[string]ListenerUpdate{
testLDSName + "1": wantUpdate1,
testLDSName + "2": wantUpdate2,
})
for i := 0; i < count; i++ {
- if u, err := ldsUpdateChs[i].Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate1, nil}) {
- t.Errorf("i=%v, unexpected ListenerUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate1); err != nil {
+ t.Fatal(err)
}
}
-
- if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
// TestLDSWatchAfterCache covers the case where watch is called after the update
// is in cache.
func (s) TestLDSWatchAfterCache(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- ldsUpdateCh := testutils.NewChannel()
- c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
- ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ ldsUpdateCh := testutils.NewChannel()
+ client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+ ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := ListenerUpdate{RouteConfigName: testRDSName}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: wantUpdate,
- })
-
- if u, err := ldsUpdateCh.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another watch for the resource in cache.
ldsUpdateCh2 := testutils.NewChannel()
- c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+ client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
})
- if n, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if n, err := apiClient.addWatches[ListenerResource].Receive(sCtx); err != context.DeadlineExceeded {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}
- // New watch should receives the update.
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ // New watch should receive the update.
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Old watch should see nothing.
- if u, err := ldsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := ldsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -278,80 +282,77 @@
// - one more update without the removed resource
// - the callback (above) shouldn't receive any update
func (s) TestLDSResourceRemoved(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- ldsUpdateCh1 := testutils.NewChannel()
- c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
- ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ ldsUpdateCh1 := testutils.NewChannel()
+ client.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
+ ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
// Another watch for a different name.
ldsUpdateCh2 := testutils.NewChannel()
- c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
+ client.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate1 := ListenerUpdate{RouteConfigName: testEDSName + "1"}
wantUpdate2 := ListenerUpdate{RouteConfigName: testEDSName + "2"}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
+ client.NewListeners(map[string]ListenerUpdate{
testLDSName + "1": wantUpdate1,
testLDSName + "2": wantUpdate2,
})
-
- if u, err := ldsUpdateCh1.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate1, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh1, wantUpdate1); err != nil {
+ t.Fatal(err)
}
-
- if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
// Send another update to remove resource 1.
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName + "2": wantUpdate2,
- })
+ client.NewListeners(map[string]ListenerUpdate{testLDSName + "2": wantUpdate2})
- // watcher 1 should get an error.
+ // Watcher 1 should get an error.
if u, err := ldsUpdateCh1.Receive(ctx); err != nil || ErrType(u.(ldsUpdateErr).err) != ErrorTypeResourceNotFound {
t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
}
- // watcher 2 should get the same update again.
- if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ // Watcher 2 should get the same update again.
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
// Send one more update without resource 1.
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName + "2": wantUpdate2,
- })
+ client.NewListeners(map[string]ListenerUpdate{testLDSName + "2": wantUpdate2})
- // watcher 1 should get an error.
- if u, err := ldsUpdateCh1.Receive(ctx); err != context.DeadlineExceeded {
+ // Watcher 1 should not see an update.
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := ldsUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected ListenerUpdate: %v, want receiving from channel timeout", u)
}
- // watcher 2 should get the same update again.
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
- t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+ // Watcher 2 should get the same update again.
+ if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
diff --git a/xds/internal/client/client_watchers_rds_test.go b/xds/internal/client/client_watchers_rds_test.go
index abaec80..e1f23f3 100644
--- a/xds/internal/client/client_watchers_rds_test.go
+++ b/xds/internal/client/client_watchers_rds_test.go
@@ -23,6 +23,7 @@
"testing"
"github.com/google/go-cmp/cmp"
+
"google.golang.org/grpc/internal/testutils"
)
@@ -36,54 +37,51 @@
// - an update for another resource name (which doesn't trigger callback)
// - an update is received after cancel()
func (s) TestRDSWatch(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
-
- rdsUpdateCh := testutils.NewChannel()
- cancelWatch := c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
- rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
- })
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ rdsUpdateCh := testutils.NewChannel()
+ cancelWatch := client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+ rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
- testRDSName: wantUpdate,
- })
-
- if u, err := rdsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
- t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
+ if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another update for a different resource name.
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
- "randomName": {},
- })
-
- if u, err := rdsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{"randomName": {}})
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
}
// Cancel watch, and send update again.
cancelWatch()
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
- testRDSName: wantUpdate,
- })
-
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := rdsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
+ sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -91,64 +89,64 @@
// TestRDSTwoWatchSameResourceName covers the case where an update is received
// after two watch() for the same resource name.
func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- var rdsUpdateChs []*testutils.Channel
- const count = 2
-
- var cancelLastWatch func()
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ const count = 2
+ var (
+ rdsUpdateChs []*testutils.Channel
+ cancelLastWatch func()
+ )
for i := 0; i < count; i++ {
rdsUpdateCh := testutils.NewChannel()
rdsUpdateChs = append(rdsUpdateChs, rdsUpdateCh)
- cancelLastWatch = c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+ cancelLastWatch = client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
}
wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
- testRDSName: wantUpdate,
- })
-
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
for i := 0; i < count; i++ {
- if u, err := rdsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
- t.Errorf("i=%v, unexpected RouteConfigUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
// Cancel the last watch, and send update again.
cancelLastWatch()
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
- testRDSName: wantUpdate,
- })
-
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
for i := 0; i < count-1; i++ {
- if u, err := rdsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
- t.Errorf("i=%v, unexpected RouteConfigUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate); err != nil {
+ t.Fatal(err)
}
}
- if u, err := rdsUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := rdsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -156,34 +154,37 @@
// TestRDSThreeWatchDifferentResourceName covers the case where an update is
// received after three watch() for different resource names.
func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
-
- var rdsUpdateChs []*testutils.Channel
- const count = 2
-
- // Two watches for the same name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ // Two watches for the same name.
+ var rdsUpdateChs []*testutils.Channel
+ const count = 2
for i := 0; i < count; i++ {
rdsUpdateCh := testutils.NewChannel()
rdsUpdateChs = append(rdsUpdateChs, rdsUpdateCh)
- c.watchRDS(testRDSName+"1", func(update RouteConfigUpdate, err error) {
+ client.watchRDS(testRDSName+"1", func(update RouteConfigUpdate, err error) {
rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
})
if i == 0 {
// A new watch is registered on the underlying API client only for
// the first iteration because we are using the same resource name.
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
}
@@ -191,82 +192,84 @@
// Third watch for a different name.
rdsUpdateCh2 := testutils.NewChannel()
- c.watchRDS(testRDSName+"2", func(update RouteConfigUpdate, err error) {
+ client.watchRDS(testRDSName+"2", func(update RouteConfigUpdate, err error) {
rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate1 := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}}}
wantUpdate2 := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName + "1": wantUpdate1,
testRDSName + "2": wantUpdate2,
})
for i := 0; i < count; i++ {
- if u, err := rdsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate1, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
- t.Errorf("i=%v, unexpected RouteConfigUpdate: %v, error receiving from channel: %v", i, u, err)
+ if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate1); err != nil {
+ t.Fatal(err)
}
}
-
- if u, err := rdsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate2, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
- t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh2, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
// TestRDSWatchAfterCache covers the case where watch is called after the update
// is in cache.
func (s) TestRDSWatchAfterCache(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
+ defer client.Close()
- v2Client := <-v2ClientCh
-
- rdsUpdateCh := testutils.NewChannel()
- c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
- rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
- })
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ rdsUpdateCh := testutils.NewChannel()
+ client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+ rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
- testRDSName: wantUpdate,
- })
-
- if u, err := rdsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
- t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
+ if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another watch for the resource in cache.
rdsUpdateCh2 := testutils.NewChannel()
- c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+ client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
})
- if n, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if n, err := apiClient.addWatches[RouteConfigResource].Receive(sCtx); err != context.DeadlineExceeded {
t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
}
// New watch should receives the update.
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
if u, err := rdsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
}
// Old watch should see nothing.
- if u, err := rdsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
}
}
diff --git a/xds/internal/client/client_watchers_service_test.go b/xds/internal/client/client_watchers_service_test.go
index 5a6a401..d4f69d5 100644
--- a/xds/internal/client/client_watchers_service_test.go
+++ b/xds/internal/client/client_watchers_service_test.go
@@ -23,7 +23,6 @@
"testing"
"github.com/google/go-cmp/cmp"
- "github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/testutils"
)
@@ -33,47 +32,45 @@
err error
}
-var serviceCmpOpts = []cmp.Option{cmp.AllowUnexported(serviceUpdateErr{}), cmpopts.EquateEmpty()}
-
// TestServiceWatch covers the cases:
// - an update is received after a watch()
// - an update with routes received
func (s) TestServiceWatch(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
-
- wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
- t.Fatalf("want new watch to start, got error %v", err)
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+
+ wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ t.Fatalf("want new watch to start, got error %v", err)
+ }
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})
-
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
wantUpdate2 := ServiceUpdate{
@@ -82,7 +79,7 @@
Action: map[string]uint32{testCDSName: 1},
}},
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {
Routes: []*Route{{
Prefix: newStringP(""),
@@ -90,8 +87,8 @@
}},
},
})
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
@@ -99,70 +96,66 @@
// response, the second LDS response trigger an new RDS watch, and an update of
// the old RDS watch doesn't trigger update to service callback.
func (s) TestServiceWatchLDSUpdate(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
-
- wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
- t.Fatalf("want new watch to start, got error %v", err)
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+
+ wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ t.Fatalf("want new watch to start, got error %v", err)
+ }
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})
-
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another LDS update with a different RDS_name.
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName + "2"},
- })
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName + "2"}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
// Another update for the old name.
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})
-
- if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
}
- wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
// RDS update for the new name.
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName + "2": {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}},
})
-
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
+ t.Fatal(err)
}
}
@@ -170,49 +163,48 @@
// error (because only one is allowed). But the first watch still receives
// updates.
func (s) TestServiceWatchSecond(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
-
- wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
- t.Fatalf("want new watch to start, got error %v", err)
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+
+ wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ t.Fatalf("want new watch to start, got error %v", err)
+ }
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})
-
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
- serviceUpdateCh2 := testutils.NewChannel()
// Call WatchService() again, with the same or different name.
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh2 := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
serviceUpdateCh2.Send(serviceUpdateErr{u: update, err: err})
})
-
u, err := serviceUpdateCh2.Receive(ctx)
if err != nil {
t.Fatalf("failed to get serviceUpdate: %v", err)
@@ -227,18 +219,17 @@
// Send update again, first callback should be called, second should
// timeout.
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
- })
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})
-
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
- if u, err := serviceUpdateCh2.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := serviceUpdateCh2.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -247,25 +238,28 @@
// does not respond to the requests being sent out as part of registering a
// service update watcher. The callback will get an error.
func (s) TestServiceWatchWithNoResponseFromServer(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, true))
+ client, err := New(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
+ }
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
+ })
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
u, err := serviceUpdateCh.Receive(ctx)
@@ -281,37 +275,39 @@
}
}
-// TestServiceWatchEmptyRDS tests the case where the underlying v2Client
+// TestServiceWatchEmptyRDS tests the case where the underlying apiClient
// receives an empty RDS response. The callback will get an error.
func (s) TestServiceWatchEmptyRDS(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, true))
+ client, err := New(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
- t.Fatalf("want new watch to start, got error %v", err)
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{})
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ t.Fatalf("want new watch to start, got error %v", err)
+ }
+
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{})
u, err := serviceUpdateCh.Receive(ctx)
if err != nil {
t.Fatalf("failed to get serviceUpdate: %v", err)
@@ -329,36 +325,40 @@
// received after the client is closed, and we make sure that the registered
// watcher callback is not invoked.
func (s) TestServiceWatchWithClientClose(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, true))
+ client, err := New(clientOpts(testXDSServer, true))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
- t.Fatalf("want new watch to start, got error %v", err)
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ t.Fatalf("want new watch to start, got error %v", err)
+ }
+
// Client is closed before it receives the RDS response.
- c.Close()
- if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ client.Close()
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
}
}
@@ -367,49 +367,49 @@
// update contains the same RDS name as the previous, the RDS watch isn't
// canceled and restarted.
func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
-
- wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
- t.Fatalf("want new watch to start, got error %v", err)
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ t.Fatalf("want new watch to start, got error %v", err)
+ }
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Another LDS update with a the same RDS_name.
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
- })
- if v, err := v2Client.removeWatches[RouteConfigResource].Receive(ctx); err == nil {
- t.Fatalf("unexpected rds watch cancel: %v", v)
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if _, err := apiClient.removeWatches[RouteConfigResource].Receive(sCtx); err != context.DeadlineExceeded {
+ t.Fatalf("unexpected rds watch cancel")
}
}
@@ -420,47 +420,46 @@
// - one more update without the removed resource
// - the callback (above) shouldn't receive any update
func (s) TestServiceResourceRemoved(t *testing.T) {
- v2ClientCh, cleanup := overrideNewAPIClient()
+ apiClientCh, cleanup := overrideNewAPIClient()
defer cleanup()
- c, err := New(clientOpts(testXDSServer, false))
+ client, err := New(clientOpts(testXDSServer, false))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
- defer c.Close()
-
- v2Client := <-v2ClientCh
-
- serviceUpdateCh := testutils.NewChannel()
- c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
- serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
- })
-
- wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
- if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
- t.Fatalf("want new watch to start, got error %v", err)
+ c, err := apiClientCh.Receive(ctx)
+ if err != nil {
+ t.Fatalf("timeout when waiting for API client to be created: %v", err)
}
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
+ apiClient := c.(*testAPIClient)
+
+ serviceUpdateCh := testutils.NewChannel()
+ client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+ serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
})
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ t.Fatalf("want new watch to start, got error %v", err)
+ }
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
})
-
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
// Remove LDS resource, should cancel the RDS watch, and trigger resource
// removed error.
- v2Client.r.NewListeners(map[string]ListenerUpdate{})
- if _, err := v2Client.removeWatches[RouteConfigResource].Receive(ctx); err != nil {
+ client.NewListeners(map[string]ListenerUpdate{})
+ if _, err := apiClient.removeWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want watch to be canceled, got error %v", err)
}
if u, err := serviceUpdateCh.Receive(ctx); err != nil || ErrType(u.(serviceUpdateErr).err) != ErrorTypeResourceNotFound {
@@ -469,34 +468,33 @@
// Send RDS update for the removed LDS resource, expect no updates to
// callback, because RDS should be canceled.
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new": 1}}}},
})
- if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
}
// Add LDS resource, but not RDS resource, should
// - start a new RDS watch
// - timeout on service channel, because RDS cache was cleared
- v2Client.r.NewListeners(map[string]ListenerUpdate{
- testLDSName: {RouteConfigName: testRDSName},
- })
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+ client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+ if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
- if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+ sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+ defer sCancel()
+ if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
}
- v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+ client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}},
})
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}, nil}, serviceCmpOpts...) {
- t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+ wantUpdate = ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}
+ if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+ t.Fatal(err)
}
}