xds: Refactor/cleanup xds client tests. (#3920)

diff --git a/xds/internal/client/client_cds_test.go b/xds/internal/client/client_cds_test.go
index d45baa0..578d612 100644
--- a/xds/internal/client/client_cds_test.go
+++ b/xds/internal/client/client_cds_test.go
@@ -32,15 +32,14 @@
 	"google.golang.org/grpc/xds/internal/version"
 )
 
-func (s) TestValidateCluster(t *testing.T) {
-	const (
-		clusterName = "clusterName"
-		serviceName = "service"
-	)
-	var (
-		emptyUpdate = ClusterUpdate{ServiceName: "", EnableLRS: false}
-	)
+const (
+	clusterName = "clusterName"
+	serviceName = "service"
+)
 
+var emptyUpdate = ClusterUpdate{ServiceName: "", EnableLRS: false}
+
+func (s) TestValidateCluster_Failure(t *testing.T) {
 	tests := []struct {
 		name       string
 		cluster    *v3clusterpb.Cluster
@@ -98,6 +97,23 @@
 			wantUpdate: emptyUpdate,
 			wantErr:    true,
 		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			if update, err := validateCluster(test.cluster); err == nil {
+				t.Errorf("validateCluster(%+v) = %v, wanted error", test.cluster, update)
+			}
+		})
+	}
+}
+
+func (s) TestValidateCluster_Success(t *testing.T) {
+	tests := []struct {
+		name       string
+		cluster    *v3clusterpb.Cluster
+		wantUpdate ClusterUpdate
+	}{
 		{
 			name: "happy-case-no-service-name-no-lrs",
 			cluster: &v3clusterpb.Cluster{
@@ -156,8 +172,11 @@
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			update, err := validateCluster(test.cluster)
-			if ((err != nil) != test.wantErr) || !cmp.Equal(update, test.wantUpdate, cmpopts.EquateEmpty()) {
-				t.Errorf("validateCluster(%+v) = (%v, %v), wantErr: (%v, %v)", test.cluster, update, err, test.wantUpdate, test.wantErr)
+			if err != nil {
+				t.Errorf("validateCluster(%+v) failed: %v", test.cluster, err)
+			}
+			if !cmp.Equal(update, test.wantUpdate, cmpopts.EquateEmpty()) {
+				t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, update, test.wantUpdate)
 			}
 		})
 	}
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index b36a5af..4a53138 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -20,9 +20,13 @@
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/internal/grpctest"
 	"google.golang.org/grpc/internal/testutils"
@@ -49,7 +53,8 @@
 	testEDSName = "test-eds"
 
 	defaultTestWatchExpiryTimeout = 500 * time.Millisecond
-	defaultTestTimeout            = 1 * time.Second
+	defaultTestTimeout            = 5 * time.Second
+	defaultTestShortTimeout       = 10 * time.Millisecond // For events expected to *not* happen.
 )
 
 func clientOpts(balancerName string, overrideWatchExpiryTImeout bool) Options {
@@ -68,24 +73,22 @@
 }
 
 type testAPIClient struct {
-	r UpdateHandler
-
 	addWatches    map[ResourceType]*testutils.Channel
 	removeWatches map[ResourceType]*testutils.Channel
 }
 
-func overrideNewAPIClient() (<-chan *testAPIClient, func()) {
+func overrideNewAPIClient() (*testutils.Channel, func()) {
 	origNewAPIClient := newAPIClient
-	ch := make(chan *testAPIClient, 1)
+	ch := testutils.NewChannel()
 	newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
-		ret := newTestAPIClient(opts.Parent)
-		ch <- ret
+		ret := newTestAPIClient()
+		ch.Send(ret)
 		return ret, nil
 	}
 	return ch, func() { newAPIClient = origNewAPIClient }
 }
 
-func newTestAPIClient(r UpdateHandler) *testAPIClient {
+func newTestAPIClient() *testAPIClient {
 	addWatches := map[ResourceType]*testutils.Channel{
 		ListenerResource:    testutils.NewChannel(),
 		RouteConfigResource: testutils.NewChannel(),
@@ -99,7 +102,6 @@
 		EndpointsResource:   testutils.NewChannel(),
 	}
 	return &testAPIClient{
-		r:             r,
 		addWatches:    addWatches,
 		removeWatches: removeWatches,
 	}
@@ -121,53 +123,108 @@
 // TestWatchCallAnotherWatch covers the case where watch() is called inline by a
 // callback. It makes sure it doesn't cause a deadlock.
 func (s) TestWatchCallAnotherWatch(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
 
 	clusterUpdateCh := testutils.NewChannel()
 	firstTime := true
-	c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+	client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
 		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
 		// Calls another watch inline, to ensure there's deadlock.
-		c.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
+		client.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
 
-		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-		defer cancel()
-		if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); firstTime && err != nil {
+		if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); firstTime && err != nil {
 			t.Fatalf("want new watch to start, got error %v", err)
 		}
 		firstTime = false
 	})
