| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package client |
| |
| import ( |
| "context" |
| "sync" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| "google.golang.org/grpc/xds/internal/client/load" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/internal/buffer" |
| "google.golang.org/grpc/internal/grpclog" |
| ) |
| |
| // ErrResourceTypeUnsupported is an error used to indicate an unsupported xDS |
| // resource type. The wrapped ErrStr contains the details. |
| type ErrResourceTypeUnsupported struct { |
| ErrStr string |
| } |
| |
| // Error implements the error interface. |
| func (e ErrResourceTypeUnsupported) Error() string { |
| return e.ErrStr |
| } |
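| |
| // A minimal sketch of how a caller of HandleResponse might distinguish this |
| // error from other failures (it mirrors the type assertion used in recv |
| // below; the handleResp helper name is purely illustrative): |
| // |
| //	func handleResp(vc VersionedClient, resp proto.Message) { |
| //		if _, _, _, err := vc.HandleResponse(resp); err != nil { |
| //			if e, ok := err.(ErrResourceTypeUnsupported); ok { |
| //				// Unsupported resource type: log and skip, don't NACK. |
| //				logger.Warningf("%s", e.ErrStr) |
| //				return |
| //			} |
| //			// Any other error is a validation failure and leads to a NACK. |
| //		} |
| //	} |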
| |
| // VersionedClient is the interface to be provided by the transport protocol |
| // specific client implementations. This mainly deals with the actual sending |
| // and receiving of messages. |
| type VersionedClient interface { |
| // NewStream returns a new xDS client stream specific to the underlying |
| // transport protocol version. |
| NewStream(ctx context.Context) (grpc.ClientStream, error) |
| |
| // SendRequest constructs and sends out a DiscoveryRequest message specific |
| // to the underlying transport protocol version. |
| SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error |
| |
| // RecvResponse uses the provided stream to receive a response specific to |
| // the underlying transport protocol version. |
| RecvResponse(s grpc.ClientStream) (proto.Message, error) |
| |
| // HandleResponse parses and validates the received response and notifies |
| // the top-level client which in turn notifies the registered watchers. |
| // |
| // Return values are: resourceType, version, nonce, error. |
| // If the provided protobuf message contains a resource type which is not |
| // supported, implementations must return an error of type |
| // ErrResourceTypeUnsupported. |
| HandleResponse(proto.Message) (ResourceType, string, string, error) |
| |
| // NewLoadStatsStream returns a new LRS client stream specific to the underlying |
| // transport protocol version. |
| NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) |
| |
| // SendFirstLoadStatsRequest constructs and sends the first request on the |
| // LRS stream. |
| SendFirstLoadStatsRequest(s grpc.ClientStream) error |
| |
| // HandleLoadStatsResponse receives the first response from the server, which |
| // contains the load reporting interval and the clusters for which the server |
| // asks the client to report load. |
| // |
| // If the response sets SendAllClusters to true, the returned clusters slice |
| // is nil. |
| HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error) |
| |
| // SendLoadStatsRequest will be invoked at regular intervals to send a load |
| // report containing the load data collected since the last time this method |
| // was invoked. |
| SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error |
| } |
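| |
| // A rough, illustrative sketch of the ADS call sequence that TransportHelper |
| // drives through this interface (adsOnce is a hypothetical helper, not part |
| // of this package; error handling is simplified): |
| // |
| //	func adsOnce(ctx context.Context, vc VersionedClient, names []string, rType ResourceType) error { |
| //		stream, err := vc.NewStream(ctx) |
| //		if err != nil { |
| //			return err |
| //		} |
| //		// The first request carries an empty version and nonce. |
| //		if err := vc.SendRequest(stream, names, rType, "", "", ""); err != nil { |
| //			return err |
| //		} |
| //		resp, err := vc.RecvResponse(stream) |
| //		if err != nil { |
| //			return err |
| //		} |
| //		_, version, nonce, err := vc.HandleResponse(resp) |
| //		if err != nil { |
| //			// NACK: resend with the previously acked version (empty here, |
| //			// since nothing was acked yet), the new nonce, and the error. |
| //			return vc.SendRequest(stream, names, rType, "", nonce, err.Error()) |
| //		} |
| //		// ACK: echo the new version and nonce. |
| //		return vc.SendRequest(stream, names, rType, version, nonce, "") |
| //	} |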
| |
| // TransportHelper contains all xDS transport protocol related functionality |
| // which is common across different versioned client implementations. |
| // |
| // TransportHelper takes care of sending and receiving xDS requests and |
| // responses on an ADS stream. It also takes care of ACK/NACK handling. It |
| // delegates to the actual versioned client implementations wherever |
| // appropriate. |
| // |
| // TransportHelper implements the APIClient interface, which makes it possible |
| // for versioned client implementations to embed this type and thereby satisfy |
| // the interface requirements. |
| type TransportHelper struct { |
| cancelCtx context.CancelFunc |
| |
| vClient VersionedClient |
| logger *grpclog.PrefixLogger |
| backoff func(int) time.Duration |
| streamCh chan grpc.ClientStream |
| sendCh *buffer.Unbounded |
| |
| mu sync.Mutex |
| // Message-specific watch infos, protected by the above mutex. These are |
| // written to after successfully reading from the update channel, and are |
| // read from when recovering from a broken stream to resend the xDS |
| // messages. When the user of this client object cancels a watch call, |
| // these are set to nil. All accesses to the map, and to any value inside |
| // the map, must be protected by the above mutex. |
| watchMap map[ResourceType]map[string]bool |
| // versionMap contains the version that was acked (the version in the ack |
| // request that was sent on the wire). The key is rType and the value is the |
| // version string, because the versions for different resource types should |
| // be independent. |
| versionMap map[ResourceType]string |
| // nonceMap contains the nonce from the most recently received response. |
| nonceMap map[ResourceType]string |
| } |
| |
| // NewTransportHelper creates a new transport helper to be used by versioned |
| // client implementations. |
| func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backoff func(int) time.Duration) *TransportHelper { |
| ctx, cancelCtx := context.WithCancel(context.Background()) |
| t := &TransportHelper{ |
| cancelCtx: cancelCtx, |
| vClient: vc, |
| logger: logger, |
| backoff: backoff, |
| |
| streamCh: make(chan grpc.ClientStream, 1), |
| sendCh: buffer.NewUnbounded(), |
| watchMap: make(map[ResourceType]map[string]bool), |
| versionMap: make(map[ResourceType]string), |
| nonceMap: make(map[ResourceType]string), |
| } |
| |
| go t.run(ctx) |
| return t |
| } |
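| |
| // A minimal usage sketch, assuming a versioned client implementation vc (such |
| // as the v2 or v3 xDS API clients built on top of this helper), a prefixLogger, |
| // and a backoffFunc; all three names are placeholders for whatever the caller |
| // provides, and ListenerResource stands in for whichever ResourceType is being |
| // watched: |
| // |
| //	th := NewTransportHelper(vc, prefixLogger, backoffFunc) |
| //	th.AddWatch(ListenerResource, "example-listener") |
| //	// ... the helper ACKs/NACKs responses and resends watches on new streams. |
| //	th.RemoveWatch(ListenerResource, "example-listener") |
| //	th.Close() |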
| |
| // AddWatch adds a watch for an xDS resource given its type and name. |
| func (t *TransportHelper) AddWatch(rType ResourceType, resourceName string) { |
| t.sendCh.Put(&watchAction{ |
| rType: rType, |
| remove: false, |
| resource: resourceName, |
| }) |
| } |
| |
| // RemoveWatch cancels an already registered watch for an xDS resource |
| // given its type and name. |
| func (t *TransportHelper) RemoveWatch(rType ResourceType, resourceName string) { |
| t.sendCh.Put(&watchAction{ |
| rType: rType, |
| remove: true, |
| resource: resourceName, |
| }) |
| } |
| |
| // Close closes the transport helper. |
| func (t *TransportHelper) Close() { |
| t.cancelCtx() |
| } |
| |
| // run starts an ADS stream (and backs off exponentially if the previous |
| // stream failed without receiving a single reply) and runs the sender and |
| // receiver routines to send and receive data from the stream respectively. |
| func (t *TransportHelper) run(ctx context.Context) { |
| go t.send(ctx) |
| // TODO: start a goroutine monitoring ClientConn's connectivity state, and |
| // report an error (and log) when the state is transient failure. |
| |
| retries := 0 |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| default: |
| } |
| |
| if retries != 0 { |
| timer := time.NewTimer(t.backoff(retries)) |
| select { |
| case <-timer.C: |
| case <-ctx.Done(): |
| if !timer.Stop() { |
| <-timer.C |
| } |
| return |
| } |
| } |
| |
| retries++ |
| stream, err := t.vClient.NewStream(ctx) |
| if err != nil { |
| t.logger.Warningf("xds: ADS stream creation failed: %v", err) |
| continue |
| } |
| t.logger.Infof("ADS stream created") |
| |
| select { |
| case <-t.streamCh: |
| default: |
| } |
| t.streamCh <- stream |
| if t.recv(stream) { |
| retries = 0 |
| } |
| } |
| } |
| |
| // send is a separate goroutine for sending watch requests on the xDS stream. |
| // |
| // It watches the stream channel for new streams, and the request channel for |
| // new requests to send on the stream. |
| // |
| // For each new request (watchAction), it is: |
| //  - processed and added to the watch map, so that resend will pick it up |
| //    when there is a new stream |
| //  - sent on the current stream, if there is one; the current stream is |
| //    cleared when any send on it fails |
| // |
| // For each new stream, all the existing requests will be resent. |
| // |
| // Note that this goroutine doesn't do anything to the old stream when there's |
| // a new one. In fact, there should be only one stream in progress, and a new |
| // one should only be created when the old one fails (recv returns an error). |
| func (t *TransportHelper) send(ctx context.Context) { |
| var stream grpc.ClientStream |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case stream = <-t.streamCh: |
| if !t.sendExisting(stream) { |
| // send failed, clear the current stream. |
| stream = nil |
| } |
| case u := <-t.sendCh.Get(): |
| t.sendCh.Load() |
| |
| var ( |
| target []string |
| rType ResourceType |
| version, nonce, errMsg string |
| send bool |
| ) |
| switch update := u.(type) { |
| case *watchAction: |
| target, rType, version, nonce = t.processWatchInfo(update) |
| case *ackAction: |
| target, rType, version, nonce, send = t.processAckInfo(update, stream) |
| if !send { |
| continue |
| } |
| errMsg = update.errMsg |
| } |
| if stream == nil { |
| // There's no stream yet. Skip the request. This request |
| // will be resent to the new streams. If no stream is |
| // created, the watcher will time out (the same as the server |
| // not sending a response back). |
| continue |
| } |
| if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil { |
| t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err) |
| // send failed, clear the current stream. |
| stream = nil |
| } |
| } |
| } |
| } |
| |
| // sendExisting sends out xDS requests for registered watchers when recovering |
| // from a broken stream. |
| // |
| // We call stream.Send() here with the lock being held. It should be OK to do |
| // that here because the stream has just started and Send() usually returns |
| // quickly (once it pushes the message onto the transport layer) and is only |
| // ever blocked if we don't have enough flow control quota. |
| func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| |
| // Reset the ack versions when the stream restarts. |
| t.versionMap = make(map[ResourceType]string) |
| t.nonceMap = make(map[ResourceType]string) |
| |
| for rType, s := range t.watchMap { |
| if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil { |
| t.logger.Errorf("ADS request failed: %v", err) |
| return false |
| } |
| } |
| |
| return true |
| } |
| |
| // recv receives xDS responses on the provided ADS stream and branches out to |
| // message specific handlers. |
| func (t *TransportHelper) recv(stream grpc.ClientStream) bool { |
| success := false |
| for { |
| resp, err := t.vClient.RecvResponse(stream) |
| if err != nil { |
| t.logger.Warningf("ADS stream is closed with error: %v", err) |
| return success |
| } |
| rType, version, nonce, err := t.vClient.HandleResponse(resp) |
| if e, ok := err.(ErrResourceTypeUnsupported); ok { |
| t.logger.Warningf("%s", e.ErrStr) |
| continue |
| } |
| if err != nil { |
| t.sendCh.Put(&ackAction{ |
| rType: rType, |
| version: "", |
| nonce: nonce, |
| errMsg: err.Error(), |
| stream: stream, |
| }) |
| t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err) |
| continue |
| } |
| t.sendCh.Put(&ackAction{ |
| rType: rType, |
| version: version, |
| nonce: nonce, |
| stream: stream, |
| }) |
| t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce) |
| success = true |
| } |
| } |
| |
| func mapToSlice(m map[string]bool) (ret []string) { |
| for i := range m { |
| ret = append(ret, i) |
| } |
| return |
| } |
| |
| type watchAction struct { |
| rType ResourceType |
| remove bool // Whether this is to remove watch for the resource. |
| resource string |
| } |
| |
| // processWatchInfo pulls the fields needed by the request from a watchAction. |
| // |
| // It also updates the watch map. |
| func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, rType ResourceType, ver, nonce string) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| |
| var current map[string]bool |
| current, ok := t.watchMap[w.rType] |
| if !ok { |
| current = make(map[string]bool) |
| t.watchMap[w.rType] = current |
| } |
| |
| if w.remove { |
| delete(current, w.resource) |
| if len(current) == 0 { |
| delete(t.watchMap, w.rType) |
| } |
| } else { |
| current[w.resource] = true |
| } |
| |
| rType = w.rType |
| target = mapToSlice(current) |
| // We don't reset version or nonce when a new watch is started. The version |
| // and nonce from the previous response are carried by the request unless the |
| // stream is recreated. |
| ver = t.versionMap[rType] |
| nonce = t.nonceMap[rType] |
| return target, rType, ver, nonce |
| } |
| |
| type ackAction struct { |
| rType ResourceType |
| version string // NACK if version is an empty string. |
| nonce string |
| errMsg string // Empty unless it's a NACK. |
| // Each ACK/NACK is tagged with the stream it is for. When the stream goes |
| // down, all ACKs/NACKs for that stream are dropped, and the version/nonce |
| // are not updated. |
| stream grpc.ClientStream |
| } |
| |
| // processAckInfo pulls the fields needed by the ack request from an ackAction. |
| // |
| // If no active watch is found for this ack, it returns false for send. |
| func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) { |
| if ack.stream != stream { |
| // If ACK's stream isn't the current sending stream, this means the ACK |
| // was pushed to queue before the old stream broke, and a new stream has |
| // been started since. Return immediately here so we don't update the |
| // nonce for the new stream. |
| return nil, UnknownResource, "", "", false |
| } |
| rType = ack.rType |
| |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| |
| // Update the nonce regardless of whether we are going to send the ACK |
| // request on the wire. We may not send the request if the watch is |
| // canceled, but the nonce needs to be updated so the next request will |
| // have the right nonce. |
| nonce = ack.nonce |
| t.nonceMap[rType] = nonce |
| |
| s, ok := t.watchMap[rType] |
| if !ok || len(s) == 0 { |
| // We don't send the ack request if there's no active watch (this can |
| // happen if the server sends responses before any request, or if the |
| // watch is canceled while the ackAction is in the queue), because there's |
| // no resource name. If we sent a request with an empty resource name |
| // list, the server might treat it as a wildcard and send us everything. |
| return nil, UnknownResource, "", "", false |
| } |
| send = true |
| target = mapToSlice(s) |
| |
| version = ack.version |
| if version == "" { |
| // This is a NACK; use the previously acked version. |
| version = t.versionMap[rType] |
| // version will still be an empty string if rType isn't found in |
| // versionMap; this can happen if there hasn't been an ack before. |
| } else { |
| t.versionMap[rType] = version |
| } |
| return target, rType, version, nonce, send |
| } |
| |
| // reportLoad starts an LRS stream to report load data to the management server. |
| // It blocks until the context is cancelled. |
| func (t *TransportHelper) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) { |
| retries := 0 |
| for { |
| if ctx.Err() != nil { |
| return |
| } |
| |
| if retries != 0 { |
| timer := time.NewTimer(t.backoff(retries)) |
| select { |
| case <-timer.C: |
| case <-ctx.Done(): |
| if !timer.Stop() { |
| <-timer.C |
| } |
| return |
| } |
| } |
| |
| retries++ |
| stream, err := t.vClient.NewLoadStatsStream(ctx, cc) |
| if err != nil { |
| logger.Warningf("lrs: failed to create stream: %v", err) |
| continue |
| } |
| logger.Infof("lrs: created LRS stream") |
| |
| if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil { |
| logger.Warningf("lrs: failed to send first request: %v", err) |
| continue |
| } |
| |
| clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream) |
| if err != nil { |
| logger.Warning(err) |
| continue |
| } |
| |
| retries = 0 |
| t.sendLoads(ctx, stream, opts.loadStore, clusters, interval) |
| } |
| } |
| |
| func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) { |
| tick := time.NewTicker(interval) |
| defer tick.Stop() |
| for { |
| select { |
| case <-tick.C: |
| case <-ctx.Done(): |
| return |
| } |
| if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil { |
| logger.Warning(err) |
| return |
| } |
| } |
| } |