blob: 952a7f67f9cc7f335d85bdbced0a62d4c643e204 [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package resolver
import (
"context"
"errors"
"net/url"
"reflect"
"strings"
"testing"
"time"
xxhash "github.com/cespare/xxhash/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/httpfilter/router"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
)
// Shared names and timeouts used by the tests in this file.
const (
	// defaultTestTimeout bounds waits for events that are expected to occur.
	defaultTestTimeout = 10 * time.Second
	// defaultTestShortTimeout bounds waits for events that are expected NOT
	// to occur.
	defaultTestShortTimeout = 100 * time.Microsecond

	// Resource name fragments used when building test targets and resources.
	targetStr = "target"
	routeStr  = "route"
	cluster   = "cluster"
)
var (
	// target is the default dial target ("xds:///" + targetStr) used by tests
	// that do not need a specific authority.
	target = resolver.Target{URL: *testutils.MustParseURL("xds:///" + targetStr)}

	// routerFilter is an HTTP filter entry named "rtr" backed by the filter
	// registered under router.TypeURL.
	routerFilter = xdsresource.HTTPFilter{Name: "rtr", Filter: httpfilter.Get(router.TypeURL)}

	// routerFilterList is a filter list holding only routerFilter.
	routerFilterList = []xdsresource.HTTPFilter{routerFilter}
)
// s is the test-suite type handed to grpctest.RunSubTests by Test. The test
// functions in this file are declared as methods on it.
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// TestRegister verifies that a resolver builder is registered for xdsScheme.
func (s) TestRegister(t *testing.T) {
	if b := resolver.Get(xdsScheme); b != nil {
		return
	}
	t.Errorf("scheme %v is not registered", xdsScheme)
}
// testClientConn is a fake implementation of resolver.ClientConn that pushes
// state updates and errors returned by the resolver on to channels for
// consumption by tests.
type testClientConn struct {
// Embedded so that testClientConn satisfies the full interface; only the
// methods defined on testClientConn in this file are overridden.
resolver.ClientConn
// stateCh receives the states passed to UpdateState.
stateCh *testutils.Channel
// errorCh receives the errors passed to ReportError.
errorCh *testutils.Channel
}
// UpdateState hands the resolver state to stateCh via Replace and always
// returns a nil error.
func (tcc *testClientConn) UpdateState(s resolver.State) error {
	tcc.stateCh.Replace(s)
	return nil
}
// ReportError hands the error reported by the resolver to errorCh via Replace.
func (tcc *testClientConn) ReportError(err error) {
	tcc.errorCh.Replace(err)
}
// ParseServiceConfig parses jsonSC using the parser function stored in
// internal.ParseServiceConfig.
func (tcc *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult {
	parse := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)
	return parse(jsonSC)
}
// newTestClientConn returns a testClientConn with freshly created state and
// error channels.
func newTestClientConn() *testClientConn {
	tcc := new(testClientConn)
	tcc.stateCh = testutils.NewChannel()
	tcc.errorCh = testutils.NewChannel()
	return tcc
}
// TestResolverBuilder_ClientCreationFails overrides xDS client creation with
// a function that always fails, and checks that building the xDS resolver
// fails too.
func (s) TestResolverBuilder_ClientCreationFails(t *testing.T) {
	// Install a failing client constructor for the duration of the test.
	savedNewClient := newXDSClient
	defer func() { newXDSClient = savedNewClient }()
	newXDSClient = func() (xdsclient.XDSClient, func(), error) {
		return nil, nil, errors.New("failed to create xDS client")
	}

	b := resolver.Get(xdsScheme)
	if b == nil {
		t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
	}

	// The build must surface the client creation failure.
	_, err := b.Build(target, newTestClientConn(), resolver.BuildOptions{})
	if err == nil {
		t.Fatalf("builder.Build(%v) succeeded when expected to fail", target)
	}
}
// TestResolverBuilder_DifferentBootstrapConfigs tests the resolver builder's
// Build() method with different xDS bootstrap configurations.
//
// Each sub-test starts its own management server, points the bootstrap
// configuration at it, overrides xDS client creation to use that
// configuration, and then checks whether Build() succeeds or fails with an
// error containing wantErr.
func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) {
	tests := []struct {
		name         string
		bootstrapCfg *bootstrap.Config // Empty top-level xDS server config, will be set by test logic.
		target       resolver.Target
		buildOpts    resolver.BuildOptions
		wantErr      string // Substring expected in the Build() error; empty means success.
	}{
		{
			name:         "good",
			bootstrapCfg: &bootstrap.Config{},
			target:       target,
		},
		{
			name: "authority not defined in bootstrap",
			bootstrapCfg: &bootstrap.Config{
				ClientDefaultListenerResourceNameTemplate: "%s",
				Authorities: map[string]*bootstrap.Authority{
					"test-authority": {
						ClientListenerResourceNameTemplate: "xdstp://test-authority/%s",
					},
				},
			},
			target: resolver.Target{
				URL: url.URL{
					Host: "non-existing-authority",
					Path: "/" + targetStr,
				},
			},
			wantErr: `authority "non-existing-authority" is not found in the bootstrap file`,
		},
		{
			name:         "xDS creds specified without certificate providers in bootstrap",
			bootstrapCfg: &bootstrap.Config{},
			target:       target,
			buildOpts: resolver.BuildOptions{
				DialCreds: func() credentials.TransportCredentials {
					creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
					if err != nil {
						t.Fatalf("xds.NewClientCredentials() failed: %v", err)
					}
					return creds
				}(),
			},
			wantErr: `xdsCreds specified but certificate_providers config missing in bootstrap file`,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
			if err != nil {
				t.Fatalf("Starting xDS management server: %v", err)
			}
			defer mgmtServer.Stop()

			// Add top-level xDS server config corresponding to the above
			// management server.
			test.bootstrapCfg.XDSServer = &bootstrap.ServerConfig{
				ServerURI:    mgmtServer.Address,
				Creds:        grpc.WithTransportCredentials(insecure.NewCredentials()),
				TransportAPI: version.TransportV3,
			}

			// Override xDS client creation to use bootstrap configuration
			// specified by the test.
			origNewClient := newXDSClient
			newXDSClient = func() (xdsclient.XDSClient, func(), error) {
				// The watch timeout and idle authority timeout values passed to
				// NewWithConfigForTesting() are immaterial for this test, as we
				// are only testing the resolver build functionality.
				return xdsclient.NewWithConfigForTesting(test.bootstrapCfg, defaultTestTimeout, defaultTestTimeout)
			}
			defer func() {
				newXDSClient = origNewClient
			}()

			builder := resolver.Get(xdsScheme)
			if builder == nil {
				t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
			}

			// Failure messages report the target actually used by this
			// sub-test, which differs from the package-level default target in
			// some cases.
			r, err := builder.Build(test.target, newTestClientConn(), test.buildOpts)
			if gotErr, wantErr := err != nil, test.wantErr != ""; gotErr != wantErr {
				t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", test.target, err, test.wantErr)
			}
			if test.wantErr != "" && !strings.Contains(err.Error(), test.wantErr) {
				t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", test.target, err, test.wantErr)
			}
			if err != nil {
				// This is the case where we expect an error and got it.
				return
			}
			r.Close()
		})
	}
}
// setupOpts carries the inputs for testSetup.
type setupOpts struct {
// bootstrapC, when non-nil, is installed on the fake xDS client.
bootstrapC *bootstrap.Config
// target is the target passed to the resolver builder.
target resolver.Target
}
// testSetup builds an xDS resolver for opts.target on top of a fake xDS
// client, after temporarily overriding newXDSClient to return that fake.
//
// It returns the resolver, the fake client, the testClientConn passed to the
// builder, and a cleanup function. The cleanup function closes the resolver,
// waits up to defaultTestTimeout for the fake client's close function to be
// invoked, and restores the original newXDSClient.
//
// The newXDSClient override is also restored when setup itself fails, so a
// failure here does not leak the fake client into subsequent tests.
func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *fakeclient.Client, *testClientConn, func()) {
	t.Helper()

	fc := fakeclient.NewClient()
	if opts.bootstrapC != nil {
		fc.SetBootstrapConfig(opts.bootstrapC)
	}

	oldClientMaker := newXDSClient
	restoreClientMaker := func() { newXDSClient = oldClientMaker }
	closeCh := make(chan struct{})
	newXDSClient = func() (xdsclient.XDSClient, func(), error) {
		return fc, grpcsync.OnceFunc(func() { close(closeCh) }), nil
	}
	cancel := func() {
		// Restore the override even when the wait below fails the test.
		defer restoreClientMaker()
		// Make sure the xDS client is closed once the resolver is closed.
		select {
		case <-time.After(defaultTestTimeout):
			t.Fatalf("timeout waiting for close")
		case <-closeCh:
		}
	}

	builder := resolver.Get(xdsScheme)
	if builder == nil {
		restoreClientMaker()
		t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
	}

	tcc := newTestClientConn()
	r, err := builder.Build(opts.target, tcc, resolver.BuildOptions{})
	if err != nil {
		restoreClientMaker()
		t.Fatalf("builder.Build(%v) returned err: %v", opts.target, err)
	}
	return r.(*xdsResolver), fc, tcc, func() {
		r.Close()
		cancel()
	}
}
// waitForWatchListener waits for the WatchListener method to be called on the
// xdsClient within a reasonable amount of time, and also verifies that the
// watch is called with the expected target.
//
// The wait is bounded by ctx. The test is failed if the wait returns an error
// or if the watched target differs from wantTarget.
func waitForWatchListener(ctx context.Context, t *testing.T, xdsC *fakeclient.Client, wantTarget string) {
	t.Helper()

	gotTarget, err := xdsC.WaitForWatchListener(ctx)
	if err != nil {
		t.Fatalf("xdsClient.WatchListener failed with error: %v", err)
	}
	if gotTarget != wantTarget {
		t.Fatalf("xdsClient.WatchListener() called with target: %v, want %v", gotTarget, wantTarget)
	}
}
// waitForWatchRouteConfig waits for a route configuration watch to be
// registered on the xdsClient within a reasonable amount of time, and also
// verifies that the watch is called with the expected target.
//
// The wait is bounded by ctx. The test is failed if the wait returns an error
// or if the watched target differs from wantTarget.
func waitForWatchRouteConfig(ctx context.Context, t *testing.T, xdsC *fakeclient.Client, wantTarget string) {
	t.Helper()

	gotTarget, err := xdsC.WaitForWatchRouteConfig(ctx)
	if err != nil {
		t.Fatalf("xdsClient.WatchRouteConfig failed with error: %v", err)
	}
	if gotTarget != wantTarget {
		t.Fatalf("xdsClient.WatchRouteConfig() called with target: %v, want %v", gotTarget, wantTarget)
	}
}
// buildResolverForTarget builds an xDS resolver for the given target. It
// returns a testClientConn which allows inspection of resolver updates, and a
// function to close the resolver once the test is complete.
//
// The test is failed if no builder is registered for xdsScheme or if the
// build returns an error.
func buildResolverForTarget(t *testing.T, target resolver.Target) (*testClientConn, func()) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	builder := resolver.Get(xdsScheme)
	if builder == nil {
		t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
	}

	tcc := newTestClientConn()
	r, err := builder.Build(target, tcc, resolver.BuildOptions{})
	if err != nil {
		t.Fatalf("builder.Build(%v) returned err: %v", target, err)
	}
	return tcc, r.Close
}
// TestResolverResourceName builds an xDS resolver and verifies that the
// resource name specified in the discovery request matches expectations.
//
// Each sub-test creates a bootstrap file with the listener resource name
// template under test (and optionally an extra authority), builds a resolver
// for the dial target, and compares the resource name seen by the management
// server against wantResourceName.
func (s) TestResolverResourceName(t *testing.T) {
	// Federation support is required when new style names are used.
	oldXDSFederation := envconfig.XDSFederation
	envconfig.XDSFederation = true
	defer func() { envconfig.XDSFederation = oldXDSFederation }()

	tests := []struct {
		name                         string
		listenerResourceNameTemplate string
		extraAuthority               string
		dialTarget                   string
		wantResourceName             string
	}{
		{
			name:                         "default %s old style",
			listenerResourceNameTemplate: "%s",
			dialTarget:                   "xds:///target",
			wantResourceName:             "target",
		},
		{
			name:                         "old style no percent encoding",
			listenerResourceNameTemplate: "/path/to/%s",
			dialTarget:                   "xds:///target",
			wantResourceName:             "/path/to/target",
		},
		{
			name:                         "new style with %s",
			listenerResourceNameTemplate: "xdstp://authority.com/%s",
			dialTarget:                   "xds:///0.0.0.0:8080",
			wantResourceName:             "xdstp://authority.com/0.0.0.0:8080",
		},
		{
			name:                         "new style percent encoding",
			listenerResourceNameTemplate: "xdstp://authority.com/%s",
			dialTarget:                   "xds:///[::1]:8080",
			wantResourceName:             "xdstp://authority.com/%5B::1%5D:8080",
		},
		{
			name:                         "new style different authority",
			listenerResourceNameTemplate: "xdstp://authority.com/%s",
			extraAuthority:               "test-authority",
			dialTarget:                   "xds://test-authority/target",
			wantResourceName:             "xdstp://test-authority/envoy.config.listener.v3.Listener/target",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Setup the management server to push the requested resource name
			// on to a channel. No resources are configured on the management
			// server as part of this test, as we are only interested in the
			// resource name being requested.
			resourceNameCh := make(chan string, 1)
			mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
				OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
					// When the resolver is being closed, the watch associated
					// with the listener resource will be cancelled, and it
					// might result in a discovery request with no resource
					// names. Hence, we only consider requests which contain a
					// resource name, and ignore all others.
					names := req.GetResourceNames()
					if len(names) != 1 {
						return nil
					}
					// Non-blocking send: only the first name is of interest.
					select {
					case resourceNameCh <- names[0]:
					default:
					}
					return nil
				},
			})
			if err != nil {
				t.Fatalf("Failed to start xDS management server: %v", err)
			}
			defer mgmtServer.Stop()

			// Create a bootstrap configuration with test options.
			opts := xdsbootstrap.Options{
				ServerURI: mgmtServer.Address,
				Version:   xdsbootstrap.TransportV3,
				ClientDefaultListenerResourceNameTemplate: tt.listenerResourceNameTemplate,
			}
			if tt.extraAuthority != "" {
				// In this test, we really don't care about having multiple
				// management servers. All we need to verify is whether the
				// resource name matches expectation.
				opts.Authorities = map[string]string{
					tt.extraAuthority: mgmtServer.Address,
				}
			}
			cleanup, err := xdsbootstrap.CreateFile(opts)
			if err != nil {
				t.Fatal(err)
			}
			defer cleanup()

			_, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)})
			defer rClose()

			// Verify the resource name in the discovery request being sent out.
			select {
			case gotResourceName := <-resourceNameCh:
				if gotResourceName != tt.wantResourceName {
					t.Fatalf("Received discovery request with resource name: %v, want %v", gotResourceName, tt.wantResourceName)
				}
			case <-time.After(defaultTestTimeout):
				t.Fatalf("Timeout when waiting for discovery request")
			}
		})
	}
}
// TestResolverWatchCallbackAfterClose tests the case where a service update
// from the underlying xDS client is received after the resolver is closed, and
// verifies that the update is not propagated to the ClientConn.
func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
	// Setup the management server that synchronizes with the test goroutine
	// using two channels. The management server signals the test goroutine when
	// it receives a discovery request for a route configuration resource. And
	// the test goroutine signals the management server when the resolver is
	// closed.
	//
	// The first channel is buffered and written to with a non-blocking send,
	// so that any number of route configuration requests can be handled by the
	// callback. Closing the channel from the callback instead would panic on
	// the second such request.
	waitForRouteConfigDiscoveryReqCh := make(chan struct{}, 1)
	waitForResolverCloseCh := make(chan struct{})
	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
			if req.GetTypeUrl() == version.V3RouteConfigURL {
				select {
				case waitForRouteConfigDiscoveryReqCh <- struct{}{}:
				default:
				}
				<-waitForResolverCloseCh
			}
			return nil
		},
	})
	if err != nil {
		t.Fatalf("Failed to start xDS management server: %v", err)
	}
	defer mgmtServer.Stop()

	// Create a bootstrap configuration specifying the above management server.
	nodeID := uuid.New().String()
	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
		Version:   xdsbootstrap.TransportV3,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Configure listener and route configuration resources on the management
	// server.
	const serviceName = "my-service-client-side-xds"
	rdsName := "route-" + serviceName
	cdsName := "cluster-" + serviceName
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
		Routes:         []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)},
		SkipValidation: true,
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
	defer rClose()

	// Wait for a discovery request for a route configuration resource.
	select {
	case <-waitForRouteConfigDiscoveryReqCh:
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for a discovery request for a route configuration resource")
	}

	// Close the resolver and unblock the management server.
	rClose()
	close(waitForResolverCloseCh)

	// Verify that the update from the management server is not propagated to
	// the ClientConn. The xDS resolver, once closed, is expected to drop
	// updates from the xDS client.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if update, err := tcc.stateCh.Receive(sCtx); err != context.DeadlineExceeded {
		t.Fatalf("ClientConn received an update from the resolver that was closed: update=%v, err=%v", update, err)
	}
}
// TestResolverCloseClosesXDSClient checks that closing the xDS resolver also
// invokes the close function of the xDS client it created.
func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
	// Bootstrap configuration pointing at a dummy management server address.
	serverCfg := &bootstrap.ServerConfig{
		ServerURI:    "dummy-management-server-address",
		Creds:        grpc.WithTransportCredentials(insecure.NewCredentials()),
		TransportAPI: version.TransportV3,
	}
	bootstrapCfg := &bootstrap.Config{XDSServer: serverCfg}

	// Wrap the client's close function so that the test can observe when the
	// resolver invokes it.
	clientClosed := make(chan struct{})
	savedNewClient := newXDSClient
	defer func() { newXDSClient = savedNewClient }()
	newXDSClient = func() (xdsclient.XDSClient, func(), error) {
		c, closeClient, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout)
		wrappedClose := func() {
			close(clientClosed)
			closeClient()
		}
		return c, wrappedClose, err
	}

	dialTarget := resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")}
	_, closeResolver := buildResolverForTarget(t, dialTarget)
	closeResolver()

	select {
	case <-time.After(defaultTestTimeout):
		t.Fatal("Timeout when waiting for xDS client to be closed")
	case <-clientClosed:
	}
}
// TestResolverBadServiceUpdate exercises the resolver with a listener
// resource that is expected to be NACKed by the xDS client, and verifies that
// the resulting error reaches the ClientConn. It then pushes good resources
// and expects a good update, and finally pushes the bad listener again and
// expects the error to be reported once more.
func (s) TestResolverBadServiceUpdate(t *testing.T) {
	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer mgmtServer.Stop()

	// Bootstrap file pointing at the management server started above.
	nodeID := uuid.New().String()
	bootstrapOpts := xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
		Version:   xdsbootstrap.TransportV3,
	}
	cleanup, err := xdsbootstrap.CreateFile(bootstrapOpts)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	const serviceName = "my-service-client-side-xds"
	dialTarget := resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}
	tcc, rClose := buildResolverForTarget(t, dialTarget)
	defer rClose()

	// A listener whose HTTPConnectionManager lacks the `RouteSpecifier` field;
	// it is expected to be NACKed.
	hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{
		HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
	})
	badListener := &v3listenerpb.Listener{
		Name:        serviceName,
		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
		FilterChains: []*v3listenerpb.FilterChain{{
			Name: "filter-chain-name",
			Filters: []*v3listenerpb.Filter{{
				Name:       wellknown.HTTPConnectionManager,
				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
			}},
		}},
	}
	badResources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{badListener},
		SkipValidation: true,
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// push installs the given resources on the management server.
	push := func(opts e2e.UpdateOptions) {
		if err := mgmtServer.Update(ctx, opts); err != nil {
			t.Fatal(err)
		}
	}
	// expectNACKError waits for an error on the ClientConn and checks that it
	// mentions the missing RouteSpecifier.
	const wantErr = "no RouteSpecifier"
	expectNACKError := func() {
		v, err := tcc.errorCh.Receive(ctx)
		if err != nil {
			t.Fatal("Timeout when waiting for error to be propagated to the ClientConn")
		}
		gotErr := v.(error)
		if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
			t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr)
		}
	}

	// Step 1: bad listener, error expected.
	push(badResources)
	expectNACKError()

	// Step 2: good listener and route configuration, good update expected.
	rdsName := "route-" + serviceName
	cdsName := "cluster-" + serviceName
	push(e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
		Routes:         []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)},
		SkipValidation: true,
	})
	v, err := tcc.stateCh.Receive(ctx)
	if err != nil {
		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
	}
	state := v.(resolver.State)
	if scErr := state.ServiceConfig.Err; scErr != nil {
		t.Fatalf("Received error in service config: %v", scErr)
	}

	// Step 3: bad listener again, error expected again.
	push(badResources)
	expectNACKError()
}
// TestResolverGoodServiceUpdate tests the case where the resource returned by
// the management server is ACKed by the xDS client, which then returns a good
// service update to the resolver. The test verifies that the service config
// returned by the resolver matches expectations, and that the config selector
// returned by the resolver picks clusters based on the route configuration
// received from the management server.
//
// The route configurations are pushed one after the other against the same
// resolver, so each step's expected service config depends on the steps that
// preceded it.
func (s) TestResolverGoodServiceUpdate(t *testing.T) {
	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer mgmtServer.Stop()

	// Create a bootstrap configuration specifying the above management server.
	nodeID := uuid.New().String()
	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
		Version:   xdsbootstrap.TransportV3,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	const serviceName = "my-service-client-side-xds"
	tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
	defer rClose()

	ldsName := serviceName
	rdsName := "route-" + serviceName

	// clusterWeight returns a weighted cluster entry with the given name and
	// weight.
	clusterWeight := func(name string, weight uint32) *v3routepb.WeightedCluster_ClusterWeight {
		return &v3routepb.WeightedCluster_ClusterWeight{
			Name:   name,
			Weight: &wrapperspb.UInt32Value{Value: weight},
		}
	}
	// weightedRouteConfig returns a route configuration named rdsName with a
	// single virtual host for ldsName and a single "/" prefix route that
	// splits traffic across the given weighted clusters.
	weightedRouteConfig := func(clusters ...*v3routepb.WeightedCluster_ClusterWeight) *v3routepb.RouteConfiguration {
		return &v3routepb.RouteConfiguration{
			Name: rdsName,
			VirtualHosts: []*v3routepb.VirtualHost{{
				Domains: []string{ldsName},
				Routes: []*v3routepb.Route{{
					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
					Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
						ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
							Clusters: clusters,
						}},
					}},
				}},
			}},
		}
	}

	tests := []struct {
		routeConfig       *v3routepb.RouteConfiguration
		wantServiceConfig string
		wantClusters      map[string]bool
	}{
		{
			// A route configuration with a single cluster.
			routeConfig: weightedRouteConfig(clusterWeight("test-cluster-1", 100)),
			wantServiceConfig: `
{
"loadBalancingConfig": [{
"xds_cluster_manager_experimental": {
"children": {
"cluster:test-cluster-1": {
"childPolicy": [{
"cds_experimental": {
"cluster": "test-cluster-1"
}
}]
}
}
}
}]
}`,
			wantClusters: map[string]bool{"cluster:test-cluster-1": true},
		},
		{
			// A route configuration with two new clusters.
			routeConfig: weightedRouteConfig(clusterWeight("cluster_1", 75), clusterWeight("cluster_2", 25)),
			// This update contains the cluster from the previous update as well
			// as this update, as the previous config selector still references
			// the old cluster when the new one is pushed.
			wantServiceConfig: `
{
"loadBalancingConfig": [{
"xds_cluster_manager_experimental": {
"children": {
"cluster:test-cluster-1": {
"childPolicy": [{
"cds_experimental": {
"cluster": "test-cluster-1"
}
}]
},
"cluster:cluster_1": {
"childPolicy": [{
"cds_experimental": {
"cluster": "cluster_1"
}
}]
},
"cluster:cluster_2": {
"childPolicy": [{
"cds_experimental": {
"cluster": "cluster_2"
}
}]
}
}
}
}]
}`,
			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
		},
		{
			// A redundant route configuration update.
			// TODO(easwars): Do we need this, or can we do something else? Because the xds client might swallow this update.
			routeConfig: weightedRouteConfig(clusterWeight("cluster_1", 75), clusterWeight("cluster_2", 25)),
			// With this redundant update, the old config selector has been
			// stopped, so there are no more references to the first cluster.
			// Only the second update's clusters should remain.
			wantServiceConfig: `
{
"loadBalancingConfig": [{
"xds_cluster_manager_experimental": {
"children": {
"cluster:cluster_1": {
"childPolicy": [{
"cds_experimental": {
"cluster": "cluster_1"
}
}]
},
"cluster:cluster_2": {
"childPolicy": [{
"cds_experimental": {
"cluster": "cluster_2"
}
}]
}
}
}
}]
}`,
			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
		},
	}

	parseSC := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)
	for _, tt := range tests {
		// Each step runs in its own function so that the per-step context is
		// cancelled when the step finishes, instead of being held until the
		// whole test returns.
		func() {
			// Configure the management server with a good listener resource
			// and a route configuration resource, as specified by the test
			// case.
			resources := e2e.UpdateOptions{
				NodeID:         nodeID,
				Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
				Routes:         []*v3routepb.RouteConfiguration{tt.routeConfig},
				SkipValidation: true,
			}
			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
			defer cancel()
			if err := mgmtServer.Update(ctx, resources); err != nil {
				t.Fatal(err)
			}

			// Read the update pushed by the resolver to the ClientConn.
			val, err := tcc.stateCh.Receive(ctx)
			if err != nil {
				t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
			}
			rState := val.(resolver.State)
			if err := rState.ServiceConfig.Err; err != nil {
				t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
			}

			wantSCParsed := parseSC(tt.wantServiceConfig)
			if wantSCParsed.Err != nil {
				t.Fatalf("Failed to parse expected service config: %v", wantSCParsed.Err)
			}
			if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
				// Diffing against nil dumps the full contents of each config.
				t.Errorf("Received unexpected service config")
				t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
				t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
			}

			cs := iresolver.GetConfigSelector(rState)
			if cs == nil {
				t.Fatal("Received nil config selector in update from resolver")
			}

			pickedClusters := make(map[string]bool)
			// The probability of picking the 75% cluster 100 times in a row is
			// 0.75^100, roughly 3E-13, so every configured cluster is expected
			// to show up at least once.
			// NOTE(review): the original comment also said the random number
			// generator is stubbed out; no stub is visible in this test, so
			// confirm whether one is installed elsewhere.
			for i := 0; i < 100; i++ {
				res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
				if err != nil {
					t.Fatalf("cs.SelectConfig(): %v", err)
				}
				picked := clustermanager.GetPickedClusterForTesting(res.Context)
				pickedClusters[picked] = true
				res.OnCommitted()
			}
			if !cmp.Equal(pickedClusters, tt.wantClusters) {
				t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters)
			}
		}()
	}
}
// TestResolverRequestHash pushes a route configuration whose route carries a
// header-based hash policy on ":path", and verifies that the config selector
// produced by the resolver stores the expected request hash in the context of
// the selected config.
func (s) TestResolverRequestHash(t *testing.T) {
	// Turn on ring hash support for the duration of the test.
	savedRingHash := envconfig.XDSRingHash
	envconfig.XDSRingHash = true
	defer func() { envconfig.XDSRingHash = savedRingHash }()

	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer mgmtServer.Stop()

	// Bootstrap file pointing at the management server started above.
	nodeID := uuid.New().String()
	bootstrapOpts := xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
		Version:   xdsbootstrap.TransportV3,
	}
	cleanup, err := xdsbootstrap.CreateFile(bootstrapOpts)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	const serviceName = "my-service-client-side-xds"
	dialTarget := resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}
	tcc, rClose := buildResolverForTarget(t, dialTarget)
	defer rClose()

	ldsName := serviceName
	rdsName := "route-" + serviceName

	// Route action: all traffic to a single cluster, hashed on the ":path"
	// header.
	pathHashPolicy := &v3routepb.RouteAction_HashPolicy{
		PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
			Header: &v3routepb.RouteAction_HashPolicy_Header{
				HeaderName: ":path",
			},
		},
		Terminal: true,
	}
	routeAction := &v3routepb.RouteAction{
		ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
			Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
				{
					Name:   "test-cluster-1",
					Weight: &wrapperspb.UInt32Value{Value: 100},
				},
			},
		}},
		HashPolicy: []*v3routepb.RouteAction_HashPolicy{pathHashPolicy},
	}
	routeConfig := &v3routepb.RouteConfiguration{
		Name: rdsName,
		VirtualHosts: []*v3routepb.VirtualHost{{
			Domains: []string{ldsName},
			Routes: []*v3routepb.Route{{
				Match:  &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
				Action: &v3routepb.Route_Route{Route: routeAction},
			}},
		}},
	}

	// Install a good listener together with the route configuration above.
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
		Routes:         []*v3routepb.RouteConfiguration{routeConfig},
		SkipValidation: true,
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Wait for the resolver to push a state to the ClientConn.
	update, err := tcc.stateCh.Receive(ctx)
	if err != nil {
		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
	}
	state := update.(resolver.State)
	if scErr := state.ServiceConfig.Err; scErr != nil {
		t.Fatalf("Received error in service config: %v", scErr)
	}
	selector := iresolver.GetConfigSelector(state)
	if selector == nil {
		t.Fatal("Received nil config selector in update from resolver")
	}

	// Select a config for an RPC whose outgoing metadata carries ":path"; the
	// resulting context is expected to hold the hash of that header value.
	rpcCtx := metadata.NewOutgoingContext(ctx, metadata.Pairs(":path", "/products"))
	res, err := selector.SelectConfig(iresolver.RPCInfo{
		Context: rpcCtx,
		Method:  "/service/method",
	})
	if err != nil {
		t.Fatalf("cs.SelectConfig(): %v", err)
	}
	wantHash := xxhash.Sum64String("/products")
	gotHash := ringhash.GetRequestHashForTesting(res.Context)
	if gotHash != wantHash {
		t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash)
	}
}
// TestResolverRemovedWithRPCs tests the case where resources are removed from
// the management server, causing it to send an empty update to the xDS client,
// which returns a resource-not-found error to the xDS resolver. The test
// verifies that an ongoing RPC is handled properly when this happens.
//
// The steps exercised are:
//  1. Configure listener and route resources and verify that the resolver
//     pushes a service config containing "test-cluster-1".
//  2. Start an RPC via SelectConfig without committing it.
//  3. Remove the resources and verify that the pushed service config still
//     contains the cluster, while the new config selector fails RPCs with
//     codes.Unavailable.
//  4. Commit the first RPC and verify that an empty service config is pushed.
func (s) TestResolverRemovedWithRPCs(t *testing.T) {
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()
// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()
// Build the xDS resolver for the test target. Updates pushed by the
// resolver are read from tcc.stateCh below.
const serviceName = "my-service-client-side-xds"
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()
ldsName := serviceName
rdsName := "route-" + serviceName
// Configure the management server with a good listener and route
// configuration resource.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Read the update pushed by the resolver to the ClientConn.
val, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState := val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
// The expected service config has a single cluster manager child, for
// "test-cluster-1". The same expectation is reused after the resources are
// removed, while the RPC started below is still pending.
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:test-cluster-1": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "test-cluster-1"
}
}
]
}
}
}
}
]
}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
// "Make an RPC" by invoking the config selector. The result is deliberately
// not committed here; res.OnCommitted() is only invoked after the resources
// have been removed.
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}
// Delete the resources on the management server. This should result in a
// resource-not-found error from the xDS client.
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
t.Fatal(err)
}
// The RPC started earlier is still in progress. So, the xDS resolver will
// not produce an empty service config at this point. Instead it will retain
// the cluster to which the RPC is ongoing in the service config, but will
// return an erroring config selector which will fail new RPCs.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
// A new RPC attempted with the config selector from this update must fail
// with codes.Unavailable.
cs = iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
_, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err == nil || status.Code(err) != codes.Unavailable {
t.Fatalf("cs.SelectConfig() returned: %v, want: %v", err, codes.Unavailable)
}
// "Finish the RPC"; this could cause a panic if the resolver doesn't
// handle it correctly.
res.OnCommitted()
// Now that the RPC is committed, the xDS resolver is expected to send an
// update with an empty service config.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`{}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
}
// TestResolverRemovedResource tests the case where resources returned by the
// management server are removed. The test verifies that the resolver pushes the
// expected config selector and service config in this case.
//
// Here the only RPC made before the removal is also committed before the
// removal. After the removal, the test expects two updates from the resolver:
// first one that still carries the original cluster but whose config selector
// fails RPCs with codes.Unavailable, and then one with an empty service
// config.
func (s) TestResolverRemovedResource(t *testing.T) {
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()
// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()
// Build the xDS resolver for the test target. Updates pushed by the
// resolver are read from tcc.stateCh below.
const serviceName = "my-service-client-side-xds"
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()
// Configure the management server with a good listener and route
// configuration resource.
ldsName := serviceName
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Read the update pushed by the resolver to the ClientConn.
val, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState := val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
// The expected service config has a single cluster manager child, for
// "test-cluster-1". The same expectation is reused for the first update
// received after the resources are removed.
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:test-cluster-1": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "test-cluster-1"
}
}
]
}
}
}
}
]
}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
// "Make an RPC" by invoking the config selector.
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}
// "Finish the RPC"; this could cause a panic if the resolver doesn't
// handle it correctly.
res.OnCommitted()
// Delete the resources on the management server, resulting in a
// resource-not-found error from the xDS client.
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
t.Fatal(err)
}
// The channel should receive the existing service config with the original
// cluster but with an erroring config selector.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
// "Make another RPC" by invoking the config selector. This one is expected
// to fail with codes.Unavailable.
cs = iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err == nil || status.Code(err) != codes.Unavailable {
t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err)
}
// In the meantime, an empty ServiceConfig update should have been sent.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
}
// TestResolverWRR exercises weighted cluster selection. The management server
// is configured with a route whose action splits traffic between cluster "A"
// (weight 75) and cluster "B" (weight 25). The test then issues 100 RPCs
// through the config selector pushed by the resolver and checks how many of
// them were assigned to each cluster.
func (s) TestResolverWRR(t *testing.T) {
	// Swap in the test WRR implementation for the duration of this test, and
	// put the previous one back on exit.
	origNewWRR := newWRR
	defer func() { newWRR = origNewWRR }()
	newWRR = testutils.NewTestWRR

	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer mgmtServer.Stop()

	// Point the bootstrap configuration at the management server started
	// above.
	nodeID := uuid.New().String()
	bootstrapOpts := xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
		Version:   xdsbootstrap.TransportV3,
	}
	cleanup, err := xdsbootstrap.CreateFile(bootstrapOpts)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	const serviceName = "my-service-client-side-xds"
	resolverTarget := resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}
	tcc, rClose := buildResolverForTarget(t, resolverTarget)
	defer rClose()

	// Resources served by the management server: the default client listener,
	// and a route configuration with a single catch-all route that splits
	// traffic between clusters "A" and "B".
	ldsName := serviceName
	rdsName := "route-" + serviceName
	listener := e2e.DefaultClientListener(ldsName, rdsName)
	weightedClusters := &v3routepb.WeightedCluster{
		Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
			{Name: "A", Weight: &wrapperspb.UInt32Value{Value: 75}},
			{Name: "B", Weight: &wrapperspb.UInt32Value{Value: 25}},
		},
	}
	catchAllRoute := &v3routepb.Route{
		Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
		Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
			ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: weightedClusters},
		}},
	}
	routeConfig := &v3routepb.RouteConfiguration{
		Name: rdsName,
		VirtualHosts: []*v3routepb.VirtualHost{{
			Domains: []string{ldsName},
			Routes:  []*v3routepb.Route{catchAllRoute},
		}},
	}
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{listener},
		Routes:         []*v3routepb.RouteConfiguration{routeConfig},
		SkipValidation: true,
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Wait for the resolver to push its state to the ClientConn and extract
	// the config selector from it.
	update, err := tcc.stateCh.Receive(ctx)
	if err != nil {
		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
	}
	state := update.(resolver.State)
	if scErr := state.ServiceConfig.Err; scErr != nil {
		t.Fatalf("Received error in service config: %v", scErr)
	}
	selector := iresolver.GetConfigSelector(state)
	if selector == nil {
		t.Fatal("Received nil config selector in update from resolver")
	}

	// Make RPCs and verify WRR behavior in the cluster specifier. Each RPC is
	// committed right after the picked cluster has been recorded.
	const numRPCs = 100
	got := make(map[string]int)
	for n := 0; n < numRPCs; n++ {
		res, err := selector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
		if err != nil {
			t.Fatalf("cs.SelectConfig(): %v", err)
		}
		cluster := clustermanager.GetPickedClusterForTesting(res.Context)
		got[cluster]++
		res.OnCommitted()
	}
	want := map[string]int{"cluster:A": 75, "cluster:B": 25}
	if !cmp.Equal(got, want) {
		t.Errorf("Picked clusters: %v; want: %v", got, want)
	}
}
// TestResolverMaxStreamDuration covers max stream duration being specified in
// both the listener and the route configuration resources, and verifies the
// RPC timeout reported by the config selector for each matching route:
//   - a route with a non-zero max stream duration yields that duration;
//   - a route with an explicit zero max stream duration yields no timeout,
//     ignoring the listener's value;
//   - a route without a max stream duration falls back to the listener's
//     value.
func (s) TestResolverMaxStreamDuration(t *testing.T) {
	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer mgmtServer.Stop()

	// Point the bootstrap configuration at the management server started
	// above.
	nodeID := uuid.New().String()
	bootstrapOpts := xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
		Version:   xdsbootstrap.TransportV3,
	}
	cleanup, err := xdsbootstrap.CreateFile(bootstrapOpts)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	const serviceName = "my-service-client-side-xds"
	resolverTarget := resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}
	tcc, rClose := buildResolverForTarget(t, resolverTarget)
	defer rClose()

	ldsName := serviceName
	rdsName := "route-" + serviceName

	// Listener resource: its HTTP connection manager carries a max stream
	// duration of one second.
	hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{
		RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
			ConfigSource: &v3corepb.ConfigSource{
				ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
			},
			RouteConfigName: rdsName,
		}},
		HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
		CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
			MaxStreamDuration: durationpb.New(1 * time.Second),
		},
	})
	listener := &v3listenerpb.Listener{
		Name:        ldsName,
		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
		FilterChains: []*v3listenerpb.FilterChain{{
			Name: "filter-chain-name",
			Filters: []*v3listenerpb.Filter{{
				Name:       wellknown.HTTPConnectionManager,
				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
			}},
		}},
	}

	// newRoute builds a prefix-matched route that sends all traffic to a
	// single cluster. A nil msd leaves the route's max stream duration unset.
	newRoute := func(prefix, cluster string, msd *v3routepb.RouteAction_MaxStreamDuration) *v3routepb.Route {
		return &v3routepb.Route{
			Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: prefix}},
			Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
				ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
					Clusters: []*v3routepb.WeightedCluster_ClusterWeight{{
						Name:   cluster,
						Weight: &wrapperspb.UInt32Value{Value: 100},
					}},
				}},
				MaxStreamDuration: msd,
			}},
		}
	}

	// Route configuration resource: three routes with different max stream
	// duration settings (five seconds, explicit zero, and unset).
	routeConfig := &v3routepb.RouteConfiguration{
		Name: rdsName,
		VirtualHosts: []*v3routepb.VirtualHost{{
			Domains: []string{ldsName},
			Routes: []*v3routepb.Route{
				newRoute("/foo", "A", &v3routepb.RouteAction_MaxStreamDuration{
					MaxStreamDuration: durationpb.New(5 * time.Second),
				}),
				newRoute("/bar", "B", &v3routepb.RouteAction_MaxStreamDuration{
					MaxStreamDuration: durationpb.New(0 * time.Second),
				}),
				newRoute("/", "C", nil),
			},
		}},
	}
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{listener},
		Routes:         []*v3routepb.RouteConfiguration{routeConfig},
		SkipValidation: true,
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Wait for the resolver to push its state to the ClientConn and extract
	// the config selector from it.
	update, err := tcc.stateCh.Receive(ctx)
	if err != nil {
		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
	}
	state := update.(resolver.State)
	if scErr := state.ServiceConfig.Err; scErr != nil {
		t.Fatalf("Received error in service config: %v", scErr)
	}
	selector := iresolver.GetConfigSelector(state)
	if selector == nil {
		t.Fatal("Received nil config selector in update from resolver")
	}

	testCases := []struct {
		name   string
		method string
		want   *time.Duration
	}{
		{
			name:   "RDS setting",
			method: "/foo/method",
			want:   newDurationP(5 * time.Second),
		},
		{
			name:   "explicit zero in RDS; ignore LDS",
			method: "/bar/method",
			want:   nil,
		},
		{
			name:   "no config in RDS; fallback to LDS",
			method: "/baz/method",
			want:   newDurationP(time.Second),
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			rpcInfo := iresolver.RPCInfo{Method: tc.method, Context: ctx}
			res, err := selector.SelectConfig(rpcInfo)
			if err != nil {
				t.Errorf("cs.SelectConfig(%v): %v", rpcInfo, err)
				return
			}
			res.OnCommitted()
			if got := res.MethodConfig.Timeout; !cmp.Equal(got, tc.want) {
				t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want)
			}
		})
	}
}
// TestResolverDelayedOnCommitted tests that clusters remain in service
// config if RPCs are in flight.
//
// The steps exercised are:
//  1. Configure a route to "old-cluster" and start an RPC without committing
//     it.
//  2. Switch the route to "new-cluster" and verify that the pushed service
//     config contains both clusters, while a new RPC picks "new-cluster".
//  3. Commit the old RPC and verify that the pushed service config contains
//     only "new-cluster".
func (s) TestResolverDelayedOnCommitted(t *testing.T) {
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()
// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()
// Build the xDS resolver for the test target. Updates pushed by the
// resolver are read from tcc.stateCh below.
const serviceName = "my-service-client-side-xds"
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()
// Configure the management server with a good listener and route
// configuration resource.
ldsName := serviceName
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "old-cluster")},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Read the update pushed by the resolver to the ClientConn.
val, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState := val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
// Initially, only "old-cluster" is expected in the service config.
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:old-cluster": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "old-cluster"
}
}
]
}
}
}
}
]
}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
// Make an RPC, but do not commit it yet.
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}
if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != "cluster:old-cluster" {
t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:old-cluster")
}
// Delay resOld.OnCommitted(). As long as there are pending RPCs to removed
// clusters, they still appear in the service config.
// Update the route configuration resource on the management server to
// return a new cluster.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "new-cluster")},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Read the update pushed by the resolver to the ClientConn and ensure the
// old cluster is present in the service config. Also ensure that the newly
// returned config selector does not hold a reference to the old cluster.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
// While the old RPC is uncommitted, both clusters are expected in the
// service config.
wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:old-cluster": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "old-cluster"
}
}
]
},
"cluster:new-cluster": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "new-cluster"
}
}
]
}
}
}
}
]
}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s\nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
// An RPC made with the new config selector must pick the new cluster.
cs = iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}
if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != "cluster:new-cluster" {
t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:new-cluster")
}
// Invoke OnCommitted on the old RPC; should lead to a service config update
// that deletes the old cluster, as the old cluster no longer has any
// pending RPCs.
resOld.OnCommitted()
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
// Only "new-cluster" is expected to remain in the service config.
wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:new-cluster": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "new-cluster"
}
}
]
}
}
}
}
]
}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
}
// TestResolverMultipleLDSUpdates covers two consecutive listener updates that
// name the same route configuration, with no route configuration resource
// available on the management server in between. Neither listener update is
// expected to make the resolver push an update to the ClientConn.
func (s) TestResolverMultipleLDSUpdates(t *testing.T) {
	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
	if err != nil {
		t.Fatal(err)
	}
	defer mgmtServer.Stop()

	// Point the bootstrap configuration at the management server started
	// above.
	nodeID := uuid.New().String()
	bootstrapOpts := xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
		Version:   xdsbootstrap.TransportV3,
	}
	cleanup, err := xdsbootstrap.CreateFile(bootstrapOpts)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Build the xDS resolver for the test target.
	const serviceName = "my-service-client-side-xds"
	resolverTarget := resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}
	tcc, rClose := buildResolverForTarget(t, resolverTarget)
	defer rClose()

	// First update: the default client listener, and no route configuration
	// resource.
	ldsName := serviceName
	rdsName := "route-" + serviceName
	firstListener := e2e.DefaultClientListener(ldsName, rdsName)
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{firstListener},
		SkipValidation: true,
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Nothing should be pushed by the resolver within the short timeout.
	firstCtx, firstCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer firstCancel()
	if state, err := tcc.stateCh.Receive(firstCtx); err == nil {
		t.Fatalf("Received update from resolver %v when none expected", state)
	}

	// Second update: a listener that names the same route configuration but
	// differs in other fields (a max stream duration and an explicit filter
	// chain are set). The management server still has no route configuration
	// resource.
	hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{
		RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
			ConfigSource: &v3corepb.ConfigSource{
				ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
			},
			RouteConfigName: rdsName,
		}},
		HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
		CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
			MaxStreamDuration: durationpb.New(1 * time.Second),
		},
	})
	secondListener := &v3listenerpb.Listener{
		Name:        ldsName,
		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
		FilterChains: []*v3listenerpb.FilterChain{{
			Name: "filter-chain-name",
			Filters: []*v3listenerpb.Filter{{
				Name:       wellknown.HTTPConnectionManager,
				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
			}},
		}},
	}
	resources = e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{secondListener},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Again, nothing should be pushed by the resolver within the short
	// timeout.
	secondCtx, secondCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer secondCancel()
	if state, err := tcc.stateCh.Receive(secondCtx); err == nil {
		t.Fatalf("Received update from resolver %v when none expected", state)
	}
}
// filterBuilder is a test HTTP filter that builds client interceptors and
// records each build in the slice pointed to by path.
type filterBuilder struct {
httpfilter.Filter // embedded as we do not need to implement registry / parsing in this test.
// path points to the slice to which BuildClientInterceptor appends its
// "build:" and "override:" entries. The same pointer is handed to the
// interceptors it returns.
path *[]string
}
// Compile-time check that *filterBuilder implements
// httpfilter.ClientInterceptorBuilder.
var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{}
// BuildClientInterceptor returns a filterInterceptor labelled with the base
// config's name, after appending a "build:<name>" entry to fb.path. When an
// override is supplied, an "override:<name>" entry is appended as well and the
// override's newStreamErr takes the place of the base config's. Both configs
// must be of type filterCfg; a nil base config causes a panic.
func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
	if config == nil {
		panic("unexpected missing config")
	}

	base := config.(filterCfg)
	*fb.path = append(*fb.path, "build:"+base.s)
	streamErr := base.newStreamErr

	if override != nil {
		ov := override.(filterCfg)
		*fb.path = append(*fb.path, "override:"+ov.s)
		// The override's error replaces the base one even when it is nil.
		streamErr = ov.newStreamErr
	}

	interceptor := &filterInterceptor{
		path: fb.path,
		s:    base.s,
		err:  streamErr,
	}
	return interceptor, nil
}
// filterInterceptor is the client interceptor returned by
// filterBuilder.BuildClientInterceptor.
type filterInterceptor struct {
// path points to the slice to which NewStream appends its "newstream:" and
// "done:" entries.
path *[]string
// s is the label appended after the "newstream:" and "done:" prefixes.
s string
// err, when non-nil, is returned by NewStream instead of creating a stream.
err error
}
// NewStream appends a "newstream:<label>" entry to fi.path and then either
// fails with the interceptor's configured error, or delegates to newStream.
// When delegating, the done callback passed along appends a "done:<label>"
// entry before invoking the caller's done, and the stream returned by
// newStream is wrapped in a clientStream. An error from newStream is returned
// as is.
func (fi *filterInterceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
	label := fi.s
	*fi.path = append(*fi.path, "newstream:"+label)

	if configuredErr := fi.err; configuredErr != nil {
		return nil, configuredErr
	}

	// Record completion before handing control back to the caller's done.
	recordingDone := func() {
		*fi.path = append(*fi.path, "done:"+fi.s)
		done()
	}
	inner, err := newStream(ctx, recordingDone)
	if err != nil {
		return nil, err
	}

	wrapped := &clientStream{
		ClientStream: inner,
		path:         fi.path,
		s:            fi.s,
	}
	return wrapped, nil
}
// clientStream is the stream returned by filterInterceptor.NewStream. It
// embeds the stream returned by the newStream callback.
type clientStream struct {
iresolver.ClientStream
// path and s are copied from the filterInterceptor that created the stream.
path *[]string
s string
}
// filterCfg is the filter configuration type that
// filterBuilder.BuildClientInterceptor expects for both the base config and
// any override.
type filterCfg struct {
httpfilter.FilterConfig
// s is the label recorded after the "build:" / "override:" prefixes.
s string
// newStreamErr is handed to the built interceptor, which returns it from
// NewStream when it is non-nil.
newStreamErr error
}
func (s) TestXDSResolverHTTPFilters(t *testing.T) {
var path []string
testCases := []struct {
name string
ldsFilters []xdsresource.HTTPFilter
vhOverrides map[string]httpfilter.FilterConfig
rtOverrides map[string]httpfilter.FilterConfig
clOverrides map[string]httpfilter.FilterConfig
rpcRes map[string][][]string
selectErr string
newStreamErr string
}{
{
name: "no router filter",
ldsFilters: []xdsresource.HTTPFilter{
{Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}},
},
rpcRes: map[string][][]string{
"1": {
{"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
},
},
selectErr: "no router filter present",
},
{
name: "ignored after router filter",
ldsFilters: []xdsresource.HTTPFilter{
{Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}},
routerFilter,
{Name: "foo2", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo2"}},
},
rpcRes: map[string][][]string{
"1": {
{"build:foo1", "newstream:foo1", "done:foo1"},
},
"2": {
{"build:foo1", "newstream:foo1", "done:foo1"},
{"build:foo1", "newstream:foo1", "done:foo1"},
{"build:foo1", "newstream:foo1", "done:foo1"},
},
},
},
{
name: "NewStream error; ensure earlier interceptor Done is still called",
ldsFilters: []xdsresource.HTTPFilter{
{Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}},
{Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1", newStreamErr: errors.New("bar newstream err")}},
routerFilter,
},
rpcRes: map[string][][]string{
"1": {
{"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* <err in bar1 NewStream> */, "done:foo1"},
},
"2": {
{"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* <err in bar1 NewSteam> */, "done:foo1"},
},
},
newStreamErr: "bar newstream err",
},
{
name: "all overrides",
ldsFilters: []xdsresource.HTTPFilter{
{Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1", newStreamErr: errors.New("this is overridden to nil")}},
{Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1"}},
routerFilter,
},
vhOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo2"}, "bar": filterCfg{s: "bar2"}},
rtOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo3"}, "bar": filterCfg{s: "bar3"}},
clOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo4"}, "bar": filterCfg{s: "bar4"}},
rpcRes: map[string][][]string{
"1": {
{"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
{"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
},
"2": {
{"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
{"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
{"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
{"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
},
},
},
}
for i, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
RouteConfigName: routeStr,
HTTPFilters: tc.ldsFilters,
}, nil)
if i == 0 {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
}
defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR)
newWRR = testutils.NewTestWRR
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{
Prefix: newStringP("1"), WeightedClusters: map[string]xdsresource.WeightedCluster{
"A": {Weight: 1},
"B": {Weight: 1},
},
}, {
Prefix: newStringP("2"), WeightedClusters: map[string]xdsresource.WeightedCluster{
"A": {Weight: 1},
"B": {Weight: 1, HTTPFilterConfigOverride: tc.clOverrides},
},
HTTPFilterConfigOverride: tc.rtOverrides,
}},
HTTPFilterConfigOverride: tc.vhOverrides,
},
},
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("received nil config selector")
}
for method, wants := range tc.rpcRes {
// Order of wants is non-deterministic.
remainingWant := make([][]string, len(wants))
copy(remainingWant, wants)
for n := range wants {
path = nil
res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: context.Background()})
if tc.selectErr != "" {
if err == nil || !strings.Contains(err.Error(), tc.selectErr) {
t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, tc.selectErr)
}
if err == nil {
res.OnCommitted()
}
continue
}
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}
var doneFunc func()
_, err = res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, func() {}, func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
doneFunc = done
return nil, nil
})
if tc.newStreamErr != "" {
if err == nil || !strings.Contains(err.Error(), tc.newStreamErr) {
t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.newStreamErr)
}
if err == nil {
res.OnCommitted()
doneFunc()
}
continue
}
if err != nil {
t.Fatalf("unexpected error from Interceptor.NewStream: %v", err)
}
res.OnCommitted()
doneFunc()
// Confirm the desired path is found in remainingWant, and remove it.
pass := false
for i := range remainingWant {
if reflect.DeepEqual(path, remainingWant[i]) {
remainingWant[i] = remainingWant[len(remainingWant)-1]
remainingWant = remainingWant[:len(remainingWant)-1]
pass = true
break
}
}
if !pass {
t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, path, remainingWant)
}
}
}
})
}
}
// newDurationP returns a pointer to a newly allocated time.Duration holding
// the value d. Each call yields a distinct pointer, so writing through the
// result does not affect the caller's value or any other call's result.
func newDurationP(d time.Duration) *time.Duration {
	ptr := new(time.Duration)
	*ptr = d
	return ptr
}