-
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName: wantUpdate,
-	})
-
-	if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName: wantUpdate2,
-	})
-
-	if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate2})
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
+
+func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ListenerUpdate) error {
+	u, err := updateCh.Receive(ctx)
+	if err != nil {
+		return fmt.Errorf("timeout when waiting for listener update: %v", err)
+	}
+	gotUpdate := u.(ldsUpdateErr)
+	if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
+		return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+	}
+	return nil
+}
+
+func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate RouteConfigUpdate) error {
+	u, err := updateCh.Receive(ctx)
+	if err != nil {
+		return fmt.Errorf("timeout when waiting for route configuration update: %v", err)
+	}
+	gotUpdate := u.(rdsUpdateErr)
+	if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
+		return fmt.Errorf("unexpected route config update: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+	}
+	return nil
+}
+
+func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ServiceUpdate) error {
+	u, err := updateCh.Receive(ctx)
+	if err != nil {
+		return fmt.Errorf("timeout when waiting for service update: %v", err)
+	}
+	gotUpdate := u.(serviceUpdateErr)
+	if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
+		return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+	}
+	return nil
+}
+
+func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate ClusterUpdate) error {
+	u, err := updateCh.Receive(ctx)
+	if err != nil {
+		return fmt.Errorf("timeout when waiting for cluster update: %v", err)
+	}
+	gotUpdate := u.(clusterUpdateErr)
+	if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate) {
+		return fmt.Errorf("unexpected clusterUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+	}
+	return nil
+}
+
+func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate EndpointsUpdate) error {
+	u, err := updateCh.Receive(ctx)
+	if err != nil {
+		return fmt.Errorf("timeout when waiting for endpoints update: %v", err)
+	}
+	gotUpdate := u.(endpointsUpdateErr)
+	if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
+		return fmt.Errorf("unexpected endpointsUpdate: (%v, %v), want: (%v, nil)", gotUpdate.u, gotUpdate.err, wantUpdate)
+	}
+	return nil
+}
diff --git a/xds/internal/client/client_watchers_cluster_test.go b/xds/internal/client/client_watchers_cluster_test.go
index 3c816f9..c512675 100644
--- a/xds/internal/client/client_watchers_cluster_test.go
+++ b/xds/internal/client/client_watchers_cluster_test.go
@@ -22,6 +22,8 @@
 	"context"
 	"testing"
 
+	"github.com/google/go-cmp/cmp"
+
 	"google.golang.org/grpc/internal/testutils"
 )
 
@@ -35,63 +37,52 @@
 // - an update for another resource name
 // - an update is received after cancel()
 func (s) TestClusterWatch(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	// TODO: add a timeout to this recv.
-	// Note that this won't be necessary if we finish the TODO below to call
-	// Client directly instead of v2Client.r.
-	v2Client := <-v2ClientCh
-
-	clusterUpdateCh := testutils.NewChannel()
-	cancelWatch := c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
-		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	clusterUpdateCh := testutils.NewChannel()
+	cancelWatch := client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
-	// This is calling v2Client.r to send the update, but r is set to Client, so
-	// this is same as calling Client to update. The one thing this covers is
-	// that `NewXDSV2Client` is called with the right parent.
-	//
-	// TODO: in a future cleanup, this (and the same thing in other tests) can
-	// be changed call Client directly.
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName: wantUpdate,
-	})
-
-	if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another update, with an extra resource for a different resource name.
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
+	client.NewClusters(map[string]ClusterUpdate{
 		testCDSName:  wantUpdate,
 		"randomName": {},
 	})
-
-	if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected clusterUpdate: %+v, %v, want channel recv timeout", u, err)
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Cancel watch, and send update again.
 	cancelWatch()
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName: wantUpdate,
-	})
-
-	if u, err := clusterUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -99,64 +90,62 @@
 // TestClusterTwoWatchSameResourceName covers the case where an update is received
 // after two watch() for the same resource name.
 func (s) TestClusterTwoWatchSameResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	var clusterUpdateChs []*testutils.Channel
-	var cancelLastWatch func()
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
 
+	var clusterUpdateChs []*testutils.Channel
+	var cancelLastWatch func()
 	const count = 2
 	for i := 0; i < count; i++ {
 		clusterUpdateCh := testutils.NewChannel()
 		clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh)
-		cancelLastWatch = c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+		cancelLastWatch = client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
 			clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
 	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName: wantUpdate,
-	})
-
+	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
 	for i := 0; i < count; i++ {
-		if u, err := clusterUpdateChs[i].Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-			t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
 	// Cancel the last watch, and send update again.
 	cancelLastWatch()
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName: wantUpdate,
-	})
-
+	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate})
 	for i := 0; i < count-1; i++ {
-		if u, err := clusterUpdateChs[i].Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-			t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
-	if u, err := clusterUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := clusterUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -164,34 +153,37 @@
 // TestClusterThreeWatchDifferentResourceName covers the case where an update is
 // received after three watch() for different resource names.
 func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
-
-	var clusterUpdateChs []*testutils.Channel
-	const count = 2
-
-	// Two watches for the same name.
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	// Two watches for the same name.
+	var clusterUpdateChs []*testutils.Channel
+	const count = 2
 	for i := 0; i < count; i++ {
 		clusterUpdateCh := testutils.NewChannel()
 		clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh)
-		c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
+		client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
 			clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
@@ -199,83 +191,86 @@
 
 	// Third watch for a different name.
 	clusterUpdateCh2 := testutils.NewChannel()
-	c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
+	client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
 		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
 	wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
+	client.NewClusters(map[string]ClusterUpdate{
 		testCDSName + "1": wantUpdate1,
 		testCDSName + "2": wantUpdate2,
 	})
 
 	for i := 0; i < count; i++ {
-		if u, err := clusterUpdateChs[i].Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) {
-			t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate1); err != nil {
+			t.Fatal(err)
 		}
 	}
