/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package client
import (
"context"
"sync"
"time"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/client/load"
)
// ErrResourceTypeUnsupported is an error used to indicate an unsupported xDS
// resource type. The wrapped ErrStr contains the details.
type ErrResourceTypeUnsupported struct {
ErrStr string
}
// Error helps implement the error interface.
func (e ErrResourceTypeUnsupported) Error() string {
return e.ErrStr
}
// VersionedClient is the interface to be provided by the transport protocol
// specific client implementations. This mainly deals with the actual sending
// and receiving of messages.
type VersionedClient interface {
// NewStream returns a new xDS client stream specific to the underlying
// transport protocol version.
NewStream(ctx context.Context) (grpc.ClientStream, error)
// SendRequest constructs and sends out a DiscoveryRequest message specific
// to the underlying transport protocol version.
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error
// RecvResponse uses the provided stream to receive a response specific to
// the underlying transport protocol version.
RecvResponse(s grpc.ClientStream) (proto.Message, error)
// HandleResponse parses and validates the received response and notifies
// the top-level client which in turn notifies the registered watchers.
//
// Return values are: resourceType, version, nonce, error.
// If the provided protobuf message contains a resource type which is not
// supported, implementations must return an error of type
// ErrResourceTypeUnsupported.
HandleResponse(proto.Message) (ResourceType, string, string, error)
// NewLoadStatsStream returns a new LRS client stream specific to the underlying
// transport protocol version.
NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error)
// SendFirstLoadStatsRequest constructs and sends the first request on the
// LRS stream.
SendFirstLoadStatsRequest(s grpc.ClientStream) error
// HandleLoadStatsResponse receives the first response from the server, which
// contains the load reporting interval and the clusters for which the server
// asks the client to report load.
//
// If the response sets SendAllClusters to true, the returned clusters slice
// is nil.
HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error)
// SendLoadStatsRequest will be invoked at regular intervals to send a load
// report containing the load data recorded since the last time this method
// was invoked.
SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error
}
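// As a rough illustration (the type and method names below are hypothetical,
// not the actual v2/v3 implementations), a transport-version-specific client
// would embed TransportHelper (defined below) and implement the methods above
// against its own generated stubs, along these lines:
//
//	type exampleVersionedClient struct {
//		*TransportHelper
//		cc *grpc.ClientConn
//	}
//
//	func (c *exampleVersionedClient) NewStream(ctx context.Context) (grpc.ClientStream, error) {
//		// Open the version-specific ADS stream on the underlying connection.
//		// The stream descriptor and method name here are placeholders.
//		return c.cc.NewStream(ctx, &grpc.StreamDesc{ClientStreams: true, ServerStreams: true}, "/example.AggregatedDiscoveryService/StreamAggregatedResources")
//	}
//
// The remaining methods (SendRequest, RecvResponse, HandleResponse and the
// LRS methods) would be implemented analogously for the specific transport
// protocol version.
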
// TransportHelper contains all xDS transport protocol related functionality
// which is common across different versioned client implementations.
//
// TransportHelper takes care of sending and receiving xDS requests and
// responses on an ADS stream. It also takes care of ACK/NACK handling. It
// delegates to the actual versioned client implementations wherever
// appropriate.
//
// Implements the APIClient interface which makes it possible for versioned
// client implementations to embed this type, and thereby satisfy the interface
// requirements.
type TransportHelper struct {
cancelCtx context.CancelFunc
vClient VersionedClient
logger *grpclog.PrefixLogger
backoff func(int) time.Duration
streamCh chan grpc.ClientStream
sendCh *buffer.Unbounded
mu sync.Mutex
// Message specific watch infos, protected by the above mutex. These are
// written to after successfully reading from the update channel, and are
// read from when recovering from a broken stream to resend the xDS
// messages. When the user of this client object cancels a watch call,
// these are set to nil. All accesses to the map, and to any value inside
// the map, must be protected by the above mutex.
watchMap map[ResourceType]map[string]bool
// versionMap contains the version that was acked (the version in the ack
// request that was sent on the wire). The key is rType, the value is the
// version string, because the versions for different resource types are
// independent.
versionMap map[ResourceType]string
// nonceMap contains the nonce from the most recent received response.
nonceMap map[ResourceType]string
}
// NewTransportHelper creates a new transport helper to be used by versioned
// client implementations.
func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backoff func(int) time.Duration) *TransportHelper {
ctx, cancelCtx := context.WithCancel(context.Background())
t := &TransportHelper{
cancelCtx: cancelCtx,
vClient: vc,
logger: logger,
backoff: backoff,
streamCh: make(chan grpc.ClientStream, 1),
sendCh: buffer.NewUnbounded(),
watchMap: make(map[ResourceType]map[string]bool),
versionMap: make(map[ResourceType]string),
nonceMap: make(map[ResourceType]string),
}
go t.run(ctx)
return t
}
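// Continuing the illustration above (hypothetical names), a versioned client
// constructor would typically pass itself as the VersionedClient, so that the
// embedded TransportHelper drives the ADS stream using the version-specific
// methods:
//
//	func newExampleVersionedClient(cc *grpc.ClientConn, logger *grpclog.PrefixLogger, backoff func(int) time.Duration) *exampleVersionedClient {
//		c := &exampleVersionedClient{cc: cc}
//		c.TransportHelper = NewTransportHelper(c, logger, backoff)
//		return c
//	}
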
// AddWatch adds a watch for an xDS resource given its type and name.
func (t *TransportHelper) AddWatch(rType ResourceType, resourceName string) {
t.sendCh.Put(&watchAction{
rType: rType,
remove: false,
resource: resourceName,
})
}
// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name.
func (t *TransportHelper) RemoveWatch(rType ResourceType, resourceName string) {
t.sendCh.Put(&watchAction{
rType: rType,
remove: true,
resource: resourceName,
})
}
// Close closes the transport helper.
func (t *TransportHelper) Close() {
t.cancelCtx()
}
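// A minimal usage sketch (hypothetical caller, continuing the example above,
// and assuming a resource type constant such as ListenerResource is defined
// elsewhere in this package):
//
//	c := newExampleVersionedClient(cc, logger, backoff)
//	c.AddWatch(ListenerResource, "example-listener")
//	// ... later, when the watcher is canceled:
//	c.RemoveWatch(ListenerResource, "example-listener")
//	// And when the client is no longer needed:
//	c.Close()
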
// run starts an ADS stream (and backs off exponentially, if the previous
// stream failed without receiving a single reply) and runs the sender and
// receiver routines to send and receive data from the stream respectively.
func (t *TransportHelper) run(ctx context.Context) {
go t.send(ctx)
// TODO: start a goroutine monitoring ClientConn's connectivity state, and
// report error (and log) when the state is transient failure.
retries := 0
for {
select {
case <-ctx.Done():
return
default:
}
if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
}
retries++
stream, err := t.vClient.NewStream(ctx)
if err != nil {
t.logger.Warningf("xds: ADS stream creation failed: %v", err)
continue
}
t.logger.Infof("ADS stream created")
select {
case <-t.streamCh:
default:
}
t.streamCh <- stream
if t.recv(stream) {
retries = 0
}
}
}
// send is a separate goroutine for sending watch requests on the xds stream.
//
// It watches the stream channel for new streams, and the request channel for
// new requests to send on the stream.
//
// For each new request (watchAction), it's
//   - processed and added to the watch map
//     - so resend will pick them up when there are new streams
//   - sent on the current stream if there's one
//     - the current stream is cleared when any send on it fails
//
// For each new stream, all the existing requests will be resent.
//
// Note that this goroutine doesn't do anything to the old stream when there's
// a new one. In fact, there should be only one stream in progress at any time,
// and a new one should only be created when the old one fails (recv returns an
// error).
func (t *TransportHelper) send(ctx context.Context) {
var stream grpc.ClientStream
for {
select {
case <-ctx.Done():
return
case stream = <-t.streamCh:
if !t.sendExisting(stream) {
// send failed, clear the current stream.
stream = nil
}
case u := <-t.sendCh.Get():
t.sendCh.Load()
var (
target []string
rType ResourceType
version, nonce, errMsg string
send bool
)
switch update := u.(type) {
case *watchAction:
target, rType, version, nonce = t.processWatchInfo(update)
case *ackAction:
target, rType, version, nonce, send = t.processAckInfo(update, stream)
if !send {
continue
}
errMsg = update.errMsg
}
if stream == nil {
// There's no stream yet. Skip the request. This request
// will be resent to the new streams. If no stream is
// created, the watcher will timeout (same as server not
// sending response back).
continue
}
if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
// send failed, clear the current stream.
stream = nil
}
}
}
}
// sendExisting sends out xDS requests for registered watchers when recovering
// from a broken stream.
//
// We call stream.Send() here with the lock being held. It should be OK to do
// that here because the stream has just started and Send() usually returns
// quickly (once it pushes the message onto the transport layer) and is only
// ever blocked if we don't have enough flow control quota.
func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool {
t.mu.Lock()
defer t.mu.Unlock()
// Reset the ack versions when the stream restarts.
t.versionMap = make(map[ResourceType]string)
t.nonceMap = make(map[ResourceType]string)
for rType, s := range t.watchMap {
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
t.logger.Errorf("ADS request failed: %v", err)
return false
}
}
return true
}
// recv receives xDS responses on the provided ADS stream and branches out to
// message specific handlers.
func (t *TransportHelper) recv(stream grpc.ClientStream) bool {
success := false
for {
resp, err := t.vClient.RecvResponse(stream)
if err != nil {
t.logger.Warningf("ADS stream is closed with error: %v", err)
return success
}
rType, version, nonce, err := t.vClient.HandleResponse(resp)
if e, ok := err.(ErrResourceTypeUnsupported); ok {
t.logger.Warningf("%s", e.ErrStr)
continue
}
if err != nil {
t.sendCh.Put(&ackAction{
rType: rType,
version: "",
nonce: nonce,
errMsg: err.Error(),
stream: stream,
})
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
continue
}
t.sendCh.Put(&ackAction{
rType: rType,
version: version,
nonce: nonce,
stream: stream,
})
t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce)
success = true
}
}
func mapToSlice(m map[string]bool) (ret []string) {
for i := range m {
ret = append(ret, i)
}
return
}
type watchAction struct {
rType ResourceType
remove bool // Whether this is to remove watch for the resource.
resource string
}
// processWatchInfo pulls the fields needed by the request from a watchAction.
//
// It also updates the watch map.
func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, rType ResourceType, ver, nonce string) {
t.mu.Lock()
defer t.mu.Unlock()
var current map[string]bool
current, ok := t.watchMap[w.rType]
if !ok {
current = make(map[string]bool)
t.watchMap[w.rType] = current
}
if w.remove {
delete(current, w.resource)
if len(current) == 0 {
delete(t.watchMap, w.rType)
}
} else {
current[w.resource] = true
}
rType = w.rType
target = mapToSlice(current)
// We don't reset version or nonce when a new watch is started. The version
// and nonce from previous response are carried by the request unless the
// stream is recreated.
ver = t.versionMap[rType]
nonce = t.nonceMap[rType]
return target, rType, ver, nonce
}
type ackAction struct {
rType ResourceType
version string // NACK if version is an empty string.
nonce string
errMsg string // Empty unless it's a NACK.
// Each ACK/NACK is tagged with the stream it's for. When the stream is
// down, all ACKs/NACKs for that stream will be dropped, and the
// version/nonce won't be updated.
stream grpc.ClientStream
}
// processAckInfo pulls the fields needed by the ack request from an ackAction.
//
// If no active watch is found for this ack, it returns false for send.
func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) {
if ack.stream != stream {
// If the ACK's stream isn't the current sending stream, this means the ACK
// was pushed onto the queue before the old stream broke, and a new stream
// has been started since. Return immediately here so we don't update the
// nonce for the new stream.
return nil, UnknownResource, "", "", false
}
rType = ack.rType
t.mu.Lock()
defer t.mu.Unlock()
// Update the nonce whether or not we are going to send the ACK request on
// the wire. We may not send the request if the watch is canceled, but the
// nonce needs to be updated so the next request will have the right nonce.
nonce = ack.nonce
t.nonceMap[rType] = nonce
s, ok := t.watchMap[rType]
if !ok || len(s) == 0 {
// We don't send the ACK request if there's no active watch (this can happen
// either when the server sends responses before any request is sent, or when
// the watch is canceled while the ackAction is in the queue), because there's
// no resource name to send. If we sent a request with an empty resource name
// list, the server might treat it as a wildcard and send us everything.
return nil, UnknownResource, "", "", false
}
send = true
target = mapToSlice(s)
version = ack.version
if version == "" {
// This is a NACK; get the previously acked version.
version = t.versionMap[rType]
// version will still be an empty string if rType isn't found in
// versionMap; this can happen if there hasn't been any ACK before.
} else {
t.versionMap[rType] = version
}
return target, rType, version, nonce, send
}
// reportLoad starts an LRS stream to report load data to the management
// server. It blocks until the context is canceled.
func (t *TransportHelper) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) {
retries := 0
for {
if ctx.Err() != nil {
return
}
if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
}
retries++
stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
if err != nil {
logger.Warningf("lrs: failed to create stream: %v", err)
continue
}
logger.Infof("lrs: created LRS stream")
if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
logger.Warningf("lrs: failed to send first request: %v", err)
continue
}
clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
if err != nil {
logger.Warning(err)
continue
}
retries = 0
t.sendLoads(ctx, stream, opts.loadStore, clusters, interval)
}
}
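// As a sketch of how this is typically driven (the wrapper shown is
// hypothetical, not the actual exported API), a versioned client would run
// reportLoad in its own goroutine and stop it by canceling the context:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	go t.reportLoad(ctx, cc, loadReportingOptions{loadStore: store})
//	// ... when load reporting should stop:
//	cancel()
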
func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
case <-ctx.Done():
return
}
if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil {
logger.Warning(err)
return
}
}
}