blob: 7f67be89422640202de541157e2ffb7ff797d1f3 [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package client
import (
"fmt"
"sync"
"time"
)
type watchInfoState int
const (
watchInfoStateStarted watchInfoState = iota
watchInfoStateRespReceived
watchInfoStateTimeout
watchInfoStateCanceled
)
// watchInfo holds all the information from a watch() call.
type watchInfo struct {
c *Client
rType ResourceType
target string
ldsCallback func(ListenerUpdate, error)
rdsCallback func(RouteConfigUpdate, error)
cdsCallback func(ClusterUpdate, error)
edsCallback func(EndpointsUpdate, error)
expiryTimer *time.Timer
// mu protects state, and c.scheduleCallback().
// - No callback should be scheduled after watchInfo is canceled.
// - No timeout error should be scheduled after watchInfo is resp received.
mu sync.Mutex
state watchInfoState
}
func (wi *watchInfo) newUpdate(update interface{}) {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.c.scheduleCallback(wi, update, nil)
}
func (wi *watchInfo) resourceNotFound() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %v target %s not found in received response", wi.rType, wi.target))
}
func (wi *watchInfo) timeout() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled || wi.state == watchInfoStateRespReceived {
return
}
wi.state = watchInfoStateTimeout
wi.sendErrorLocked(fmt.Errorf("xds: %v target %s not found, watcher timeout", wi.rType, wi.target))
}
// Caller must hold wi.mu.
func (wi *watchInfo) sendErrorLocked(err error) {
var (
u interface{}
)
switch wi.rType {
case ListenerResource:
u = ListenerUpdate{}
case RouteConfigResource:
u = RouteConfigUpdate{}
case ClusterResource:
u = ClusterUpdate{}
case EndpointsResource:
u = EndpointsUpdate{}
}
wi.c.scheduleCallback(wi, u, err)
}
func (wi *watchInfo) cancel() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.expiryTimer.Stop()
wi.state = watchInfoStateCanceled
}
func (c *Client) watch(wi *watchInfo) (cancel func()) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)
var watchers map[string]map[*watchInfo]bool
switch wi.rType {
case ListenerResource:
watchers = c.ldsWatchers
case RouteConfigResource:
watchers = c.rdsWatchers
case ClusterResource:
watchers = c.cdsWatchers
case EndpointsResource:
watchers = c.edsWatchers
}
resourceName := wi.target
s, ok := watchers[wi.target]
if !ok {
// If this is a new watcher, will ask lower level to send a new request
// with the resource name.
//
// If this (type+name) is already being watched, will not notify the
// underlying versioned apiClient.
c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target)
s = make(map[*watchInfo]bool)
watchers[resourceName] = s
c.apiClient.AddWatch(wi.rType, resourceName)
}
// No matter what, add the new watcher to the set, so it's callback will be
// call for new responses.
s[wi] = true
// If the resource is in cache, call the callback with the value.
switch wi.rType {
case ListenerResource:
if v, ok := c.ldsCache[resourceName]; ok {
c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case RouteConfigResource:
if v, ok := c.rdsCache[resourceName]; ok {
c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case ClusterResource:
if v, ok := c.cdsCache[resourceName]; ok {
c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case EndpointsResource:
if v, ok := c.edsCache[resourceName]; ok {
c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
}
return func() {
c.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target)
wi.cancel()
c.mu.Lock()
defer c.mu.Unlock()
if s := watchers[resourceName]; s != nil {
// Remove this watcher, so it's callback will not be called in the
// future.
delete(s, wi)
if len(s) == 0 {
c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target)
// If this was the last watcher, also tell xdsv2Client to stop
// watching this resource.
delete(watchers, resourceName)
c.apiClient.RemoveWatch(wi.rType, resourceName)
// Remove the resource from cache. When a watch for this
// resource is added later, it will trigger a xDS request with
// resource names, and client will receive new xDS responses.
switch wi.rType {
case ListenerResource:
delete(c.ldsCache, resourceName)
case RouteConfigResource:
delete(c.rdsCache, resourceName)
case ClusterResource:
delete(c.cdsCache, resourceName)
case EndpointsResource:
delete(c.edsCache, resourceName)
}
}
}
}
}
// WatchListener uses LDS to discover information about the provided listener.
//
// WatchListener is expected to called only from the server side implementation
// of xDS. Clients will use WatchService instead.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchListener(listener string, cb func(ListenerUpdate, error)) (cancel func()) {
return c.watchLDS(listener, cb)
}
// watchLDS starts a listener watcher for the service..
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) watchLDS(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: ListenerResource,
target: serviceName,
ldsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}
// watchRDS starts a listener watcher for the service..
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) watchRDS(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: RouteConfigResource,
target: routeName,
rdsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}
// WatchService uses LDS and RDS to discover information about the provided
// serviceName.
//
// WatchService can only be called once. The second call will not start a
// watcher and the callback will get an error. It's this case because an xDS
// client is expected to be used only by one ClientConn.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchService(serviceName string, cb func(ServiceUpdate, error)) (cancel func()) {
c.mu.Lock()
if len(c.ldsWatchers) != 0 {
go cb(ServiceUpdate{}, fmt.Errorf("unexpected WatchService when there's another service being watched"))
c.mu.Unlock()
return func() {}
}
c.mu.Unlock()
w := &serviceUpdateWatcher{c: c, serviceCb: cb}
w.ldsCancel = c.watchLDS(serviceName, w.handleLDSResp)
return w.close
}
// serviceUpdateWatcher handles LDS and RDS response, and calls the service
// callback at the right time.
type serviceUpdateWatcher struct {
c *Client
ldsCancel func()
serviceCb func(ServiceUpdate, error)
mu sync.Mutex
closed bool
rdsName string
rdsCancel func()
}
func (w *serviceUpdateWatcher) handleLDSResp(update ListenerUpdate, err error) {
w.c.logger.Infof("xds: client received LDS update: %+v, err: %v", update, err)
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return
}
if err != nil {
// We check the error type and do different things. For now, the only
// type we check is ResourceNotFound, which indicates the LDS resource
// was removed, and besides sending the error to callback, we also
// cancel the RDS watch.
if ErrType(err) == ErrorTypeResourceNotFound && w.rdsCancel != nil {
w.rdsCancel()
w.rdsName = ""
w.rdsCancel = nil
}
// The other error cases still return early without canceling the
// existing RDS watch.
w.serviceCb(ServiceUpdate{}, err)
return
}
if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
return
}
w.rdsName = update.RouteConfigName
if w.rdsCancel != nil {
w.rdsCancel()
}
w.rdsCancel = w.c.watchRDS(update.RouteConfigName, w.handleRDSResp)
}
func (w *serviceUpdateWatcher) handleRDSResp(update RouteConfigUpdate, err error) {
w.c.logger.Infof("xds: client received RDS update: %+v, err: %v", update, err)
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return
}
if w.rdsCancel == nil {
// This mean only the RDS watch is canceled, can happen if the LDS
// resource is removed.
return
}
if err != nil {
w.serviceCb(ServiceUpdate{}, err)
return
}
w.serviceCb(ServiceUpdate(update), nil)
}
func (w *serviceUpdateWatcher) close() {
w.mu.Lock()
defer w.mu.Unlock()
w.closed = true
w.ldsCancel()
if w.rdsCancel != nil {
w.rdsCancel()
w.rdsCancel = nil
}
}
// WatchCluster uses CDS to discover information about the provided
// clusterName.
//
// WatchCluster can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: ClusterResource,
target: clusterName,
cdsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}
// WatchEndpoints uses EDS to discover endpoints in the provided clusterName.
//
// WatchEndpoints can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: EndpointsResource,
target: clusterName,
edsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}