-
-	if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
 
 // TestClusterWatchAfterCache covers the case where watch is called after the update
 // is in cache.
 func (s) TestClusterWatchAfterCache(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	clusterUpdateCh := testutils.NewChannel()
-	c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
-		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	clusterUpdateCh := testutils.NewChannel()
+	client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
+	client.NewClusters(map[string]ClusterUpdate{
 		testCDSName: wantUpdate,
 	})
-
-	if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another watch for the resource in cache.
 	clusterUpdateCh2 := testutils.NewChannel()
-	c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
+	client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
 		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
 	})
-	if n, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if n, err := apiClient.addWatches[ClusterResource].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
 	}
 
 	// New watch should receives the update.
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Old watch should see nothing.
-	if u, err := clusterUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -284,38 +279,38 @@
 // an CDS response for the request that it sends out. We want the watch callback
 // to be invoked with an error once the watchExpiryTimer fires.
 func (s) TestClusterWatchExpiryTimer(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, true))
+	client, err := New(clientOpts(testXDSServer, true))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	clusterUpdateCh := testutils.NewChannel()
-	c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
-		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	clusterUpdateCh := testutils.NewChannel()
+	client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
+		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
+	})
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	u, err := clusterUpdateCh.Receive(ctx)
 	if err != nil {
-		t.Fatalf("failed to get clusterUpdate: %v", err)
+		t.Fatalf("timeout when waiting for cluster update: %v", err)
 	}
-	uu := u.(clusterUpdateErr)
-	if uu.u != (ClusterUpdate{}) {
-		t.Errorf("unexpected clusterUpdate: %v, want %v", uu.u, ClusterUpdate{})
-	}
-	if uu.err == nil {
-		t.Errorf("unexpected clusterError: <nil>, want error watcher timeout")
+	gotUpdate := u.(clusterUpdateErr)
+	if gotUpdate.err == nil || !cmp.Equal(gotUpdate.u, ClusterUpdate{}) {
+		t.Fatalf("unexpected clusterUpdate: (%v, %v), want: (ClusterUpdate{}, nil)", gotUpdate.u, gotUpdate.err)
 	}
 }
 
@@ -323,41 +318,44 @@
 // an CDS response for the request that it sends out. We want no error even
 // after expiry timeout.
 func (s) TestClusterWatchExpiryTimerStop(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, true))
+	client, err := New(clientOpts(testXDSServer, true))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	clusterUpdateCh := testutils.NewChannel()
-	c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
-		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	clusterUpdateCh := testutils.NewChannel()
+	client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
+		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
+	})
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
+	client.NewClusters(map[string]ClusterUpdate{
 		testCDSName: wantUpdate,
 	})
-
-	if u, err := clusterUpdateCh.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Wait for an error, the error should never happen.
-	u, err := clusterUpdateCh.Receive(ctx)
-	if err != context.DeadlineExceeded {
-		t.Fatalf("got unexpected: %v, %v, want recv timeout", u.(clusterUpdateErr).u, u.(clusterUpdateErr).err)
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestWatchExpiryTimeout)
+	defer sCancel()
+	if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
+		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
 
@@ -368,80 +366,78 @@
 // - one more update without the removed resource
 //   - the callback (above) shouldn't receive any update
 func (s) TestClusterResourceRemoved(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	clusterUpdateCh1 := testutils.NewChannel()
-	c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
-		clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	clusterUpdateCh1 := testutils.NewChannel()
+	client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
+		clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
+
 	// Another watch for a different name.
 	clusterUpdateCh2 := testutils.NewChannel()
-	c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
+	client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
 		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[ClusterResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
 	wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
+	client.NewClusters(map[string]ClusterUpdate{
 		testCDSName + "1": wantUpdate1,
 		testCDSName + "2": wantUpdate2,
 	})
-
-	if u, err := clusterUpdateCh1.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh1, wantUpdate1); err != nil {
+		t.Fatal(err)
 	}
-
-	if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 
 	// Send another update to remove resource 1.
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName + "2": wantUpdate2,
-	})
+	client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2})
 
-	// watcher 1 should get an error.
+	// Watcher 1 should get an error.
 	if u, err := clusterUpdateCh1.Receive(ctx); err != nil || ErrType(u.(clusterUpdateErr).err) != ErrorTypeResourceNotFound {
 		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
 	}
 
-	// watcher 2 should get the same update again.
-	if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	// Watcher 2 should get the same update again.
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 
 	// Send one more update without resource 1.
-	v2Client.r.NewClusters(map[string]ClusterUpdate{
-		testCDSName + "2": wantUpdate2,
-	})
+	client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2})
 
-	// watcher 1 should get an error.
-	if u, err := clusterUpdateCh1.Receive(ctx); err != context.DeadlineExceeded {
-		t.Errorf("unexpected clusterUpdate: %v, want receiving from channel timeout", u)
+	// Watcher 1 should not see an update.
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := clusterUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded {
+		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 
-	// watcher 2 should get the same update again.
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := clusterUpdateCh2.Receive(ctx); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	// Watcher 2 should get the same update again.
+	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
diff --git a/xds/internal/client/client_watchers_endpoints_test.go b/xds/internal/client/client_watchers_endpoints_test.go
index c414e3a..822ee59 100644
--- a/xds/internal/client/client_watchers_endpoints_test.go
+++ b/xds/internal/client/client_watchers_endpoints_test.go
@@ -23,7 +23,6 @@
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
-	"github.com/google/go-cmp/cmp/cmpopts"
 
 	"google.golang.org/grpc/internal/testutils"
 	"google.golang.org/grpc/xds/internal"
@@ -44,7 +43,6 @@
 			Weight:    1,
 		},
 	}
