| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package server |
| |
| import ( |
| "context" |
| "errors" |
| "net" |
| "strconv" |
| "testing" |
| "time" |
| |
| "google.golang.org/grpc/internal/envconfig" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/internal/testutils/xds/e2e" |
| "google.golang.org/grpc/xds/internal/testutils/fakeclient" |
| "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" |
| |
| v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" |
| v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" |
| v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" |
| v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" |
| v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" |
| wrapperspb "github.com/golang/protobuf/ptypes/wrappers" |
| |
| _ "google.golang.org/grpc/xds/internal/httpfilter/router" |
| ) |
| |
| const ( |
| fakeListenerHost = "0.0.0.0" |
| fakeListenerPort = 50051 |
| testListenerResourceName = "lds.target.1.2.3.4:1111" |
| defaultTestTimeout = 1 * time.Second |
| defaultTestShortTimeout = 10 * time.Millisecond |
| ) |
| |
| var listenerWithRouteConfiguration = &v3listenerpb.Listener{ |
| FilterChains: []*v3listenerpb.FilterChain{ |
| { |
| FilterChainMatch: &v3listenerpb.FilterChainMatch{ |
| PrefixRanges: []*v3corepb.CidrRange{ |
| { |
| AddressPrefix: "192.168.0.0", |
| PrefixLen: &wrapperspb.UInt32Value{ |
| Value: uint32(16), |
| }, |
| }, |
| }, |
| SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK, |
| SourcePrefixRanges: []*v3corepb.CidrRange{ |
| { |
| AddressPrefix: "192.168.0.0", |
| PrefixLen: &wrapperspb.UInt32Value{ |
| Value: uint32(16), |
| }, |
| }, |
| }, |
| SourcePorts: []uint32{80}, |
| }, |
| Filters: []*v3listenerpb.Filter{ |
| { |
| Name: "filter-1", |
| ConfigType: &v3listenerpb.Filter_TypedConfig{ |
| TypedConfig: testutils.MarshalAny(&v3httppb.HttpConnectionManager{ |
| RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ |
| Rds: &v3httppb.Rds{ |
| ConfigSource: &v3corepb.ConfigSource{ |
| ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, |
| }, |
| RouteConfigName: "route-1", |
| }, |
| }, |
| HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, |
| }), |
| }, |
| }, |
| }, |
| }, |
| }, |
| } |
| |
| var listenerWithFilterChains = &v3listenerpb.Listener{ |
| FilterChains: []*v3listenerpb.FilterChain{ |
| { |
| FilterChainMatch: &v3listenerpb.FilterChainMatch{ |
| PrefixRanges: []*v3corepb.CidrRange{ |
| { |
| AddressPrefix: "192.168.0.0", |
| PrefixLen: &wrapperspb.UInt32Value{ |
| Value: uint32(16), |
| }, |
| }, |
| }, |
| SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK, |
| SourcePrefixRanges: []*v3corepb.CidrRange{ |
| { |
| AddressPrefix: "192.168.0.0", |
| PrefixLen: &wrapperspb.UInt32Value{ |
| Value: uint32(16), |
| }, |
| }, |
| }, |
| SourcePorts: []uint32{80}, |
| }, |
| TransportSocket: &v3corepb.TransportSocket{ |
| Name: "envoy.transport_sockets.tls", |
| ConfigType: &v3corepb.TransportSocket_TypedConfig{ |
| TypedConfig: testutils.MarshalAny(&v3tlspb.DownstreamTlsContext{ |
| CommonTlsContext: &v3tlspb.CommonTlsContext{ |
| TlsCertificateCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{ |
| InstanceName: "identityPluginInstance", |
| CertificateName: "identityCertName", |
| }, |
| }, |
| }), |
| }, |
| }, |
| Filters: []*v3listenerpb.Filter{ |
| { |
| Name: "filter-1", |
| ConfigType: &v3listenerpb.Filter_TypedConfig{ |
| TypedConfig: testutils.MarshalAny(&v3httppb.HttpConnectionManager{ |
| RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ |
| RouteConfig: &v3routepb.RouteConfiguration{ |
| Name: "routeName", |
| VirtualHosts: []*v3routepb.VirtualHost{{ |
| Domains: []string{"lds.target.good:3333"}, |
| Routes: []*v3routepb.Route{{ |
| Match: &v3routepb.RouteMatch{ |
| PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}, |
| }, |
| Action: &v3routepb.Route_NonForwardingAction{}, |
| }}}}}, |
| }, |
| HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, |
| }), |
| }, |
| }, |
| }, |
| }, |
| }, |
| } |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| type tempError struct{} |
| |
| func (tempError) Error() string { |
| return "listenerWrapper test temporary error" |
| } |
| |
| func (tempError) Temporary() bool { |
| return true |
| } |
| |
| // connAndErr wraps a net.Conn and an error. |
| type connAndErr struct { |
| conn net.Conn |
| err error |
| } |
| |
| // fakeListener allows the user to inject conns returned by Accept(). |
| type fakeListener struct { |
| acceptCh chan connAndErr |
| closeCh *testutils.Channel |
| } |
| |
| func (fl *fakeListener) Accept() (net.Conn, error) { |
| cne, ok := <-fl.acceptCh |
| if !ok { |
| return nil, errors.New("a non-temporary error") |
| } |
| return cne.conn, cne.err |
| } |
| |
| func (fl *fakeListener) Close() error { |
| fl.closeCh.Send(nil) |
| return nil |
| } |
| |
| func (fl *fakeListener) Addr() net.Addr { |
| return &net.TCPAddr{ |
| IP: net.IPv4(0, 0, 0, 0), |
| Port: fakeListenerPort, |
| } |
| } |
| |
| // fakeConn overrides LocalAddr, RemoteAddr and Close methods. |
| type fakeConn struct { |
| net.Conn |
| local, remote net.Addr |
| closeCh *testutils.Channel |
| } |
| |
| func (fc *fakeConn) LocalAddr() net.Addr { |
| return fc.local |
| } |
| |
| func (fc *fakeConn) RemoteAddr() net.Addr { |
| return fc.remote |
| } |
| |
| func (fc *fakeConn) Close() error { |
| fc.closeCh.Send(nil) |
| return nil |
| } |
| |
| func newListenerWrapper(t *testing.T) (*listenerWrapper, <-chan struct{}, *fakeclient.Client, *fakeListener, func()) { |
| t.Helper() |
| |
| // Create a listener wrapper with a fake listener and fake XDSClient and |
| // verify that it extracts the host and port from the passed in listener. |
| lis := &fakeListener{ |
| acceptCh: make(chan connAndErr, 1), |
| closeCh: testutils.NewChannel(), |
| } |
| xdsC := fakeclient.NewClient() |
| lParams := ListenerWrapperParams{ |
| Listener: lis, |
| ListenerResourceName: testListenerResourceName, |
| XDSClient: xdsC, |
| } |
| l, readyCh := NewListenerWrapper(lParams) |
| if l == nil { |
| t.Fatalf("NewListenerWrapper(%+v) returned nil", lParams) |
| } |
| lw, ok := l.(*listenerWrapper) |
| if !ok { |
| t.Fatalf("NewListenerWrapper(%+v) returned listener of type %T want *listenerWrapper", lParams, l) |
| } |
| if lw.addr != fakeListenerHost || lw.port != strconv.Itoa(fakeListenerPort) { |
| t.Fatalf("listenerWrapper has host:port %s:%s, want %s:%d", lw.addr, lw.port, fakeListenerHost, fakeListenerPort) |
| } |
| return lw, readyCh, xdsC, lis, func() { l.Close() } |
| } |
| |
| func (s) TestNewListenerWrapper(t *testing.T) { |
| _, readyCh, xdsC, _, cleanup := newListenerWrapper(t) |
| defer cleanup() |
| |
| // Verify that the listener wrapper registers a listener watch for the |
| // expected Listener resource name. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| name, err := xdsC.WaitForWatchListener(ctx) |
| if err != nil { |
| t.Fatalf("error when waiting for a watch on a Listener resource: %v", err) |
| } |
| if name != testListenerResourceName { |
| t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName) |
| } |
| |
| // Push an error to the listener update handler. |
| xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{}, errors.New("bad listener update")) |
| timer := time.NewTimer(defaultTestShortTimeout) |
| select { |
| case <-timer.C: |
| timer.Stop() |
| case <-readyCh: |
| t.Fatalf("ready channel written to after receipt of a bad Listener update") |
| } |
| |
| fcm, err := xdsresource.NewFilterChainManager(listenerWithFilterChains, nil) |
| if err != nil { |
| t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err) |
| } |
| |
| // Push an update whose address does not match the address to which our |
| // listener is bound, and verify that the ready channel is not written to. |
| xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{ |
| InboundListenerCfg: &xdsresource.InboundListenerConfig{ |
| Address: "10.0.0.1", |
| Port: "50051", |
| FilterChains: fcm, |
| }}, nil) |
| timer = time.NewTimer(defaultTestShortTimeout) |
| select { |
| case <-timer.C: |
| timer.Stop() |
| case <-readyCh: |
| t.Fatalf("ready channel written to after receipt of a bad Listener update") |
| } |
| |
| // Push a good update, and verify that the ready channel is written to. |
| // Since there are no dynamic RDS updates needed to be received, the |
| // ListenerWrapper does not have to wait for anything else before telling |
| // that it is ready. |
| xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{ |
| InboundListenerCfg: &xdsresource.InboundListenerConfig{ |
| Address: fakeListenerHost, |
| Port: strconv.Itoa(fakeListenerPort), |
| FilterChains: fcm, |
| }}, nil) |
| |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good Listener update") |
| case <-readyCh: |
| } |
| } |
| |
| // TestNewListenerWrapperWithRouteUpdate tests the scenario where the listener |
| // gets built, starts a watch, that watch returns a list of Route Names to |
| // return, than receives an update from the rds handler. Only after receiving |
| // the update from the rds handler should it move the server to |
| // ServingModeServing. |
| func (s) TestNewListenerWrapperWithRouteUpdate(t *testing.T) { |
| oldRBAC := envconfig.XDSRBAC |
| envconfig.XDSRBAC = true |
| defer func() { |
| envconfig.XDSRBAC = oldRBAC |
| }() |
| _, readyCh, xdsC, _, cleanup := newListenerWrapper(t) |
| defer cleanup() |
| |
| // Verify that the listener wrapper registers a listener watch for the |
| // expected Listener resource name. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| name, err := xdsC.WaitForWatchListener(ctx) |
| if err != nil { |
| t.Fatalf("error when waiting for a watch on a Listener resource: %v", err) |
| } |
| if name != testListenerResourceName { |
| t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName) |
| } |
| fcm, err := xdsresource.NewFilterChainManager(listenerWithRouteConfiguration, nil) |
| if err != nil { |
| t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err) |
| } |
| |
| // Push a good update which contains a Filter Chain that specifies dynamic |
| // RDS Resources that need to be received. This should ping rds handler |
| // about which rds names to start, which will eventually start a watch on |
| // xds client for rds name "route-1". |
| xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{ |
| InboundListenerCfg: &xdsresource.InboundListenerConfig{ |
| Address: fakeListenerHost, |
| Port: strconv.Itoa(fakeListenerPort), |
| FilterChains: fcm, |
| }}, nil) |
| |
| // This should start a watch on xds client for rds name "route-1". |
| routeName, err := xdsC.WaitForWatchRouteConfig(ctx) |
| if err != nil { |
| t.Fatalf("error when waiting for a watch on a Route resource: %v", err) |
| } |
| if routeName != "route-1" { |
| t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", routeName, "route-1") |
| } |
| |
| // This shouldn't invoke good update channel, as has not received rds updates yet. |
| timer := time.NewTimer(defaultTestShortTimeout) |
| select { |
| case <-timer.C: |
| timer.Stop() |
| case <-readyCh: |
| t.Fatalf("ready channel written to without rds configuration specified") |
| } |
| |
| // Invoke rds callback for the started rds watch. This valid rds callback |
| // should trigger the listener wrapper to fire GoodUpdate, as it has |
| // received both it's LDS Configuration and also RDS Configuration, |
| // specified in LDS Configuration. |
| xdsC.InvokeWatchRouteConfigCallback("route-1", xdsresource.RouteConfigUpdate{}, nil) |
| |
| // All of the xDS updates have completed, so can expect to send a ping on |
| // good update channel. |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good rds update") |
| case <-readyCh: |
| } |
| } |
| |
| func (s) TestListenerWrapper_Accept(t *testing.T) { |
| boCh := testutils.NewChannel() |
| origBackoffFunc := backoffFunc |
| backoffFunc = func(v int) time.Duration { |
| boCh.Send(v) |
| return 0 |
| } |
| defer func() { backoffFunc = origBackoffFunc }() |
| |
| lw, readyCh, xdsC, lis, cleanup := newListenerWrapper(t) |
| defer cleanup() |
| |
| // Push a good update with a filter chain which accepts local connections on |
| // 192.168.0.0/16 subnet and port 80. |
| fcm, err := xdsresource.NewFilterChainManager(listenerWithFilterChains, nil) |
| if err != nil { |
| t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err) |
| } |
| xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{ |
| InboundListenerCfg: &xdsresource.InboundListenerConfig{ |
| Address: fakeListenerHost, |
| Port: strconv.Itoa(fakeListenerPort), |
| FilterChains: fcm, |
| }}, nil) |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| defer close(lis.acceptCh) |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good Listener update") |
| case <-readyCh: |
| } |
| |
| // Push a non-temporary error into Accept(). |
| nonTempErr := errors.New("a non-temporary error") |
| lis.acceptCh <- connAndErr{err: nonTempErr} |
| if _, err := lw.Accept(); err != nonTempErr { |
| t.Fatalf("listenerWrapper.Accept() returned error: %v, want: %v", err, nonTempErr) |
| } |
| |
| // Invoke Accept() in a goroutine since we expect it to swallow: |
| // 1. temporary errors returned from the underlying listener |
| // 2. errors related to finding a matching filter chain for the incoming |
| // connection. |
| errCh := testutils.NewChannel() |
| go func() { |
| conn, err := lw.Accept() |
| if err != nil { |
| errCh.Send(err) |
| return |
| } |
| if _, ok := conn.(*connWrapper); !ok { |
| errCh.Send(errors.New("listenerWrapper.Accept() returned a Conn of type %T, want *connWrapper")) |
| return |
| } |
| errCh.Send(nil) |
| }() |
| |
| // Push a temporary error into Accept() and verify that it backs off. |
| lis.acceptCh <- connAndErr{err: tempError{}} |
| if _, err := boCh.Receive(ctx); err != nil { |
| t.Fatalf("error when waiting for Accept() to backoff on temporary errors: %v", err) |
| } |
| |
| // Push a fakeConn which does not match any filter chains configured on the |
| // received Listener resource. Verify that the conn is closed. |
| fc := &fakeConn{ |
| local: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2), Port: 79}, |
| remote: &net.TCPAddr{IP: net.IPv4(10, 1, 1, 1), Port: 80}, |
| closeCh: testutils.NewChannel(), |
| } |
| lis.acceptCh <- connAndErr{conn: fc} |
| if _, err := fc.closeCh.Receive(ctx); err != nil { |
| t.Fatalf("error when waiting for conn to be closed on no filter chain match: %v", err) |
| } |
| |
| // Push a fakeConn which matches the filter chains configured on the |
| // received Listener resource. Verify that Accept() returns. |
| fc = &fakeConn{ |
| local: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2)}, |
| remote: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2), Port: 80}, |
| closeCh: testutils.NewChannel(), |
| } |
| lis.acceptCh <- connAndErr{conn: fc} |
| if _, err := errCh.Receive(ctx); err != nil { |
| t.Fatalf("error when waiting for Accept() to return the conn on filter chain match: %v", err) |
| } |
| } |