blob: 722748cbd526a7a4b869b78e308adca8e1c7bb3b [file] [log] [blame]
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package server
import (
"sync"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// rdsHandlerUpdate wraps the full set of RouteConfigUpdates that are
// dynamically queried for a given server side listener. It is the value type
// carried by the rdsHandler's update channel.
type rdsHandlerUpdate struct {
// updates maps a route name to the route configuration received for it.
updates map[string]xdsresource.RouteConfigUpdate
// err carries the error reported by a route configuration watch; the
// handler sends it on its own, without updates.
err error
}
// rdsHandler handles any RDS queries that need to be started for a given
// server side listener's filter chains (i.e. route configuration that is not
// inline).
type rdsHandler struct {
// xdsC is the client used to start route configuration watches.
xdsC XDSClient
// mu guards updates and cancels.
mu sync.Mutex
// updates holds the latest route configuration received, keyed by route
// name.
updates map[string]xdsresource.RouteConfigUpdate
// cancels holds the function that cancels the watch for each route name
// currently being watched.
cancels map[string]func()
// For a rdsHandler update, the only update the wrapped listener cares about
// is the most recent one, so this channel will be opportunistically drained
// before sending any new updates.
updateChannel chan rdsHandlerUpdate
}
// newRDSHandler builds an rdsHandler that uses xdsC to watch RDS resources and
// reports what it learns on ch. The handler starts out watching nothing; the
// listenerWrapper supplies the route names to watch by calling
// updateRouteNamesToWatch() whenever it receives new Listener configuration.
func newRDSHandler(xdsC XDSClient, ch chan rdsHandlerUpdate) *rdsHandler {
	rh := new(rdsHandler)
	rh.xdsC = xdsC
	rh.updateChannel = ch
	// Both maps must be non-nil before any watch is registered, since they are
	// written to as soon as route names arrive.
	rh.cancels = map[string]func(){}
	rh.updates = map[string]xdsresource.RouteConfigUpdate{}
	return rh
}
// updateRouteNamesToWatch reconciles the set of active RDS watches with
// routeNamesToWatch, the route names a server side listener needs (i.e. those
// named by filter chains that specify dynamic RDS configuration). Membership
// is decided by key presence only; the bool values are not inspected.
//
// Watches are started for route names that are not yet watched. Watches for
// route names that are no longer present are canceled and their stored updates
// are dropped. If, after reconciling, every watched route has an update and
// routeNamesToWatch is not empty, a snapshot of the updates is pushed on the
// update channel.
func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) {
	rh.mu.Lock()
	defer rh.mu.Unlock()

	// Start watches for any route names that are not being watched yet.
	for routeName := range routeNamesToWatch {
		if _, ok := rh.cancels[routeName]; ok {
			continue
		}
		// Shadow the loop variable so that every callback captures its own
		// route name, independent of the loop variable semantics in use.
		routeName := routeName
		rh.cancels[routeName] = rh.xdsC.WatchRouteConfig(routeName, func(update xdsresource.RouteConfigUpdate, err error) {
			rh.handleRouteUpdate(routeName, update, err)
		})
	}

	// Cancel watches, and forget stored updates, for any route names that are
	// no longer requested. Deleting entries while ranging over a map is safe.
	for routeName, cancel := range rh.cancels {
		if _, ok := routeNamesToWatch[routeName]; ok {
			continue
		}
		cancel()
		delete(rh.cancels, routeName)
		delete(rh.updates, routeName)
	}

	// The listener is ready to be updated only once the full list (determined
	// by length) of watched routes has an update.
	if len(routeNamesToWatch) == 0 || len(rh.updates) != len(rh.cancels) {
		return
	}

	// Send a (shallow) copy rather than rh.updates itself: the receiver reads
	// the map without holding rh.mu, while this handler keeps mutating
	// rh.updates, so sharing the map would be a data race.
	snapshot := make(map[string]xdsresource.RouteConfigUpdate, len(rh.updates))
	for name, cfg := range rh.updates {
		snapshot[name] = cfg
	}
	drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: snapshot})
}
// handleRouteUpdate is the watch callback for a single route name. It persists
// the route config for routeName and notifies the listener wrapper through the
// update channel in two cases: when err is non-nil (the error alone is sent),
// or when the handler holds an update for every route it is watching (a
// snapshot of all updates is sent).
//
// Updates for a route name that is not currently being watched are ignored.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate, err error) {
	if err != nil {
		// No handler state is read or written on this path, so rh.mu is not
		// taken.
		drainAndPush(rh.updateChannel, rdsHandlerUpdate{err: err})
		return
	}

	rh.mu.Lock()
	defer rh.mu.Unlock()

	// A callback can still arrive after its watch was canceled. Storing such
	// an update would add an entry to rh.updates that has no counterpart in
	// rh.cancels, which breaks the length comparison below.
	if _, watched := rh.cancels[routeName]; !watched {
		return
	}
	rh.updates[routeName] = update

	// The listener is ready to be updated only once the full list (determined
	// by length) of watched routes has an update.
	if len(rh.updates) != len(rh.cancels) {
		return
	}

	// Send a (shallow) copy rather than rh.updates itself: the receiver reads
	// the map without holding rh.mu, while this handler keeps mutating
	// rh.updates, so sharing the map would be a data race.
	snapshot := make(map[string]xdsresource.RouteConfigUpdate, len(rh.updates))
	for name, cfg := range rh.updates {
		snapshot[name] = cfg
	}
	drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: snapshot})
}
// drainAndPush sends update on ch after first discarding at most one value
// that is already waiting to be received from ch. The discard step never
// blocks; the final send blocks until ch can accept the value.
func drainAndPush(ch chan rdsHandlerUpdate, update rdsHandlerUpdate) {
	select {
	default:
		// Nothing is pending, so there is nothing to discard.
	case <-ch:
		// Drop the pending value; it is superseded by update.
	}
	ch <- update
}
// close() releases the handler's resources by canceling every active RDS
// watch. The wrapped listener is expected to call it when the wrapped listener
// itself is closed.
func (rh *rdsHandler) close() {
	rh.mu.Lock()
	defer rh.mu.Unlock()
	for routeName := range rh.cancels {
		rh.cancels[routeName]()
	}
}