-	endpointsCmpOpts = []cmp.Option{cmp.AllowUnexported(endpointsUpdateErr{}), cmpopts.EquateEmpty()}
 )
 
 type endpointsUpdateErr struct {
@@ -57,55 +55,51 @@
 // - an update for another resource name (which doesn't trigger callback)
 // - an update is received after cancel()
 func (s) TestEndpointsWatch(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	endpointsUpdateCh := testutils.NewChannel()
-	cancelWatch := c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
-		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	endpointsUpdateCh := testutils.NewChannel()
+	cancelWatch := client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
-	v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
-		testCDSName: wantUpdate,
-	})
-
-	if u, err := endpointsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
-		t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
+	if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another update for a different resource name.
-	v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
-		"randomName": {},
-	})
-
-	if u, err := endpointsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	client.NewEndpoints(map[string]EndpointsUpdate{"randomName": {}})
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 
 	// Cancel watch, and send update again.
 	cancelWatch()
-	v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
-		testCDSName: wantUpdate,
-	})
-
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := endpointsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -113,64 +107,64 @@
 // TestEndpointsTwoWatchSameResourceName covers the case where an update is received
 // after two watch() for the same resource name.
 func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	var endpointsUpdateChs []*testutils.Channel
-	const count = 2
-
-	var cancelLastWatch func()
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	const count = 2
+	var (
+		endpointsUpdateChs []*testutils.Channel
+		cancelLastWatch    func()
+	)
 	for i := 0; i < count; i++ {
 		endpointsUpdateCh := testutils.NewChannel()
 		endpointsUpdateChs = append(endpointsUpdateChs, endpointsUpdateCh)
-		cancelLastWatch = c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+		cancelLastWatch = client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
 			endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
 	}
 
 	wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
