blob: 31704afc0790436b084a415da4673e2b1e625fc5 [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package client
import (
"fmt"
"sync"
"time"
)
// watchInfoState describes where a single watch currently is in its
// lifecycle. It is stored in watchInfo.state and guarded by watchInfo.mu.
type watchInfoState int
const (
// watchInfoStateStarted is the zero value, i.e. the state of a watchInfo
// that has not yet seen a response, a timeout, or a cancellation.
watchInfoStateStarted watchInfoState = iota
// watchInfoStateRespReceived is set by newUpdate() and resourceNotFound().
// Once in this state, timeout() no longer reports an error.
watchInfoStateRespReceived
// watchInfoStateTimeout is set by timeout() when the expiry timer fires
// before any response was recorded.
watchInfoStateTimeout
// watchInfoStateCanceled is set by cancel(). Once in this state,
// newUpdate(), resourceNotFound() and timeout() all return without
// scheduling a callback.
watchInfoStateCanceled
)
// watchInfo holds all the information from a watch() call.
//
// One watchInfo is created per Watch*() call. It records which resource is
// being watched, which callback belongs to the watcher, and the expiry timer
// that reports an error if no response arrives in time.
type watchInfo struct {
// c is the client that owns this watch; callbacks are scheduled through
// c.scheduleCallback().
c *clientImpl
// rType is the kind of resource being watched. It selects the watcher map
// and cache used by watch(), and the empty update value used when an
// error is reported.
rType ResourceType
// target is the name of the watched resource.
target string
// The Watch*() constructors in this file set exactly one of the callbacks
// below, the one matching rType.
ldsCallback func(ListenerUpdate, error)
rdsCallback func(RouteConfigUpdate, error)
cdsCallback func(ClusterUpdate, error)
edsCallback func(EndpointsUpdate, error)
// expiryTimer invokes timeout() when it fires. It is stopped by
// newUpdate(), resourceNotFound() and cancel().
expiryTimer *time.Timer
// mu protects state, and c.scheduleCallback().
// - No callback should be scheduled after watchInfo is canceled.
// - No timeout error should be scheduled after watchInfo is resp received.
mu sync.Mutex
// state is the current lifecycle state; see the watchInfoState constants.
state watchInfoState
}
// newUpdate records that a response was received for this watch and schedules
// the watcher's callback with the given update and a nil error.
//
// It stops the expiry timer so that no timeout error follows the update. If
// the watch has already been canceled, it does nothing.
func (wi *watchInfo) newUpdate(update interface{}) {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if wi.state != watchInfoStateCanceled {
		// A response arrived: the expiry timer is no longer needed.
		wi.expiryTimer.Stop()
		wi.state = watchInfoStateRespReceived
		wi.c.scheduleCallback(wi, update, nil)
	}
}
// resourceNotFound records that a response was received which did not contain
// the watched resource, and reports an ErrorTypeResourceNotFound error to the
// watcher.
//
// It stops the expiry timer, since a response (albeit a negative one) has
// been seen. If the watch has already been canceled, it does nothing.
func (wi *watchInfo) resourceNotFound() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if wi.state != watchInfoStateCanceled {
		wi.expiryTimer.Stop()
		wi.state = watchInfoStateRespReceived
		notFound := NewErrorf(ErrorTypeResourceNotFound, "xds: %v target %s not found in received response", wi.rType, wi.target)
		wi.sendErrorLocked(notFound)
	}
}
// timeout is invoked by the expiry timer. It moves the watch to the timeout
// state and reports a "watcher timeout" error to the watcher.
//
// It is a no-op when the watch was canceled or a response has already been
// received, so a late-firing timer never overrides a real result.
func (wi *watchInfo) timeout() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	switch wi.state {
	case watchInfoStateCanceled, watchInfoStateRespReceived:
		return
	}

	wi.state = watchInfoStateTimeout
	expired := fmt.Errorf("xds: %v target %s not found, watcher timeout", wi.rType, wi.target)
	wi.sendErrorLocked(expired)
}
// sendErrorLocked schedules the watcher's callback with err and an empty
// update value of the type matching wi.rType. For a resource type that is not
// one of the four handled here, the update is left as a nil interface.
//
// Caller must hold wi.mu.
func (wi *watchInfo) sendErrorLocked(err error) {
	var empty interface{}
	if wi.rType == ListenerResource {
		empty = ListenerUpdate{}
	} else if wi.rType == RouteConfigResource {
		empty = RouteConfigUpdate{}
	} else if wi.rType == ClusterResource {
		empty = ClusterUpdate{}
	} else if wi.rType == EndpointsResource {
		empty = EndpointsUpdate{}
	}
	wi.c.scheduleCallback(wi, empty, err)
}
// cancel marks the watch as canceled and stops its expiry timer. After this,
// newUpdate(), resourceNotFound() and timeout() no longer schedule callbacks.
//
// Calling cancel on an already-canceled watch does nothing.
func (wi *watchInfo) cancel() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if wi.state != watchInfoStateCanceled {
		wi.state = watchInfoStateCanceled
		wi.expiryTimer.Stop()
	}
}
// watch registers wi as a watcher for the resource (wi.rType, wi.target) and
// returns a function that cancels the watch.
//
// If this is the first watcher for the resource, the underlying apiClient is
// asked to start watching it. If the resource is already in the cache, the
// watcher is immediately given the cached value via wi.newUpdate().
//
// The returned cancel function marks wi as canceled and removes it from the
// watcher set. When the last watcher for a resource is removed, the apiClient
// is told to stop watching it and the cached value is dropped.
func (c *clientImpl) watch(wi *watchInfo) (cancel func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)

	// Pick the watcher registry that matches the resource type.
	var registry map[string]map[*watchInfo]bool
	switch wi.rType {
	case ListenerResource:
		registry = c.ldsWatchers
	case RouteConfigResource:
		registry = c.rdsWatchers
	case ClusterResource:
		registry = c.cdsWatchers
	case EndpointsResource:
		registry = c.edsWatchers
	}

	resourceName := wi.target
	peers, watched := registry[resourceName]
	if !watched {
		// First watcher for this (type, name): the lower level must send a
		// new request containing the resource name. When the resource is
		// already being watched, the versioned apiClient is not notified.
		c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target)
		peers = make(map[*watchInfo]bool)
		registry[resourceName] = peers
		c.apiClient.AddWatch(wi.rType, resourceName)
	}
	// Always add the new watcher to the set, so its callback will be called
	// for new responses.
	peers[wi] = true

	// Serve the watcher from the cache right away when a value is present.
	switch wi.rType {
	case ListenerResource:
		if cached, hit := c.ldsCache[resourceName]; hit {
			c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, cached)
			wi.newUpdate(cached)
		}
	case RouteConfigResource:
		if cached, hit := c.rdsCache[resourceName]; hit {
			c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, cached)
			wi.newUpdate(cached)
		}
	case ClusterResource:
		if cached, hit := c.cdsCache[resourceName]; hit {
			c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, cached)
			wi.newUpdate(cached)
		}
	case EndpointsResource:
		if cached, hit := c.edsCache[resourceName]; hit {
			c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, cached)
			wi.newUpdate(cached)
		}
	}

	return func() {
		c.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target)
		// Mark the watch canceled first (under wi.mu only), then update the
		// client's bookkeeping under c.mu.
		wi.cancel()

		c.mu.Lock()
		defer c.mu.Unlock()

		remaining := registry[resourceName]
		if remaining == nil {
			return
		}
		// Remove this watcher, so its callback will not be called in the
		// future.
		delete(remaining, wi)
		if len(remaining) != 0 {
			return
		}

		c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target)
		// This was the last watcher: tell the apiClient to stop watching
		// this resource as well.
		delete(registry, resourceName)
		c.apiClient.RemoveWatch(wi.rType, resourceName)

		// Drop the resource from the cache. A later watch for the same
		// resource will trigger a new xDS request with the resource name,
		// and the client will receive fresh xDS responses.
		switch wi.rType {
		case ListenerResource:
			delete(c.ldsCache, resourceName)
		case RouteConfigResource:
			delete(c.rdsCache, resourceName)
		case ClusterResource:
			delete(c.cdsCache, resourceName)
		case EndpointsResource:
			delete(c.edsCache, resourceName)
		}
	}
}
// WatchListener uses LDS to discover information about the provided listener.
//
// cb is registered as the watcher's callback. An expiry timer is started with
// the client's watchExpiryTimeout; if it fires before a response is received
// or the watch is canceled, a timeout error is reported to the watcher.
//
// The returned function cancels the watch.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *clientImpl) WatchListener(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) {
	wi := &watchInfo{c: c, rType: ListenerResource, target: serviceName, ldsCallback: cb}
	wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, wi.timeout)
	return c.watch(wi)
}
// WatchRouteConfig starts a watch for the route configuration resource named
// routeName.
//
// cb is registered as the watcher's callback. An expiry timer is started with
// the client's watchExpiryTimeout; if it fires before a response is received
// or the watch is canceled, a timeout error is reported to the watcher.
//
// The returned function cancels the watch.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *clientImpl) WatchRouteConfig(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) {
	wi := &watchInfo{c: c, rType: RouteConfigResource, target: routeName, rdsCallback: cb}
	wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, wi.timeout)
	return c.watch(wi)
}
// WatchCluster uses CDS to discover information about the provided
// clusterName.
//
// WatchCluster can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
//
// cb is registered as the watcher's callback. An expiry timer is started with
// the client's watchExpiryTimeout; if it fires before a response is received
// or the watch is canceled, a timeout error is reported to the watcher.
//
// The returned function cancels the watch.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *clientImpl) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) {
	wi := &watchInfo{c: c, rType: ClusterResource, target: clusterName, cdsCallback: cb}
	wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, wi.timeout)
	return c.watch(wi)
}
// WatchEndpoints uses EDS to discover endpoints in the provided clusterName.
//
// WatchEndpoints can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
//
// cb is registered as the watcher's callback. An expiry timer is started with
// the client's watchExpiryTimeout; if it fires before a response is received
// or the watch is canceled, a timeout error is reported to the watcher.
//
// The returned function cancels the watch.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *clientImpl) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) {
	wi := &watchInfo{c: c, rType: EndpointsResource, target: clusterName, edsCallback: cb}
	wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, wi.timeout)
	return c.watch(wi)
}