blob: 580972f63d4df978d0cf395c06b582bdf0936580 [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"context"
"errors"
"io"
"net"
"testing"
"time"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
durationpb "github.com/golang/protobuf/ptypes/duration"
structpb "github.com/golang/protobuf/ptypes/struct"
wrpb "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/xds/internal"
cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
addresspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/address"
basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
endpointpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/endpoint"
adsgrpc "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
lrsgrpc "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)
var (
testServiceName = "test/foo"
testCDSReq = &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
},
},
},
},
TypeUrl: cdsType,
}
testEDSReq = &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
},
endpointRequired: {
Kind: &structpb.Value_BoolValue{BoolValue: true},
},
},
},
},
TypeUrl: edsType,
}
testEDSReqWithoutEndpoints = &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testServiceName},
},
endpointRequired: {
Kind: &structpb.Value_BoolValue{BoolValue: false},
},
},
},
},
TypeUrl: edsType,
}
testCluster = &cdspb.Cluster{
Name: testServiceName,
ClusterDiscoveryType: &cdspb.Cluster_Type{Type: cdspb.Cluster_EDS},
LbPolicy: cdspb.Cluster_ROUND_ROBIN,
}
marshaledCluster, _ = proto.Marshal(testCluster)
testCDSResp = &discoverypb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: cdsType,
Value: marshaledCluster,
},
},
TypeUrl: cdsType,
}
testClusterLoadAssignment = &edspb.ClusterLoadAssignment{
ClusterName: testServiceName,
Endpoints: []*endpointpb.LocalityLbEndpoints{
{
Locality: &basepb.Locality{
Region: "asia-east1",
Zone: "1",
SubZone: "sa",
},
LbEndpoints: []*endpointpb.LbEndpoint{
{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &addresspb.Address{
Address: &addresspb.Address_SocketAddress{
SocketAddress: &addresspb.SocketAddress{
Address: "1.1.1.1",
PortSpecifier: &addresspb.SocketAddress_PortValue{
PortValue: 10001,
},
ResolverName: "dns",
},
},
},
HealthCheckConfig: nil,
},
},
Metadata: &basepb.Metadata{
FilterMetadata: map[string]*structpb.Struct{
"xx.lb": {
Fields: map[string]*structpb.Value{
"endpoint_name": {
Kind: &structpb.Value_StringValue{
StringValue: "some.endpoint.name",
},
},
},
},
},
},
},
},
LoadBalancingWeight: &wrpb.UInt32Value{
Value: 1,
},
Priority: 0,
},
},
}
marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment)
testEDSResp = &discoverypb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: edsType,
Value: marshaledClusterLoadAssignment,
},
},
TypeUrl: edsType,
}
testClusterLoadAssignmentWithoutEndpoints = &edspb.ClusterLoadAssignment{
ClusterName: testServiceName,
Endpoints: []*endpointpb.LocalityLbEndpoints{
{
Locality: &basepb.Locality{
SubZone: "sa",
},
LoadBalancingWeight: &wrpb.UInt32Value{
Value: 128,
},
Priority: 0,
},
},
Policy: nil,
}
marshaledClusterLoadAssignmentWithoutEndpoints, _ = proto.Marshal(testClusterLoadAssignmentWithoutEndpoints)
testEDSRespWithoutEndpoints = &discoverypb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: edsType,
Value: marshaledClusterLoadAssignmentWithoutEndpoints,
},
},
TypeUrl: edsType,
}
)
type testTrafficDirector struct {
reqChan chan *request
respChan chan *response
}
type request struct {
req *discoverypb.DiscoveryRequest
err error
}
type response struct {
resp *discoverypb.DiscoveryResponse
err error
}
func (ttd *testTrafficDirector) StreamAggregatedResources(s adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
for {
req, err := s.Recv()
if err != nil {
ttd.reqChan <- &request{
req: nil,
err: err,
}
if err == io.EOF {
return nil
}
return err
}
ttd.reqChan <- &request{
req: req,
err: nil,
}
if req.TypeUrl == edsType {
break
}
}
for {
select {
case resp := <-ttd.respChan:
if resp.err != nil {
return resp.err
}
if err := s.Send(resp.resp); err != nil {
return err
}
case <-s.Context().Done():
return s.Context().Err()
}
}
}
func (ttd *testTrafficDirector) DeltaAggregatedResources(adsgrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return status.Error(codes.Unimplemented, "")
}
func (ttd *testTrafficDirector) sendResp(resp *response) {
ttd.respChan <- resp
}
func (ttd *testTrafficDirector) getReq() *request {
return <-ttd.reqChan
}
func newTestTrafficDirector() *testTrafficDirector {
return &testTrafficDirector{
reqChan: make(chan *request, 10),
respChan: make(chan *response, 10),
}
}
type testConfig struct {
doCDS bool
expectedRequests []*discoverypb.DiscoveryRequest
responsesToSend []*discoverypb.DiscoveryResponse
expectedADSResponses []proto.Message
adsErr error
svrErr error
}
func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsServer, cleanup func()) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("listen failed due to: %v", err)
}
svr := grpc.NewServer()
td = newTestTrafficDirector()
lrss = &lrsServer{
drops: make(map[string]uint64),
reportingInterval: &durationpb.Duration{
Seconds: 60 * 60, // 1 hour, each test can override this to a shorter duration.
Nanos: 0,
},
}
adsgrpc.RegisterAggregatedDiscoveryServiceServer(svr, td)
lrsgrpc.RegisterLoadReportingServiceServer(svr, lrss)
go svr.Serve(lis)
return lis.Addr().String(), td, lrss, func() {
svr.Stop()
lis.Close()
}
}
func (s) TestXdsClientResponseHandling(t *testing.T) {
for _, test := range []*testConfig{
{
doCDS: true,
expectedRequests: []*discoverypb.DiscoveryRequest{testCDSReq, testEDSReq},
responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp, testEDSResp},
expectedADSResponses: []proto.Message{testCluster, testClusterLoadAssignment},
},
{
doCDS: false,
expectedRequests: []*discoverypb.DiscoveryRequest{testEDSReqWithoutEndpoints},
responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints},
expectedADSResponses: []proto.Message{testClusterLoadAssignmentWithoutEndpoints},
},
} {
testXdsClientResponseHandling(t, test)
}
}
func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan <- i
return nil
}
client := newXDSClient(addr, test.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, func(context.Context) {}, func() {})
defer client.close()
go client.run()
for _, expectedReq := range test.expectedRequests {
req := td.getReq()
if req.err != nil {
t.Fatalf("ads RPC failed with err: %v", req.err)
}
if !proto.Equal(req.req, expectedReq) {
t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq)
}
}
for i, resp := range test.responsesToSend {
td.sendResp(&response{resp: resp})
ads := <-adsChan
if !proto.Equal(ads, test.expectedADSResponses[i]) {
t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i])
}
}
}
func (s) TestXdsClientLoseContact(t *testing.T) {
for _, test := range []*testConfig{
{
doCDS: true,
responsesToSend: []*discoverypb.DiscoveryResponse{},
},
{
doCDS: false,
responsesToSend: []*discoverypb.DiscoveryResponse{testEDSRespWithoutEndpoints},
},
} {
testXdsClientLoseContactRemoteClose(t, test)
}
for _, test := range []*testConfig{
{
doCDS: false,
responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp}, // CDS response when in custom mode.
},
{
doCDS: true,
responsesToSend: []*discoverypb.DiscoveryResponse{{}}, // response with 0 resources is an error case.
},
{
doCDS: true,
responsesToSend: []*discoverypb.DiscoveryResponse{testCDSResp},
adsErr: errors.New("some ads parsing error from xdsBalancer"),
},
} {
testXdsClientLoseContactADSRelatedErrorOccur(t, test)
}
}
func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan <- i
return nil
}
contactChan := make(chan *loseContact, 10)
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, test.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
// make sure server side get the request (i.e stream created successfully on client side)
td.getReq()
for _, resp := range test.responsesToSend {
td.sendResp(&response{resp: resp})
// make sure client side receives it
<-adsChan
}
cleanup()
select {
case <-contactChan:
case <-time.After(2 * time.Second):
t.Fatal("time out when expecting lost contact signal")
}
}
func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan <- i
return test.adsErr
}
contactChan := make(chan *loseContact, 10)
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, test.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
// make sure server side get the request (i.e stream created successfully on client side)
td.getReq()
for _, resp := range test.responsesToSend {
td.sendResp(&response{resp: resp})
}
select {
case <-contactChan:
case <-time.After(2 * time.Second):
t.Fatal("time out when expecting lost contact signal")
}
}
func (s) TestXdsClientExponentialRetry(t *testing.T) {
cfg := &testConfig{
svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"),
}
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan <- i
return nil
}
contactChan := make(chan *loseContact, 10)
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, cfg.doCDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
var secondRetry, thirdRetry time.Time
for i := 0; i < 3; i++ {
// make sure server side get the request (i.e stream created successfully on client side)
td.getReq()
td.sendResp(&response{err: cfg.svrErr})
select {
case <-contactChan:
if i == 1 {
secondRetry = time.Now()
}
if i == 2 {
thirdRetry = time.Now()
}
case <-time.After(2 * time.Second):
t.Fatal("time out when expecting lost contact signal")
}
}
if thirdRetry.Sub(secondRetry) < 1*time.Second {
t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry))
}
}