-	v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
-		testCDSName: wantUpdate,
-	})
-
+	client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
 	for i := 0; i < count; i++ {
-		if u, err := endpointsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
-			t.Errorf("i=%v, unexpected endpointsUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
 	// Cancel the last watch, and send update again.
 	cancelLastWatch()
-	v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
-		testCDSName: wantUpdate,
-	})
-
+	client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
 	for i := 0; i < count-1; i++ {
-		if u, err := endpointsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
-			t.Errorf("i=%v, unexpected endpointsUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
-	if u, err := endpointsUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := endpointsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -178,34 +172,37 @@
 // TestEndpointsThreeWatchDifferentResourceName covers the case where an update is
 // received after three watch() for different resource names.
 func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
-
-	var endpointsUpdateChs []*testutils.Channel
-	const count = 2
-
-	// Two watches for the same name.
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	// Two watches for the same name.
+	var endpointsUpdateChs []*testutils.Channel
+	const count = 2
 	for i := 0; i < count; i++ {
 		endpointsUpdateCh := testutils.NewChannel()
 		endpointsUpdateChs = append(endpointsUpdateChs, endpointsUpdateCh)
-		c.WatchEndpoints(testCDSName+"1", func(update EndpointsUpdate, err error) {
+		client.WatchEndpoints(testCDSName+"1", func(update EndpointsUpdate, err error) {
 			endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
@@ -213,83 +210,84 @@
 
 	// Third watch for a different name.
 	endpointsUpdateCh2 := testutils.NewChannel()
-	c.WatchEndpoints(testCDSName+"2", func(update EndpointsUpdate, err error) {
+	client.WatchEndpoints(testCDSName+"2", func(update EndpointsUpdate, err error) {
 		endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate1 := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
 	wantUpdate2 := EndpointsUpdate{Localities: []Locality{testLocalities[1]}}
-	v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
+	client.NewEndpoints(map[string]EndpointsUpdate{
 		testCDSName + "1": wantUpdate1,
 		testCDSName + "2": wantUpdate2,
 	})
 
 	for i := 0; i < count; i++ {
-		if u, err := endpointsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate1, nil}, endpointsCmpOpts...) {
-			t.Errorf("i=%v, unexpected endpointsUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyEndpointsUpdate(ctx, endpointsUpdateChs[i], wantUpdate1); err != nil {
+			t.Fatal(err)
 		}
 	}
-
-	if u, err := endpointsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate2, nil}, endpointsCmpOpts...) {
-		t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
 
 // TestEndpointsWatchAfterCache covers the case where watch is called after the update
 // is in cache.
 func (s) TestEndpointsWatchAfterCache(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	endpointsUpdateCh := testutils.NewChannel()
-	c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
-		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	endpointsUpdateCh := testutils.NewChannel()
+	client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
-	v2Client.r.NewEndpoints(map[string]EndpointsUpdate{
-		testCDSName: wantUpdate,
-	})
-
-	if u, err := endpointsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
-		t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: wantUpdate})
+	if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another watch for the resource in cache.
 	endpointsUpdateCh2 := testutils.NewChannel()
-	c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+	client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
 		endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
 	})
-	if n, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if n, err := apiClient.addWatches[EndpointsResource].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
 	}
 
 	// New watch should receives the update.
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := endpointsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, endpointsCmpOpts...) {
-		t.Errorf("unexpected endpointsUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh2, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Old watch should see nothing.
-	if u, err := endpointsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := endpointsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected endpointsUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -298,36 +296,37 @@
 // an CDS response for the request that it sends out. We want the watch callback
 // to be invoked with an error once the watchExpiryTimer fires.
 func (s) TestEndpointsWatchExpiryTimer(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, true))
+	client, err := New(clientOpts(testXDSServer, true))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
-
-	endpointsUpdateCh := testutils.NewChannel()
-	c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
-		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
-	})
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[EndpointsResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	endpointsUpdateCh := testutils.NewChannel()
+	client.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
+		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	u, err := endpointsUpdateCh.Receive(ctx)
 	if err != nil {
-		t.Fatalf("failed to get endpointsUpdate: %v", err)
+		t.Fatalf("timeout when waiting for endpoints update: %v", err)
 	}
-	uu := u.(endpointsUpdateErr)
-	if !cmp.Equal(uu.u, EndpointsUpdate{}, endpointsCmpOpts...) {
-		t.Errorf("unexpected endpointsUpdate: %v, want %v", uu.u, EndpointsUpdate{})
-	}
-	if uu.err == nil {
-		t.Errorf("unexpected endpointsError: <nil>, want error watcher timeout")
+	gotUpdate := u.(endpointsUpdateErr)
+	if gotUpdate.err == nil || !cmp.Equal(gotUpdate.u, EndpointsUpdate{}) {
+		t.Fatalf("unexpected endpointsUpdate: (%v, %v), want: (EndpointsUpdate{}, nil)", gotUpdate.u, gotUpdate.err)
 	}
 }
diff --git a/xds/internal/client/client_watchers_lds_test.go b/xds/internal/client/client_watchers_lds_test.go
index b3a25be..e922654 100644
--- a/xds/internal/client/client_watchers_lds_test.go
+++ b/xds/internal/client/client_watchers_lds_test.go
@@ -35,54 +35,52 @@
 // - an update for another resource name
 // - an update is received after cancel()
 func (s) TestLDSWatch(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	ldsUpdateCh := testutils.NewChannel()
-	cancelWatch := c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
-		ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	ldsUpdateCh := testutils.NewChannel()
+	cancelWatch := client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+		ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := ListenerUpdate{RouteConfigName: testRDSName}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: wantUpdate,
-	})
-
-	if u, err := ldsUpdateCh.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another update, with an extra resource for a different resource name.
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
+	client.NewListeners(map[string]ListenerUpdate{
 		testLDSName:  wantUpdate,
 		"randomName": {},
 	})
-
-	if u, err := ldsUpdateCh.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Cancel watch, and send update again.
 	cancelWatch()
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: wantUpdate,
-	})
-
-	if u, err := ldsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := ldsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -90,64 +88,65 @@
 // TestLDSTwoWatchSameResourceName covers the case where an update is received
 // after two watch() for the same resource name.
 func (s) TestLDSTwoWatchSameResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	var ldsUpdateChs []*testutils.Channel
-	const count = 2
-
-	var cancelLastWatch func()
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	const count = 2
+	var (
+		ldsUpdateChs    []*testutils.Channel
+		cancelLastWatch func()
+	)
+
 	for i := 0; i < count; i++ {
 		ldsUpdateCh := testutils.NewChannel()
 		ldsUpdateChs = append(ldsUpdateChs, ldsUpdateCh)
-		cancelLastWatch = c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+		cancelLastWatch = client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
 			ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
 	}
 
 	wantUpdate := ListenerUpdate{RouteConfigName: testRDSName}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: wantUpdate,
-	})
-
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
 	for i := 0; i < count; i++ {
-		if u, err := ldsUpdateChs[i].Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
-			t.Errorf("i=%v, unexpected ListenerUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
 	// Cancel the last watch, and send update again.
 	cancelLastWatch()
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: wantUpdate,
-	})
-
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
 	for i := 0; i < count-1; i++ {
-		if u, err := ldsUpdateChs[i].Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
-			t.Errorf("i=%v, unexpected ListenerUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
-	if u, err := ldsUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := ldsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -155,34 +154,38 @@
 // TestLDSThreeWatchDifferentResourceName covers the case where an update is
 // received after three watch() for different resource names.
 func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
 
 	var ldsUpdateChs []*testutils.Channel
 	const count = 2
 
 	// Two watches for the same name.
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
 	for i := 0; i < count; i++ {
 		ldsUpdateCh := testutils.NewChannel()
 		ldsUpdateChs = append(ldsUpdateChs, ldsUpdateCh)
-		c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
+		client.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
 			ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
@@ -190,83 +193,84 @@
 
 	// Third watch for a different name.
 	ldsUpdateCh2 := testutils.NewChannel()
-	c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
+	client.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
 		ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate1 := ListenerUpdate{RouteConfigName: testRDSName + "1"}
 	wantUpdate2 := ListenerUpdate{RouteConfigName: testRDSName + "2"}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
+	client.NewListeners(map[string]ListenerUpdate{
 		testLDSName + "1": wantUpdate1,
 		testLDSName + "2": wantUpdate2,
 	})
 
 	for i := 0; i < count; i++ {
-		if u, err := ldsUpdateChs[i].Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate1, nil}) {
-			t.Errorf("i=%v, unexpected ListenerUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyListenerUpdate(ctx, ldsUpdateChs[i], wantUpdate1); err != nil {
+			t.Fatal(err)
 		}
 	}
-
-	if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
 
 // TestLDSWatchAfterCache covers the case where watch is called after the update
 // is in cache.
 func (s) TestLDSWatchAfterCache(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	ldsUpdateCh := testutils.NewChannel()
-	c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
-		ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	ldsUpdateCh := testutils.NewChannel()
+	client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+		ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := ListenerUpdate{RouteConfigName: testRDSName}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: wantUpdate,
-	})
-
-	if u, err := ldsUpdateCh.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: wantUpdate})
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another watch for the resource in cache.
 	ldsUpdateCh2 := testutils.NewChannel()
