blob: 2c1e8b75b8fe955effd9d302153f76b64363f8d3 [file] [log] [blame]
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package server
import (
"context"
"errors"
"net"
"strconv"
"testing"
"time"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
)
const (
fakeListenerHost = "0.0.0.0"
fakeListenerPort = 50051
testListenerResourceName = "lds.target.1.2.3.4:1111"
defaultTestTimeout = 1 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)
var listenerWithRouteConfiguration = &v3listenerpb.Listener{
FilterChains: []*v3listenerpb.FilterChain{
{
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourcePorts: []uint32{80},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: testutils.MarshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: "route-1",
},
},
HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
}),
},
},
},
},
},
}
var listenerWithFilterChains = &v3listenerpb.Listener{
FilterChains: []*v3listenerpb.FilterChain{
{
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourcePorts: []uint32{80},
},
TransportSocket: &v3corepb.TransportSocket{
Name: "envoy.transport_sockets.tls",
ConfigType: &v3corepb.TransportSocket_TypedConfig{
TypedConfig: testutils.MarshalAny(&v3tlspb.DownstreamTlsContext{
CommonTlsContext: &v3tlspb.CommonTlsContext{
TlsCertificateCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{
InstanceName: "identityPluginInstance",
CertificateName: "identityCertName",
},
},
}),
},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: testutils.MarshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: "routeName",
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{"lds.target.good:3333"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}},
},
HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
}),
},
},
},
},
},
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
type tempError struct{}
func (tempError) Error() string {
return "listenerWrapper test temporary error"
}
func (tempError) Temporary() bool {
return true
}
// connAndErr wraps a net.Conn and an error.
type connAndErr struct {
conn net.Conn
err error
}
// fakeListener allows the user to inject conns returned by Accept().
type fakeListener struct {
acceptCh chan connAndErr
closeCh *testutils.Channel
}
func (fl *fakeListener) Accept() (net.Conn, error) {
cne, ok := <-fl.acceptCh
if !ok {
return nil, errors.New("a non-temporary error")
}
return cne.conn, cne.err
}
func (fl *fakeListener) Close() error {
fl.closeCh.Send(nil)
return nil
}
func (fl *fakeListener) Addr() net.Addr {
return &net.TCPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: fakeListenerPort,
}
}
// fakeConn overrides LocalAddr, RemoteAddr and Close methods.
type fakeConn struct {
net.Conn
local, remote net.Addr
closeCh *testutils.Channel
}
func (fc *fakeConn) LocalAddr() net.Addr {
return fc.local
}
func (fc *fakeConn) RemoteAddr() net.Addr {
return fc.remote
}
func (fc *fakeConn) Close() error {
fc.closeCh.Send(nil)
return nil
}
func newListenerWrapper(t *testing.T) (*listenerWrapper, <-chan struct{}, *fakeclient.Client, *fakeListener, func()) {
t.Helper()
// Create a listener wrapper with a fake listener and fake XDSClient and
// verify that it extracts the host and port from the passed in listener.
lis := &fakeListener{
acceptCh: make(chan connAndErr, 1),
closeCh: testutils.NewChannel(),
}
xdsC := fakeclient.NewClient()
lParams := ListenerWrapperParams{
Listener: lis,
ListenerResourceName: testListenerResourceName,
XDSClient: xdsC,
}
l, readyCh := NewListenerWrapper(lParams)
if l == nil {
t.Fatalf("NewListenerWrapper(%+v) returned nil", lParams)
}
lw, ok := l.(*listenerWrapper)
if !ok {
t.Fatalf("NewListenerWrapper(%+v) returned listener of type %T want *listenerWrapper", lParams, l)
}
if lw.addr != fakeListenerHost || lw.port != strconv.Itoa(fakeListenerPort) {
t.Fatalf("listenerWrapper has host:port %s:%s, want %s:%d", lw.addr, lw.port, fakeListenerHost, fakeListenerPort)
}
return lw, readyCh, xdsC, lis, func() { l.Close() }
}
func (s) TestNewListenerWrapper(t *testing.T) {
_, readyCh, xdsC, _, cleanup := newListenerWrapper(t)
defer cleanup()
// Verify that the listener wrapper registers a listener watch for the
// expected Listener resource name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
name, err := xdsC.WaitForWatchListener(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Listener resource: %v", err)
}
if name != testListenerResourceName {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName)
}
// Push an error to the listener update handler.
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{}, errors.New("bad listener update"))
timer := time.NewTimer(defaultTestShortTimeout)
select {
case <-timer.C:
timer.Stop()
case <-readyCh:
t.Fatalf("ready channel written to after receipt of a bad Listener update")
}
fcm, err := xdsresource.NewFilterChainManager(listenerWithFilterChains, nil)
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
// Push an update whose address does not match the address to which our
// listener is bound, and verify that the ready channel is not written to.
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: "10.0.0.1",
Port: "50051",
FilterChains: fcm,
}}, nil)
timer = time.NewTimer(defaultTestShortTimeout)
select {
case <-timer.C:
timer.Stop()
case <-readyCh:
t.Fatalf("ready channel written to after receipt of a bad Listener update")
}
// Push a good update, and verify that the ready channel is written to.
// Since there are no dynamic RDS updates needed to be received, the
// ListenerWrapper does not have to wait for anything else before telling
// that it is ready.
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good Listener update")
case <-readyCh:
}
}
// TestNewListenerWrapperWithRouteUpdate tests the scenario where the listener
// gets built, starts a watch, that watch returns a list of Route Names to
// return, than receives an update from the rds handler. Only after receiving
// the update from the rds handler should it move the server to
// ServingModeServing.
func (s) TestNewListenerWrapperWithRouteUpdate(t *testing.T) {
oldRBAC := envconfig.XDSRBAC
envconfig.XDSRBAC = true
defer func() {
envconfig.XDSRBAC = oldRBAC
}()
_, readyCh, xdsC, _, cleanup := newListenerWrapper(t)
defer cleanup()
// Verify that the listener wrapper registers a listener watch for the
// expected Listener resource name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
name, err := xdsC.WaitForWatchListener(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Listener resource: %v", err)
}
if name != testListenerResourceName {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName)
}
fcm, err := xdsresource.NewFilterChainManager(listenerWithRouteConfiguration, nil)
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
// Push a good update which contains a Filter Chain that specifies dynamic
// RDS Resources that need to be received. This should ping rds handler
// about which rds names to start, which will eventually start a watch on
// xds client for rds name "route-1".
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
// This should start a watch on xds client for rds name "route-1".
routeName, err := xdsC.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Route resource: %v", err)
}
if routeName != "route-1" {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", routeName, "route-1")
}
// This shouldn't invoke good update channel, as has not received rds updates yet.
timer := time.NewTimer(defaultTestShortTimeout)
select {
case <-timer.C:
timer.Stop()
case <-readyCh:
t.Fatalf("ready channel written to without rds configuration specified")
}
// Invoke rds callback for the started rds watch. This valid rds callback
// should trigger the listener wrapper to fire GoodUpdate, as it has
// received both it's LDS Configuration and also RDS Configuration,
// specified in LDS Configuration.
xdsC.InvokeWatchRouteConfigCallback("route-1", xdsresource.RouteConfigUpdate{}, nil)
// All of the xDS updates have completed, so can expect to send a ping on
// good update channel.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good rds update")
case <-readyCh:
}
}
func (s) TestListenerWrapper_Accept(t *testing.T) {
boCh := testutils.NewChannel()
origBackoffFunc := backoffFunc
backoffFunc = func(v int) time.Duration {
boCh.Send(v)
return 0
}
defer func() { backoffFunc = origBackoffFunc }()
lw, readyCh, xdsC, lis, cleanup := newListenerWrapper(t)
defer cleanup()
// Push a good update with a filter chain which accepts local connections on
// 192.168.0.0/16 subnet and port 80.
fcm, err := xdsresource.NewFilterChainManager(listenerWithFilterChains, nil)
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
defer close(lis.acceptCh)
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good Listener update")
case <-readyCh:
}
// Push a non-temporary error into Accept().
nonTempErr := errors.New("a non-temporary error")
lis.acceptCh <- connAndErr{err: nonTempErr}
if _, err := lw.Accept(); err != nonTempErr {
t.Fatalf("listenerWrapper.Accept() returned error: %v, want: %v", err, nonTempErr)
}
// Invoke Accept() in a goroutine since we expect it to swallow:
// 1. temporary errors returned from the underlying listener
// 2. errors related to finding a matching filter chain for the incoming
// connection.
errCh := testutils.NewChannel()
go func() {
conn, err := lw.Accept()
if err != nil {
errCh.Send(err)
return
}
if _, ok := conn.(*connWrapper); !ok {
errCh.Send(errors.New("listenerWrapper.Accept() returned a Conn of type %T, want *connWrapper"))
return
}
errCh.Send(nil)
}()
// Push a temporary error into Accept() and verify that it backs off.
lis.acceptCh <- connAndErr{err: tempError{}}
if _, err := boCh.Receive(ctx); err != nil {
t.Fatalf("error when waiting for Accept() to backoff on temporary errors: %v", err)
}
// Push a fakeConn which does not match any filter chains configured on the
// received Listener resource. Verify that the conn is closed.
fc := &fakeConn{
local: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2), Port: 79},
remote: &net.TCPAddr{IP: net.IPv4(10, 1, 1, 1), Port: 80},
closeCh: testutils.NewChannel(),
}
lis.acceptCh <- connAndErr{conn: fc}
if _, err := fc.closeCh.Receive(ctx); err != nil {
t.Fatalf("error when waiting for conn to be closed on no filter chain match: %v", err)
}
// Push a fakeConn which matches the filter chains configured on the
// received Listener resource. Verify that Accept() returns.
fc = &fakeConn{
local: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2)},
remote: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2), Port: 80},
closeCh: testutils.NewChannel(),
}
lis.acceptCh <- connAndErr{conn: fc}
if _, err := errCh.Receive(ctx); err != nil {
t.Fatalf("error when waiting for Accept() to return the conn on filter chain match: %v", err)
}
}