blob: 9ac2ca2b52a0054f150c69b0322093695db0430a [file] [log] [blame]
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package resolver
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
func init() {
balancer.Register(cspB{})
}
type cspB struct{}
func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return nil
}
func (cspB) Name() string {
return "csp_experimental"
}
type cspConfig struct {
ArbitraryField string `json:"arbitrary_field"`
}
// TestXDSResolverClusterSpecifierPlugin tests that cluster specifier plugins
// produce the correct service config, and that the config selector routes to a
// cluster specifier plugin supported by this service config (i.e. prefixed with
// a cluster specifier plugin prefix).
func (s) TestXDSResolverClusterSpecifierPlugin(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}},
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}]
}
}
}}]}`
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("received nil config selector")
}
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}
cluster := clustermanager.GetPickedClusterForTesting(res.Context)
clusterWant := clusterSpecifierPluginPrefix + "cspA"
if cluster != clusterWant {
t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant)
}
}
// TestXDSResolverClusterSpecifierPluginConfigUpdate tests that cluster
// specifier plugins produce the correct service config, and that on an update
// to the CSP Configuration, the new config is accounted for in the output
// service config.
func (s) TestXDSResolverClusterSpecifierPluginConfigUpdate(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}},
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}]
}
}
}}]}`
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "changed"}}}},
}, nil)
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON = `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"changed"}}]
}
}
}}]}`
wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
}
// TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and
// their corresponding configurations remain in service config if RPCs are in
// flight.
func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingA"}}}},
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}]
}
}
}}]}`
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("received nil config selector")
}
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}
cluster := clustermanager.GetPickedClusterForTesting(res.Context)
clusterWant := clusterSpecifierPluginPrefix + "cspA"
if cluster != clusterWant {
t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant)
}
// delay res.OnCommitted()
// Perform TWO updates to ensure the old config selector does not hold a reference to cspA
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}},
},
},
// Top level csp config here - the value of cspB should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}},
}, nil)
tcc.stateCh.Receive(ctx) // Ignore the first update.
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}},
},
},
// Top level csp config here - the value of cspB should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}},
}, nil)
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON2 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}]
},
"cluster_specifier_plugin:cspB":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}]
}
}
}}]}`
wantSCParsed2 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON2)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config))
}
// Invoke OnCommitted; should lead to a service config update that deletes
// cspA.
res.OnCommitted()
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}},
},
},
// Top level csp config here - the value of cspB should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}},
}, nil)
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON3 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspB":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}]
}
}
}}]}`
wantSCParsed3 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON3)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config))
}
}