-	c.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
+	client.watchLDS(testLDSName, func(update ListenerUpdate, err error) {
 		ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
 	})
-	if n, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if n, err := apiClient.addWatches[ListenerResource].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
 	}
 
-	// New watch should receives the update.
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	// New watch should receive the update.
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Old watch should see nothing.
-	if u, err := ldsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := ldsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected ListenerUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -278,80 +282,77 @@
 // - one more update without the removed resource
 //   - the callback (above) shouldn't receive any update
 func (s) TestLDSResourceRemoved(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	ldsUpdateCh1 := testutils.NewChannel()
-	c.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
-		ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	ldsUpdateCh1 := testutils.NewChannel()
+	client.watchLDS(testLDSName+"1", func(update ListenerUpdate, err error) {
+		ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 	// Another watch for a different name.
 	ldsUpdateCh2 := testutils.NewChannel()
-	c.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
+	client.watchLDS(testLDSName+"2", func(update ListenerUpdate, err error) {
 		ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate1 := ListenerUpdate{RouteConfigName: testEDSName + "1"}
 	wantUpdate2 := ListenerUpdate{RouteConfigName: testEDSName + "2"}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
+	client.NewListeners(map[string]ListenerUpdate{
 		testLDSName + "1": wantUpdate1,
 		testLDSName + "2": wantUpdate2,
 	})
-
-	if u, err := ldsUpdateCh1.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate1, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh1, wantUpdate1); err != nil {
+		t.Fatal(err)
 	}
-
-	if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 
 	// Send another update to remove resource 1.
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName + "2": wantUpdate2,
-	})
+	client.NewListeners(map[string]ListenerUpdate{testLDSName + "2": wantUpdate2})
 
-	// watcher 1 should get an error.
+	// Watcher 1 should get an error.
 	if u, err := ldsUpdateCh1.Receive(ctx); err != nil || ErrType(u.(ldsUpdateErr).err) != ErrorTypeResourceNotFound {
 		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
 	}
 
-	// watcher 2 should get the same update again.
-	if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	// Watcher 2 should get the same update again.
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 
 	// Send one more update without resource 1.
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName + "2": wantUpdate2,
-	})
+	client.NewListeners(map[string]ListenerUpdate{testLDSName + "2": wantUpdate2})
 
-	// watcher 1 should get an error.
-	if u, err := ldsUpdateCh1.Receive(ctx); err != context.DeadlineExceeded {
+	// Watcher 1 should not see an update.
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := ldsUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected ListenerUpdate: %v, want receiving from channel timeout", u)
 	}
 
-	// watcher 2 should get the same update again.
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := ldsUpdateCh2.Receive(ctx); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
-		t.Errorf("unexpected ListenerUpdate: %v, error receiving from channel: %v", u, err)
+	// Watcher 2 should get the same update again.
+	if err := verifyListenerUpdate(ctx, ldsUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
diff --git a/xds/internal/client/client_watchers_rds_test.go b/xds/internal/client/client_watchers_rds_test.go
index abaec80..e1f23f3 100644
--- a/xds/internal/client/client_watchers_rds_test.go
+++ b/xds/internal/client/client_watchers_rds_test.go
@@ -23,6 +23,7 @@
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
+
 	"google.golang.org/grpc/internal/testutils"
 )
 
@@ -36,54 +37,51 @@
 // - an update for another resource name (which doesn't trigger callback)
 // - an update is received after cancel()
 func (s) TestRDSWatch(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
-
-	rdsUpdateCh := testutils.NewChannel()
-	cancelWatch := c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
-		rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
-	})
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	rdsUpdateCh := testutils.NewChannel()
+	cancelWatch := client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+		rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
-		testRDSName: wantUpdate,
-	})
-
-	if u, err := rdsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
-		t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
+	if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another update for a different resource name.
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
-		"randomName": {},
-	})
-
-	if u, err := rdsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{"randomName": {}})
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 
 	// Cancel watch, and send update again.
 	cancelWatch()
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
-		testRDSName: wantUpdate,
-	})
-
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := rdsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -91,64 +89,64 @@
 // TestRDSTwoWatchSameResourceName covers the case where an update is received
 // after two watch() for the same resource name.
 func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	var rdsUpdateChs []*testutils.Channel
-	const count = 2
-
-	var cancelLastWatch func()
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	const count = 2
+	var (
+		rdsUpdateChs    []*testutils.Channel
+		cancelLastWatch func()
+	)
 	for i := 0; i < count; i++ {
 		rdsUpdateCh := testutils.NewChannel()
 		rdsUpdateChs = append(rdsUpdateChs, rdsUpdateCh)
-		cancelLastWatch = c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+		cancelLastWatch = client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
 			rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
 	}
 
 	wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
-		testRDSName: wantUpdate,
-	})
-
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
 	for i := 0; i < count; i++ {
-		if u, err := rdsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
-			t.Errorf("i=%v, unexpected RouteConfigUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
 	// Cancel the last watch, and send update again.
 	cancelLastWatch()
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
-		testRDSName: wantUpdate,
-	})
-
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
 	for i := 0; i < count-1; i++ {
-		if u, err := rdsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
-			t.Errorf("i=%v, unexpected RouteConfigUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate); err != nil {
+			t.Fatal(err)
 		}
 	}
 
-	if u, err := rdsUpdateChs[count-1].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := rdsUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -156,34 +154,37 @@
 // TestRDSThreeWatchDifferentResourceName covers the case where an update is
 // received after three watch() for different resource names.
 func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
