| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package client_test |
| |
| import ( |
| "context" |
| "testing" |
| "time" |
| |
| v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" |
| endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" |
| lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" |
| durationpb "github.com/golang/protobuf/ptypes/duration" |
| "github.com/google/go-cmp/cmp" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/status" |
| "google.golang.org/grpc/xds/internal/client" |
| "google.golang.org/grpc/xds/internal/client/bootstrap" |
| "google.golang.org/grpc/xds/internal/testutils/fakeserver" |
| "google.golang.org/grpc/xds/internal/version" |
| "google.golang.org/protobuf/testing/protocmp" |
| |
| _ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client. |
| ) |
| |
| const ( |
| defaultTestTimeout = 5 * time.Second |
| defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. |
| |
| defaultClientWatchExpiryTimeout = 15 * time.Second |
| ) |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| func (s) TestLRSClient(t *testing.T) { |
| fs, sCleanup, err := fakeserver.StartServer() |
| if err != nil { |
| t.Fatalf("failed to start fake xDS server: %v", err) |
| } |
| defer sCleanup() |
| |
| xdsC, err := client.NewWithConfigForTesting(&bootstrap.Config{ |
| BalancerName: fs.Address, |
| Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), |
| NodeProto: &v2corepb.Node{}, |
| TransportAPI: version.TransportV2, |
| }, defaultClientWatchExpiryTimeout) |
| if err != nil { |
| t.Fatalf("failed to create xds client: %v", err) |
| } |
| defer xdsC.Close() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if u, err := fs.NewConnChan.Receive(ctx); err != nil { |
| t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) |
| } |
| |
| // Report to the same address should not create new ClientConn. |
| store1, lrsCancel1 := xdsC.ReportLoad(fs.Address) |
| defer lrsCancel1() |
| sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) |
| defer sCancel() |
| if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded { |
| t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err) |
| } |
| |
| fs2, sCleanup2, err := fakeserver.StartServer() |
| if err != nil { |
| t.Fatalf("failed to start fake xDS server: %v", err) |
| } |
| defer sCleanup2() |
| |
| // Report to a different address should create new ClientConn. |
| store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address) |
| defer lrsCancel2() |
| if u, err := fs2.NewConnChan.Receive(ctx); err != nil { |
| t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) |
| } |
| |
| if store1 == store2 { |
| t.Fatalf("got same store for different servers, want different") |
| } |
| |
| if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil { |
| t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) |
| } |
| store2.PerCluster("cluster", "eds").CallDropped("test") |
| |
| // Send one resp to the client. |
| fs2.LRSResponseChan <- &fakeserver.Response{ |
| Resp: &lrspb.LoadStatsResponse{ |
| SendAllClusters: true, |
| LoadReportingInterval: &durationpb.Duration{Nanos: 50000000}, |
| }, |
| } |
| |
| // Server should receive a req with the loads. |
| u, err := fs2.LRSRequestChan.Receive(ctx) |
| if err != nil { |
| t.Fatalf("unexpected LRS request: %v, %v, want error canceled", u, err) |
| } |
| receivedLoad := u.(*fakeserver.Request).Req.(*lrspb.LoadStatsRequest).ClusterStats |
| if len(receivedLoad) <= 0 { |
| t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test") |
| } |
| receivedLoad[0].LoadReportInterval = nil |
| want := &endpointpb.ClusterStats{ |
| ClusterName: "cluster", |
| ClusterServiceName: "eds", |
| TotalDroppedRequests: 1, |
| DroppedRequests: []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}}, |
| } |
| if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" { |
| t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d) |
| } |
| |
| // Cancel this load reporting stream, server should see error canceled. |
| lrsCancel2() |
| |
| // Server should receive a stream canceled error. |
| if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled { |
| t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err) |
| } |
| } |