blob: 2378a86fff107dd90857f972ced08c7a203a1811 [file] [log] [blame]
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"context"
"net"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/testutils/fakeserver"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/testdata"
)
// defaultTestTimeout is the deadline the tests in this file place on the
// context used when waiting for a connection to show up on a listener's connCh.
const defaultTestTimeout = 1 * time.Second
// s is the receiver type for the tests in this file. It embeds grpctest.Tester
// and a value of it is handed to grpctest.RunSubTests by Test.
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// listenerWrapper wraps a net.Listener and reports every successfully accepted
// connection on connCh, letting tests observe that a client connected.
type listenerWrapper struct {
net.Listener
// connCh is sent each net.Conn returned by a successful Accept.
connCh *testutils.Channel
}
// Accept delegates to the wrapped listener. A successfully accepted connection
// is first pushed onto connCh and then returned to the caller; an error from
// the wrapped listener is returned unchanged with a nil connection.
func (l *listenerWrapper) Accept() (net.Conn, error) {
	conn, err := l.Listener.Accept()
	if err == nil {
		l.connCh.Send(conn)
		return conn, nil
	}
	return nil, err
}
// setupwithListener starts a fake RLS server on a listenerWrapper bound to an
// ephemeral localhost TCP port, forwarding opts to fakeserver.Start.
//
// It returns the fake server, the wrapping listener (whose connCh reports
// accepted connections) and the cleanup function returned by fakeserver.Start.
// Any failure during setup is fatal to the calling test.
func setupwithListener(t *testing.T, opts ...grpc.ServerOption) (*fakeserver.Server, *listenerWrapper, func()) {
	t.Helper()

	l, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("net.Listen(tcp, localhost:0): %v", err)
	}
	lw := &listenerWrapper{
		Listener: l,
		connCh:   testutils.NewChannel(),
	}

	server, cleanup, err := fakeserver.Start(lw, opts...)
	if err != nil {
		// The test is about to abort and no cleanup func is handed back to the
		// caller on this path, so close the listener here rather than leaving
		// the socket open. Best effort: the Close error is not actionable.
		l.Close()
		t.Fatalf("fakeserver.Start(): %v", err)
	}
	t.Logf("Fake RLS server started at %s ...", server.Address)
	return server, lw, cleanup
}
// testBalancerCC is the balancer.ClientConn passed to the RLS LB policy's Build
// in these tests. The tests construct it as &testBalancerCC{}, leaving the
// embedded ClientConn unset.
// NOTE(review): presumably the policy never calls a ClientConn method in these
// tests, since the embedded value is nil — confirm before relying on it.
type testBalancerCC struct {
balancer.ClientConn
}
// TestUpdateControlChannelFirstConfig covers the first service config update
// seen by the LB policy. After pushing a config whose lookupService is the fake
// RLS server's address, it expects a connection to be accepted on that server's
// listener within defaultTestTimeout.
func (s) TestUpdateControlChannelFirstConfig(t *testing.T) {
	rlsServer, wrappedLis, stopServer := setupwithListener(t)
	defer stopServer()

	builder := balancer.Get(rlsBalancerName)
	if builder == nil {
		t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName)
	}
	lbPolicy := builder.Build(&testBalancerCC{}, balancer.BuildOptions{})
	defer lbPolicy.Close()
	t.Log("Built RLS LB policy ...")

	cfg := &lbConfig{lookupService: rlsServer.Address}
	t.Logf("Sending service config %+v to RLS LB policy ...", cfg)
	state := balancer.ClientConnState{BalancerConfig: cfg}
	lbPolicy.UpdateClientConnState(state)

	// An accepted connection on the fake server's listener is the signal that
	// the control channel was created.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	_, err := wrappedLis.connCh.Receive(ctx)
	if err != nil {
		t.Fatal("Timeout expired when waiting for LB policy to create control channel")
	}

	// TODO: Verify channel connectivity state once control channel connectivity
	// state monitoring is in place.

	// TODO: Verify RLS RPC can be made once we integrate with the picker.
}
// TestUpdateControlChannelSwitch tests the scenario where a control channel
// exists and the LB policy receives a new serviceConfig with a different RLS
// server name. Verifies that the new control channel is created and the old one
// is closed (the leakchecker takes care of this).
//
// Each wait for an accepted connection gets its own defaultTestTimeout
// deadline, so time spent connecting to the first server does not shrink the
// budget available for the second.
func (s) TestUpdateControlChannelSwitch(t *testing.T) {
	server1, lis1, cleanup1 := setupwithListener(t)
	defer cleanup1()
	server2, lis2, cleanup2 := setupwithListener(t)
	defer cleanup2()

	bb := balancer.Get(rlsBalancerName)
	if bb == nil {
		t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName)
	}
	rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{})
	defer rlsB.Close()
	t.Log("Built RLS LB policy ...")

	lbCfg := &lbConfig{lookupService: server1.Address}
	t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg)
	rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg})

	ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel1()
	if _, err := lis1.connCh.Receive(ctx1); err != nil {
		t.Fatal("Timeout expired when waiting for LB policy to create control channel")
	}

	lbCfg = &lbConfig{lookupService: server2.Address}
	t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg)
	rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg})

	// Fresh deadline: reusing the first context would leave only whatever
	// remained of its timeout for the second control channel to come up.
	ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel2()
	if _, err := lis2.connCh.Receive(ctx2); err != nil {
		t.Fatal("Timeout expired when waiting for LB policy to create control channel")
	}

	// TODO: Verify channel connectivity state once control channel connectivity
	// state monitoring is in place.

	// TODO: Verify RLS RPC can be made once we integrate with the picker.
}
// TestUpdateControlChannelTimeout tests the scenario where the LB policy
// receives a service config update with a different lookupServiceTimeout, but
// the lookupService itself remains unchanged. It verifies that the LB policy
// does not create a new control channel in this case.
//
// The "no new connection" check runs against its own defaultTestTimeout
// deadline so that the observation window has a fixed length regardless of how
// long the initial connection took.
func (s) TestUpdateControlChannelTimeout(t *testing.T) {
	server, lis, cleanup := setupwithListener(t)
	defer cleanup()

	bb := balancer.Get(rlsBalancerName)
	if bb == nil {
		t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName)
	}
	rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{})
	defer rlsB.Close()
	t.Log("Built RLS LB policy ...")

	lbCfg := &lbConfig{lookupService: server.Address, lookupServiceTimeout: 1 * time.Second}
	t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg)
	rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := lis.connCh.Receive(ctx); err != nil {
		t.Fatal("Timeout expired when waiting for LB policy to create control channel")
	}

	lbCfg = &lbConfig{lookupService: server.Address, lookupServiceTimeout: 2 * time.Second}
	t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg)
	rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg})

	// Negative check: expect the wait to end in a deadline expiry. A separate
	// context is used because the first one may be close to expiring already,
	// which would let a late, unwanted connection go unnoticed.
	noConnCtx, noConnCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer noConnCancel()
	if _, err := lis.connCh.Receive(noConnCtx); err != context.DeadlineExceeded {
		t.Fatal("LB policy created new control channel when only lookupServiceTimeout changed")
	}

	// TODO: Verify channel connectivity state once control channel connectivity
	// state monitoring is in place.

	// TODO: Verify RLS RPC can be made once we integrate with the picker.
}
// TestUpdateControlChannelWithCreds tests the scenario where the control
// channel is to be established with credentials from the parent channel.
//
// The fake RLS server is started with server TLS credentials and the LB policy
// is built with client TLS credentials in BuildOptions.DialCreds. The test
// passes once a connection is accepted on the fake server's listener within
// defaultTestTimeout.
func (s) TestUpdateControlChannelWithCreds(t *testing.T) {
	// Paths are relative to the testdata directory; keeping them in one place
	// lets the failure messages name the files that were actually loaded.
	const (
		serverCertFile = "x509/server1_cert.pem"
		serverKeyFile  = "x509/server1_key.pem"
		caCertFile     = "x509/server_ca_cert.pem"
	)

	sCreds, err := credentials.NewServerTLSFromFile(testdata.Path(serverCertFile), testdata.Path(serverKeyFile))
	if err != nil {
		t.Fatalf("credentials.NewServerTLSFromFile(%s, %s) = %v", serverCertFile, serverKeyFile, err)
	}
	cCreds, err := credentials.NewClientTLSFromFile(testdata.Path(caCertFile), "")
	if err != nil {
		t.Fatalf("credentials.NewClientTLSFromFile(%s) = %v", caCertFile, err)
	}

	server, lis, cleanup := setupwithListener(t, grpc.Creds(sCreds))
	defer cleanup()

	bb := balancer.Get(rlsBalancerName)
	if bb == nil {
		t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName)
	}
	rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{
		DialCreds: cCreds,
	})
	defer rlsB.Close()
	t.Log("Built RLS LB policy ...")

	lbCfg := &lbConfig{lookupService: server.Address}
	t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg)
	rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := lis.connCh.Receive(ctx); err != nil {
		t.Fatal("Timeout expired when waiting for LB policy to create control channel")
	}

	// TODO: Verify channel connectivity state once control channel connectivity
	// state monitoring is in place.

	// TODO: Verify RLS RPC can be made once we integrate with the picker.
}