-
-	var rdsUpdateChs []*testutils.Channel
-	const count = 2
-
-	// Two watches for the same name.
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	// Two watches for the same name.
+	var rdsUpdateChs []*testutils.Channel
+	const count = 2
 	for i := 0; i < count; i++ {
 		rdsUpdateCh := testutils.NewChannel()
 		rdsUpdateChs = append(rdsUpdateChs, rdsUpdateCh)
-		c.watchRDS(testRDSName+"1", func(update RouteConfigUpdate, err error) {
+		client.watchRDS(testRDSName+"1", func(update RouteConfigUpdate, err error) {
 			rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
 		})
 
 		if i == 0 {
 			// A new watch is registered on the underlying API client only for
 			// the first iteration because we are using the same resource name.
-			if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+			if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
 				t.Fatalf("want new watch to start, got error %v", err)
 			}
 		}
@@ -191,82 +192,84 @@
 
 	// Third watch for a different name.
 	rdsUpdateCh2 := testutils.NewChannel()
-	c.watchRDS(testRDSName+"2", func(update RouteConfigUpdate, err error) {
+	client.watchRDS(testRDSName+"2", func(update RouteConfigUpdate, err error) {
 		rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate1 := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}}}
 	wantUpdate2 := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName + "1": wantUpdate1,
 		testRDSName + "2": wantUpdate2,
 	})
 
 	for i := 0; i < count; i++ {
-		if u, err := rdsUpdateChs[i].Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate1, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
-			t.Errorf("i=%v, unexpected RouteConfigUpdate: %v, error receiving from channel: %v", i, u, err)
+		if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate1); err != nil {
+			t.Fatal(err)
 		}
 	}
-
-	if u, err := rdsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate2, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
-		t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh2, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
 
 // TestRDSWatchAfterCache covers the case where watch is called after the update
 // is in cache.
 func (s) TestRDSWatchAfterCache(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
+	defer client.Close()
 
-	v2Client := <-v2ClientCh
-
-	rdsUpdateCh := testutils.NewChannel()
-	c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
-		rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
-	})
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	rdsUpdateCh := testutils.NewChannel()
+	client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+		rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
-		testRDSName: wantUpdate,
-	})
-
-	if u, err := rdsUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
-		t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
+	if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another watch for the resource in cache.
 	rdsUpdateCh2 := testutils.NewChannel()
-	c.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
+	client.watchRDS(testRDSName, func(update RouteConfigUpdate, err error) {
 		rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
 	})
-	if n, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if n, err := apiClient.addWatches[RouteConfigResource].Receive(sCtx); err != context.DeadlineExceeded {
 		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
 	}
 
 	// New watch should receives the update.
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
 	if u, err := rdsUpdateCh2.Receive(ctx); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdateErr{})) {
 		t.Errorf("unexpected RouteConfigUpdate: %v, error receiving from channel: %v", u, err)
 	}
 
 	// Old watch should see nothing.
-	if u, err := rdsUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := rdsUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected RouteConfigUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
diff --git a/xds/internal/client/client_watchers_service_test.go b/xds/internal/client/client_watchers_service_test.go
index 5a6a401..d4f69d5 100644
--- a/xds/internal/client/client_watchers_service_test.go
+++ b/xds/internal/client/client_watchers_service_test.go
@@ -23,7 +23,6 @@
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
-	"github.com/google/go-cmp/cmp/cmpopts"
 
 	"google.golang.org/grpc/internal/testutils"
 )
@@ -33,47 +32,45 @@
 	err error
 }
 
-var serviceCmpOpts = []cmp.Option{cmp.AllowUnexported(serviceUpdateErr{}), cmpopts.EquateEmpty()}
-
 // TestServiceWatch covers the cases:
 // - an update is received after a watch()
 // - an update with routes received
 func (s) TestServiceWatch(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
-
-	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
-		t.Fatalf("want new watch to start, got error %v", err)
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
 	}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+
+	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
 	})
-
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	wantUpdate2 := ServiceUpdate{
@@ -82,7 +79,7 @@
 			Action: map[string]uint32{testCDSName: 1},
 		}},
 	}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {
 			Routes: []*Route{{
 				Prefix: newStringP(""),
@@ -90,8 +87,8 @@
 			}},
 		},
 	})
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
 
@@ -99,70 +96,66 @@
 // response, the second LDS response trigger an new RDS watch, and an update of
 // the old RDS watch doesn't trigger update to service callback.
 func (s) TestServiceWatchLDSUpdate(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
-
-	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
-		t.Fatalf("want new watch to start, got error %v", err)
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
 	}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+
+	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
 	})
-
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another LDS update with a different RDS_name.
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName + "2"},
-	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName + "2"}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 
 	// Another update for the old name.
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
 	})
-
-	if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 
-	wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
 	// RDS update for the new name.
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName + "2": {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}},
 	})
-
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
+		t.Fatal(err)
 	}
 }
 
@@ -170,49 +163,48 @@
 // error (because only one is allowed). But the first watch still receives
 // updates.
 func (s) TestServiceWatchSecond(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
-
-	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
-		t.Fatalf("want new watch to start, got error %v", err)
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
 	}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+
+	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
 	})
-
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
-	serviceUpdateCh2 := testutils.NewChannel()
 	// Call WatchService() again, with the same or different name.
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+	serviceUpdateCh2 := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
 		serviceUpdateCh2.Send(serviceUpdateErr{u: update, err: err})
 	})
