| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "context" |
| "net" |
| "testing" |
| "time" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/rls/internal/testutils/fakeserver" |
| "google.golang.org/grpc/credentials" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/testdata" |
| ) |
| |
| const defaultTestTimeout = 1 * time.Second |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| type listenerWrapper struct { |
| net.Listener |
| connCh *testutils.Channel |
| } |
| |
| // Accept waits for and returns the next connection to the listener. |
| func (l *listenerWrapper) Accept() (net.Conn, error) { |
| c, err := l.Listener.Accept() |
| if err != nil { |
| return nil, err |
| } |
| l.connCh.Send(c) |
| return c, nil |
| } |
| |
| func setupwithListener(t *testing.T, opts ...grpc.ServerOption) (*fakeserver.Server, *listenerWrapper, func()) { |
| t.Helper() |
| |
| l, err := net.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatalf("net.Listen(tcp, localhost:0): %v", err) |
| } |
| lw := &listenerWrapper{ |
| Listener: l, |
| connCh: testutils.NewChannel(), |
| } |
| |
| server, cleanup, err := fakeserver.Start(lw, opts...) |
| if err != nil { |
| t.Fatalf("fakeserver.Start(): %v", err) |
| } |
| t.Logf("Fake RLS server started at %s ...", server.Address) |
| |
| return server, lw, cleanup |
| } |
| |
| type testBalancerCC struct { |
| balancer.ClientConn |
| } |
| |
| // TestUpdateControlChannelFirstConfig tests the scenario where the LB policy |
| // receives its first service config and verifies that a control channel to the |
| // RLS server specified in the serviceConfig is established. |
| func (s) TestUpdateControlChannelFirstConfig(t *testing.T) { |
| server, lis, cleanup := setupwithListener(t) |
| defer cleanup() |
| |
| bb := balancer.Get(rlsBalancerName) |
| if bb == nil { |
| t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) |
| } |
| rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{}) |
| defer rlsB.Close() |
| t.Log("Built RLS LB policy ...") |
| |
| lbCfg := &lbConfig{lookupService: server.Address} |
| t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) |
| rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := lis.connCh.Receive(ctx); err != nil { |
| t.Fatal("Timeout expired when waiting for LB policy to create control channel") |
| } |
| |
| // TODO: Verify channel connectivity state once control channel connectivity |
| // state monitoring is in place. |
| |
| // TODO: Verify RLS RPC can be made once we integrate with the picker. |
| } |
| |
| // TestUpdateControlChannelSwitch tests the scenario where a control channel |
| // exists and the LB policy receives a new serviceConfig with a different RLS |
| // server name. Verifies that the new control channel is created and the old one |
| // is closed (the leakchecker takes care of this). |
| func (s) TestUpdateControlChannelSwitch(t *testing.T) { |
| server1, lis1, cleanup1 := setupwithListener(t) |
| defer cleanup1() |
| |
| server2, lis2, cleanup2 := setupwithListener(t) |
| defer cleanup2() |
| |
| bb := balancer.Get(rlsBalancerName) |
| if bb == nil { |
| t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) |
| } |
| rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{}) |
| defer rlsB.Close() |
| t.Log("Built RLS LB policy ...") |
| |
| lbCfg := &lbConfig{lookupService: server1.Address} |
| t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) |
| rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := lis1.connCh.Receive(ctx); err != nil { |
| t.Fatal("Timeout expired when waiting for LB policy to create control channel") |
| } |
| |
| lbCfg = &lbConfig{lookupService: server2.Address} |
| t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) |
| rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) |
| |
| if _, err := lis2.connCh.Receive(ctx); err != nil { |
| t.Fatal("Timeout expired when waiting for LB policy to create control channel") |
| } |
| |
| // TODO: Verify channel connectivity state once control channel connectivity |
| // state monitoring is in place. |
| |
| // TODO: Verify RLS RPC can be made once we integrate with the picker. |
| } |
| |
| // TestUpdateControlChannelTimeout tests the scenario where the LB policy |
| // receives a service config update with a different lookupServiceTimeout, but |
| // the lookupService itself remains unchanged. It verifies that the LB policy |
| // does not create a new control channel in this case. |
| func (s) TestUpdateControlChannelTimeout(t *testing.T) { |
| server, lis, cleanup := setupwithListener(t) |
| defer cleanup() |
| |
| bb := balancer.Get(rlsBalancerName) |
| if bb == nil { |
| t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) |
| } |
| rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{}) |
| defer rlsB.Close() |
| t.Log("Built RLS LB policy ...") |
| |
| lbCfg := &lbConfig{lookupService: server.Address, lookupServiceTimeout: 1 * time.Second} |
| t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) |
| rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := lis.connCh.Receive(ctx); err != nil { |
| t.Fatal("Timeout expired when waiting for LB policy to create control channel") |
| } |
| |
| lbCfg = &lbConfig{lookupService: server.Address, lookupServiceTimeout: 2 * time.Second} |
| t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) |
| rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) |
| if _, err := lis.connCh.Receive(ctx); err != context.DeadlineExceeded { |
| t.Fatal("LB policy created new control channel when only lookupServiceTimeout changed") |
| } |
| |
| // TODO: Verify channel connectivity state once control channel connectivity |
| // state monitoring is in place. |
| |
| // TODO: Verify RLS RPC can be made once we integrate with the picker. |
| } |
| |
| // TestUpdateControlChannelWithCreds tests the scenario where the control |
| // channel is to established with credentials from the parent channel. |
| func (s) TestUpdateControlChannelWithCreds(t *testing.T) { |
| sCreds, err := credentials.NewServerTLSFromFile(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem")) |
| if err != nil { |
| t.Fatalf("credentials.NewServerTLSFromFile(server1.pem, server1.key) = %v", err) |
| } |
| cCreds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "") |
| if err != nil { |
| t.Fatalf("credentials.NewClientTLSFromFile(ca.pem) = %v", err) |
| } |
| |
| server, lis, cleanup := setupwithListener(t, grpc.Creds(sCreds)) |
| defer cleanup() |
| |
| bb := balancer.Get(rlsBalancerName) |
| if bb == nil { |
| t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) |
| } |
| rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{ |
| DialCreds: cCreds, |
| }) |
| defer rlsB.Close() |
| t.Log("Built RLS LB policy ...") |
| |
| lbCfg := &lbConfig{lookupService: server.Address} |
| t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) |
| rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := lis.connCh.Receive(ctx); err != nil { |
| t.Fatal("Timeout expired when waiting for LB policy to create control channel") |
| } |
| |
| // TODO: Verify channel connectivity state once control channel connectivity |
| // state monitoring is in place. |
| |
| // TODO: Verify RLS RPC can be made once we integrate with the picker. |
| } |