| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| // Package server contains internal server-side functionality used by the public |
| // facing xds package. |
| package server |
| |
| import ( |
| "errors" |
| "fmt" |
| "net" |
| "sync" |
| "sync/atomic" |
| "time" |
| "unsafe" |
| |
| "google.golang.org/grpc/backoff" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/grpclog" |
| internalbackoff "google.golang.org/grpc/internal/backoff" |
| "google.golang.org/grpc/internal/envconfig" |
| internalgrpclog "google.golang.org/grpc/internal/grpclog" |
| "google.golang.org/grpc/internal/grpcsync" |
| "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" |
| "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" |
| ) |
| |
| var ( |
| logger = grpclog.Component("xds") |
| |
| // Backoff strategy for temporary errors received from Accept(). If this |
| // needs to be configurable, we can inject it through ListenerWrapperParams. |
| bs = internalbackoff.Exponential{Config: backoff.Config{ |
| BaseDelay: 5 * time.Millisecond, |
| Multiplier: 2.0, |
| MaxDelay: 1 * time.Second, |
| }} |
| backoffFunc = bs.Backoff |
| ) |
| |
| // ServingModeCallback is the callback that users can register to get notified |
| // about the server's serving mode changes. The callback is invoked with the |
| // address of the listener and its new mode. The err parameter is set to a |
| // non-nil error if the server has transitioned into not-serving mode. |
| type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err error) |
| |
| // DrainCallback is the callback that an xDS-enabled server registers to get |
| // notified about updates to the Listener configuration. The server is expected |
| // to gracefully shutdown existing connections, thereby forcing clients to |
| // reconnect and have the new configuration applied to the newly created |
| // connections. |
| type DrainCallback func(addr net.Addr) |
| |
| func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger { |
| return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p)) |
| } |
| |
| // XDSClient wraps the methods on the XDSClient which are required by |
| // the listenerWrapper. |
| type XDSClient interface { |
| WatchListener(string, func(xdsresource.ListenerUpdate, error)) func() |
| WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func() |
| BootstrapConfig() *bootstrap.Config |
| } |
| |
| // ListenerWrapperParams wraps parameters required to create a listenerWrapper. |
| type ListenerWrapperParams struct { |
| // Listener is the net.Listener passed by the user that is to be wrapped. |
| Listener net.Listener |
| // ListenerResourceName is the xDS Listener resource to request. |
| ListenerResourceName string |
| // XDSCredsInUse specifies whether or not the user expressed interest to |
| // receive security configuration from the control plane. |
| XDSCredsInUse bool |
| // XDSClient provides the functionality from the XDSClient required here. |
| XDSClient XDSClient |
| // ModeCallback is the callback to invoke when the serving mode changes. |
| ModeCallback ServingModeCallback |
| // DrainCallback is the callback to invoke when the Listener gets a LDS |
| // update. |
| DrainCallback DrainCallback |
| } |
| |
| // NewListenerWrapper creates a new listenerWrapper with params. It returns a |
| // net.Listener and a channel which is written to, indicating that the former is |
| // ready to be passed to grpc.Serve(). |
| // |
| // Only TCP listeners are supported. |
| func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan struct{}) { |
| lw := &listenerWrapper{ |
| Listener: params.Listener, |
| name: params.ListenerResourceName, |
| xdsCredsInUse: params.XDSCredsInUse, |
| xdsC: params.XDSClient, |
| modeCallback: params.ModeCallback, |
| drainCallback: params.DrainCallback, |
| isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(), |
| |
| mode: connectivity.ServingModeStarting, |
| closed: grpcsync.NewEvent(), |
| goodUpdate: grpcsync.NewEvent(), |
| ldsUpdateCh: make(chan ldsUpdateWithError, 1), |
| rdsUpdateCh: make(chan rdsHandlerUpdate, 1), |
| } |
| lw.logger = prefixLogger(lw) |
| |
| // Serve() verifies that Addr() returns a valid TCPAddr. So, it is safe to |
| // ignore the error from SplitHostPort(). |
| lisAddr := lw.Listener.Addr().String() |
| lw.addr, lw.port, _ = net.SplitHostPort(lisAddr) |
| |
| lw.rdsHandler = newRDSHandler(lw.xdsC, lw.rdsUpdateCh) |
| |
| cancelWatch := lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate) |
| lw.logger.Infof("Watch started on resource name %v", lw.name) |
| lw.cancelWatch = func() { |
| cancelWatch() |
| lw.logger.Infof("Watch cancelled on resource name %v", lw.name) |
| } |
| go lw.run() |
| return lw, lw.goodUpdate.Done() |
| } |
| |
| type ldsUpdateWithError struct { |
| update xdsresource.ListenerUpdate |
| err error |
| } |
| |
| // listenerWrapper wraps the net.Listener associated with the listening address |
| // passed to Serve(). It also contains all other state associated with this |
| // particular invocation of Serve(). |
| type listenerWrapper struct { |
| net.Listener |
| logger *internalgrpclog.PrefixLogger |
| |
| name string |
| xdsCredsInUse bool |
| xdsC XDSClient |
| cancelWatch func() |
| modeCallback ServingModeCallback |
| drainCallback DrainCallback |
| |
| // Set to true if the listener is bound to the IP_ANY address (which is |
| // "0.0.0.0" for IPv4 and "::" for IPv6). |
| isUnspecifiedAddr bool |
| // Listening address and port. Used to validate the socket address in the |
| // Listener resource received from the control plane. |
| addr, port string |
| |
| // This is used to notify that a good update has been received and that |
| // Serve() can be invoked on the underlying gRPC server. Using an event |
| // instead of a vanilla channel simplifies the update handler as it need not |
| // keep track of whether the received update is the first one or not. |
| goodUpdate *grpcsync.Event |
| // A small race exists in the XDSClient code between the receipt of an xDS |
| // response and the user cancelling the associated watch. In this window, |
| // the registered callback may be invoked after the watch is canceled, and |
| // the user is expected to work around this. This event signifies that the |
| // listener is closed (and hence the watch is cancelled), and we drop any |
| // updates received in the callback if this event has fired. |
| closed *grpcsync.Event |
| |
| // mu guards access to the current serving mode and the filter chains. The |
| // reason for using an rw lock here is that these fields are read in |
| // Accept() for all incoming connections, but writes happen rarely (when we |
| // get a Listener resource update). |
| mu sync.RWMutex |
| // Current serving mode. |
| mode connectivity.ServingMode |
| // Filter chains received as part of the last good update. |
| filterChains *xdsresource.FilterChainManager |
| |
| // rdsHandler is used for any dynamic RDS resources specified in a LDS |
| // update. |
| rdsHandler *rdsHandler |
| // rdsUpdates are the RDS resources received from the management |
| // server, keyed on the RouteName of the RDS resource. |
| rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate |
| // ldsUpdateCh is a channel for XDSClient LDS updates. |
| ldsUpdateCh chan ldsUpdateWithError |
| // rdsUpdateCh is a channel for XDSClient RDS updates. |
| rdsUpdateCh chan rdsHandlerUpdate |
| } |
| |
| // Accept blocks on an Accept() on the underlying listener, and wraps the |
| // returned net.connWrapper with the configured certificate providers. |
| func (l *listenerWrapper) Accept() (net.Conn, error) { |
| var retries int |
| for { |
| conn, err := l.Listener.Accept() |
| if err != nil { |
| // Temporary() method is implemented by certain error types returned |
| // from the net package, and it is useful for us to not shutdown the |
| // server in these conditions. The listen queue being full is one |
| // such case. |
| if ne, ok := err.(interface{ Temporary() bool }); !ok || !ne.Temporary() { |
| return nil, err |
| } |
| retries++ |
| timer := time.NewTimer(backoffFunc(retries)) |
| select { |
| case <-timer.C: |
| case <-l.closed.Done(): |
| timer.Stop() |
| // Continuing here will cause us to call Accept() again |
| // which will return a non-temporary error. |
| continue |
| } |
| continue |
| } |
| // Reset retries after a successful Accept(). |
| retries = 0 |
| |
| // Since the net.Conn represents an incoming connection, the source and |
| // destination address can be retrieved from the local address and |
| // remote address of the net.Conn respectively. |
| destAddr, ok1 := conn.LocalAddr().(*net.TCPAddr) |
| srcAddr, ok2 := conn.RemoteAddr().(*net.TCPAddr) |
| if !ok1 || !ok2 { |
| // If the incoming connection is not a TCP connection, which is |
| // really unexpected since we check whether the provided listener is |
| // a TCP listener in Serve(), we return an error which would cause |
| // us to stop serving. |
| return nil, fmt.Errorf("received connection with non-TCP address (local: %T, remote %T)", conn.LocalAddr(), conn.RemoteAddr()) |
| } |
| |
| l.mu.RLock() |
| if l.mode == connectivity.ServingModeNotServing { |
| // Close connections as soon as we accept them when we are in |
| // "not-serving" mode. Since we accept a net.Listener from the user |
| // in Serve(), we cannot close the listener when we move to |
| // "not-serving". Closing the connection immediately upon accepting |
| // is one of the other ways to implement the "not-serving" mode as |
| // outlined in gRFC A36. |
| l.mu.RUnlock() |
| conn.Close() |
| continue |
| } |
| fc, err := l.filterChains.Lookup(xdsresource.FilterChainLookupParams{ |
| IsUnspecifiedListener: l.isUnspecifiedAddr, |
| DestAddr: destAddr.IP, |
| SourceAddr: srcAddr.IP, |
| SourcePort: srcAddr.Port, |
| }) |
| l.mu.RUnlock() |
| if err != nil { |
| // When a matching filter chain is not found, we close the |
| // connection right away, but do not return an error back to |
| // `grpc.Serve()` from where this Accept() was invoked. Returning an |
| // error to `grpc.Serve()` causes the server to shutdown. If we want |
| // to avoid the server from shutting down, we would need to return |
| // an error type which implements the `Temporary() bool` method, |
| // which is invoked by `grpc.Serve()` to see if the returned error |
| // represents a temporary condition. In the case of a temporary |
| // error, `grpc.Serve()` method sleeps for a small duration and |
| // therefore ends up blocking all connection attempts during that |
| // time frame, which is also not ideal for an error like this. |
| l.logger.Warningf("connection from %s to %s failed to find any matching filter chain", conn.RemoteAddr().String(), conn.LocalAddr().String()) |
| conn.Close() |
| continue |
| } |
| if !envconfig.XDSRBAC { |
| return &connWrapper{Conn: conn, filterChain: fc, parent: l}, nil |
| } |
| var rc xdsresource.RouteConfigUpdate |
| if fc.InlineRouteConfig != nil { |
| rc = *fc.InlineRouteConfig |
| } else { |
| rcPtr := atomic.LoadPointer(&l.rdsUpdates) |
| rcuPtr := (*map[string]xdsresource.RouteConfigUpdate)(rcPtr) |
| // This shouldn't happen, but this error protects against a panic. |
| if rcuPtr == nil { |
| return nil, errors.New("route configuration pointer is nil") |
| } |
| rcu := *rcuPtr |
| rc = rcu[fc.RouteConfigName] |
| } |
| // The filter chain will construct a usuable route table on each |
| // connection accept. This is done because preinstantiating every route |
| // table before it is needed for a connection would potentially lead to |
| // a lot of cpu time and memory allocated for route tables that will |
| // never be used. There was also a thought to cache this configuration, |
| // and reuse it for the next accepted connection. However, this would |
| // lead to a lot of code complexity (RDS Updates for a given route name |
| // can come it at any time), and connections aren't accepted too often, |
| // so this reinstantation of the Route Configuration is an acceptable |
| // tradeoff for simplicity. |
| vhswi, err := fc.ConstructUsableRouteConfiguration(rc) |
| if err != nil { |
| l.logger.Warningf("route configuration construction: %v", err) |
| conn.Close() |
| continue |
| } |
| return &connWrapper{Conn: conn, filterChain: fc, parent: l, virtualHosts: vhswi}, nil |
| } |
| } |
| |
| // Close closes the underlying listener. It also cancels the xDS watch |
| // registered in Serve() and closes any certificate provider instances created |
| // based on security configuration received in the LDS response. |
| func (l *listenerWrapper) Close() error { |
| l.closed.Fire() |
| l.Listener.Close() |
| if l.cancelWatch != nil { |
| l.cancelWatch() |
| } |
| l.rdsHandler.close() |
| return nil |
| } |
| |
| // run is a long running goroutine which handles all xds updates. LDS and RDS |
| // push updates onto a channel which is read and acted upon from this goroutine. |
| func (l *listenerWrapper) run() { |
| for { |
| select { |
| case <-l.closed.Done(): |
| return |
| case u := <-l.ldsUpdateCh: |
| l.handleLDSUpdate(u) |
| case u := <-l.rdsUpdateCh: |
| l.handleRDSUpdate(u) |
| } |
| } |
| } |
| |
| // handleLDSUpdate is the callback which handles LDS Updates. It writes the |
| // received update to the update channel, which is picked up by the run |
| // goroutine. |
| func (l *listenerWrapper) handleListenerUpdate(update xdsresource.ListenerUpdate, err error) { |
| if l.closed.HasFired() { |
| l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err) |
| return |
| } |
| // Remove any existing entry in ldsUpdateCh and replace with the new one, as the only update |
| // listener cares about is most recent update. |
| select { |
| case <-l.ldsUpdateCh: |
| default: |
| } |
| l.ldsUpdateCh <- ldsUpdateWithError{update: update, err: err} |
| } |
| |
| // handleRDSUpdate handles a full rds update from rds handler. On a successful |
| // update, the server will switch to ServingModeServing as the full |
| // configuration (both LDS and RDS) has been received. |
| func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) { |
| if l.closed.HasFired() { |
| l.logger.Warningf("RDS received update: %v with error: %v, after listener was closed", update.updates, update.err) |
| return |
| } |
| if update.err != nil { |
| l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err) |
| if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound { |
| l.switchMode(nil, connectivity.ServingModeNotServing, update.err) |
| } |
| // For errors which are anything other than "resource-not-found", we |
| // continue to use the old configuration. |
| return |
| } |
| atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates)) |
| |
| l.switchMode(l.filterChains, connectivity.ServingModeServing, nil) |
| l.goodUpdate.Fire() |
| } |
| |
| func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) { |
| if update.err != nil { |
| l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err) |
| if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound { |
| l.switchMode(nil, connectivity.ServingModeNotServing, update.err) |
| } |
| // For errors which are anything other than "resource-not-found", we |
| // continue to use the old configuration. |
| return |
| } |
| l.logger.Infof("Received update for resource %q: %+v", l.name, update.update) |
| |
| // Make sure that the socket address on the received Listener resource |
| // matches the address of the net.Listener passed to us by the user. This |
| // check is done here instead of at the XDSClient layer because of the |
| // following couple of reasons: |
| // - XDSClient cannot know the listening address of every listener in the |
| // system, and hence cannot perform this check. |
| // - this is a very context-dependent check and only the server has the |
| // appropriate context to perform this check. |
| // |
| // What this means is that the XDSClient has ACKed a resource which can push |
| // the server into a "not serving" mode. This is not ideal, but this is |
| // what we have decided to do. See gRPC A36 for more details. |
| ilc := update.update.InboundListenerCfg |
| if ilc.Address != l.addr || ilc.Port != l.port { |
| l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) |
| return |
| } |
| |
| // "Updates to a Listener cause all older connections on that Listener to be |
| // gracefully shut down with a grace period of 10 minutes for long-lived |
| // RPC's, such that clients will reconnect and have the updated |
| // configuration apply." - A36 Note that this is not the same as moving the |
| // Server's state to ServingModeNotServing. That prevents new connections |
| // from being accepted, whereas here we simply want the clients to reconnect |
| // to get the updated configuration. |
| if envconfig.XDSRBAC { |
| if l.drainCallback != nil { |
| l.drainCallback(l.Listener.Addr()) |
| } |
| } |
| l.rdsHandler.updateRouteNamesToWatch(ilc.FilterChains.RouteConfigNames) |
| // If there are no dynamic RDS Configurations still needed to be received |
| // from the management server, this listener has all the configuration |
| // needed, and is ready to serve. |
| if len(ilc.FilterChains.RouteConfigNames) == 0 { |
| l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil) |
| l.goodUpdate.Fire() |
| } |
| } |
| |
| // switchMode updates the value of serving mode and filter chains stored in the |
| // listenerWrapper. And if the serving mode has changed, it invokes the |
| // registered mode change callback. |
| func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMode connectivity.ServingMode, err error) { |
| l.mu.Lock() |
| defer l.mu.Unlock() |
| |
| l.filterChains = fcs |
| if l.mode == newMode && l.mode == connectivity.ServingModeServing { |
| // Redundant updates are suppressed only when we are SERVING and the new |
| // mode is also SERVING. In the other case (where we are NOT_SERVING and the |
| // new mode is also NOT_SERVING), the update is not suppressed as: |
| // 1. the error may have change |
| // 2. it provides a timestamp of the last backoff attempt |
| return |
| } |
| l.mode = newMode |
| if l.modeCallback != nil { |
| l.modeCallback(l.Listener.Addr(), newMode, err) |
| } |
| } |