-
 	u, err := serviceUpdateCh2.Receive(ctx)
 	if err != nil {
 		t.Fatalf("failed to get serviceUpdate: %v", err)
@@ -227,18 +219,17 @@
 
 	// Send update again, first callback should be called, second should
 	// timeout.
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
-	})
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
 	})
-
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
-	if u, err := serviceUpdateCh2.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := serviceUpdateCh2.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -247,25 +238,28 @@
 // does not respond to the requests being sent out as part of registering a
 // service update watcher. The callback will get an error.
 func (s) TestServiceWatchWithNoResponseFromServer(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, true))
+	client, err := New(clientOpts(testXDSServer, true))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
+	}
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
+	})
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
 	u, err := serviceUpdateCh.Receive(ctx)
@@ -281,37 +275,39 @@
 	}
 }
 
-// TestServiceWatchEmptyRDS tests the case where the underlying v2Client
+// TestServiceWatchEmptyRDS tests the case where the underlying apiClient
 // receives an empty RDS response. The callback will get an error.
 func (s) TestServiceWatchEmptyRDS(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, true))
+	client, err := New(clientOpts(testXDSServer, true))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
-		t.Fatalf("want new watch to start, got error %v", err)
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
 	}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{})
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{})
 	u, err := serviceUpdateCh.Receive(ctx)
 	if err != nil {
 		t.Fatalf("failed to get serviceUpdate: %v", err)
@@ -329,36 +325,40 @@
 // received after the client is closed, and we make sure that the registered
 // watcher callback is not invoked.
 func (s) TestServiceWatchWithClientClose(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, true))
+	client, err := New(clientOpts(testXDSServer, true))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
-		t.Fatalf("want new watch to start, got error %v", err)
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
 	}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+
 	// Client is closed before it receives the RDS response.
-	c.Close()
-	if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	client.Close()
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
@@ -367,49 +367,49 @@
 // update contains the same RDS name as the previous, the RDS watch isn't
 // canceled and restarted.
 func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
-
-	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
-		t.Fatalf("want new watch to start, got error %v", err)
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
 	}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
 	})
 
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Another LDS update with a the same RDS_name.
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
-	})
-	if v, err := v2Client.removeWatches[RouteConfigResource].Receive(ctx); err == nil {
-		t.Fatalf("unexpected rds watch cancel: %v", v)
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if _, err := apiClient.removeWatches[RouteConfigResource].Receive(sCtx); err != context.DeadlineExceeded {
+		t.Fatalf("unexpected rds watch cancel")
 	}
 }
 
@@ -420,47 +420,46 @@
 // - one more update without the removed resource
 //   - the callback (above) shouldn't receive any update
 func (s) TestServiceResourceRemoved(t *testing.T) {
-	v2ClientCh, cleanup := overrideNewAPIClient()
+	apiClientCh, cleanup := overrideNewAPIClient()
 	defer cleanup()
 
-	c, err := New(clientOpts(testXDSServer, false))
+	client, err := New(clientOpts(testXDSServer, false))
 	if err != nil {
 		t.Fatalf("failed to create client: %v", err)
 	}
-	defer c.Close()
-
-	v2Client := <-v2ClientCh
-
-	serviceUpdateCh := testutils.NewChannel()
-	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
-		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
-	})
-
-	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	defer client.Close()
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	if _, err := v2Client.addWatches[ListenerResource].Receive(ctx); err != nil {
-		t.Fatalf("want new watch to start, got error %v", err)
+	c, err := apiClientCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("timeout when waiting for API client to be created: %v", err)
 	}
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
+	apiClient := c.(*testAPIClient)
+
+	serviceUpdateCh := testutils.NewChannel()
+	client.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
 	})
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
 	})
-
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 
 	// Remove LDS resource, should cancel the RDS watch, and trigger resource
 	// removed error.
-	v2Client.r.NewListeners(map[string]ListenerUpdate{})
-	if _, err := v2Client.removeWatches[RouteConfigResource].Receive(ctx); err != nil {
+	client.NewListeners(map[string]ListenerUpdate{})
+	if _, err := apiClient.removeWatches[RouteConfigResource].Receive(ctx); err != nil {
 		t.Fatalf("want watch to be canceled, got error %v", err)
 	}
 	if u, err := serviceUpdateCh.Receive(ctx); err != nil || ErrType(u.(serviceUpdateErr).err) != ErrorTypeResourceNotFound {
@@ -469,34 +468,33 @@
 
 	// Send RDS update for the removed LDS resource, expect no updates to
 	// callback, because RDS should be canceled.
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new": 1}}}},
 	})
-	if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
 	}
 
 	// Add LDS resource, but not RDS resource, should
 	//  - start a new RDS watch
 	//  - timeout on service channel, because RDS cache was cleared
-	v2Client.r.NewListeners(map[string]ListenerUpdate{
-		testLDSName: {RouteConfigName: testRDSName},
-	})
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if _, err := v2Client.addWatches[RouteConfigResource].Receive(ctx); err != nil {
+	client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
+	if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil {
 		t.Fatalf("want new watch to start, got error %v", err)
 	}
-	if u, err := serviceUpdateCh.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := serviceUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
 	}
 
-	v2Client.r.NewRouteConfigs(map[string]RouteConfigUpdate{
+	client.NewRouteConfigs(map[string]RouteConfigUpdate{
 		testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}},
 	})
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	if u, err := serviceUpdateCh.Receive(ctx); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}, nil}, serviceCmpOpts...) {
-		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	wantUpdate = ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}
+	if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
+		t.Fatal(err)
 	}
 }