| /* |
| * |
| * Copyright 2017 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package roundrobin_test |
| |
| import ( |
| "fmt" |
| "net" |
| "sync" |
| "testing" |
| "time" |
| |
| "golang.org/x/net/context" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/balancer/roundrobin" |
| "google.golang.org/grpc/codes" |
| _ "google.golang.org/grpc/grpclog/glogger" |
| "google.golang.org/grpc/peer" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| "google.golang.org/grpc/status" |
| testpb "google.golang.org/grpc/test/grpc_testing" |
| "google.golang.org/grpc/test/leakcheck" |
| ) |
| |
| type testServer struct { |
| testpb.TestServiceServer |
| } |
| |
| func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { |
| return &testpb.Empty{}, nil |
| } |
| |
| func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { |
| return nil |
| } |
| |
| type test struct { |
| servers []*grpc.Server |
| addresses []string |
| } |
| |
| func (t *test) cleanup() { |
| for _, s := range t.servers { |
| s.Stop() |
| } |
| } |
| |
| func startTestServers(count int) (_ *test, err error) { |
| t := &test{} |
| |
| defer func() { |
| if err != nil { |
| for _, s := range t.servers { |
| s.Stop() |
| } |
| } |
| }() |
| for i := 0; i < count; i++ { |
| lis, err := net.Listen("tcp", "localhost:0") |
| if err != nil { |
| return nil, fmt.Errorf("Failed to listen %v", err) |
| } |
| |
| s := grpc.NewServer() |
| testpb.RegisterTestServiceServer(s, &testServer{}) |
| t.servers = append(t.servers, s) |
| t.addresses = append(t.addresses, lis.Addr().String()) |
| |
| go func(s *grpc.Server, l net.Listener) { |
| s.Serve(l) |
| }(s, lis) |
| } |
| |
| return t, nil |
| } |
| |
| func TestOneBackend(t *testing.T) { |
| defer leakcheck.Check(t) |
| r, cleanup := manual.GenerateAndRegisterManualResolver() |
| defer cleanup() |
| |
| test, err := startTestServers(1) |
| if err != nil { |
| t.Fatalf("failed to start servers: %v", err) |
| } |
| defer test.cleanup() |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
| if err != nil { |
| t.Fatalf("failed to dial: %v", err) |
| } |
| defer cc.Close() |
| testc := testpb.NewTestServiceClient(cc) |
| // The first RPC should fail because there's no address. |
| ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
| } |
| |
| r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
| // The second RPC should succeed. |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| } |
| |
| func TestBackendsRoundRobin(t *testing.T) { |
| defer leakcheck.Check(t) |
| r, cleanup := manual.GenerateAndRegisterManualResolver() |
| defer cleanup() |
| |
| backendCount := 5 |
| test, err := startTestServers(backendCount) |
| if err != nil { |
| t.Fatalf("failed to start servers: %v", err) |
| } |
| defer test.cleanup() |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
| if err != nil { |
| t.Fatalf("failed to dial: %v", err) |
| } |
| defer cc.Close() |
| testc := testpb.NewTestServiceClient(cc) |
| // The first RPC should fail because there's no address. |
| ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
| } |
| |
| var resolvedAddrs []resolver.Address |
| for i := 0; i < backendCount; i++ { |
| resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) |
| } |
| |
| r.NewAddress(resolvedAddrs) |
| var p peer.Peer |
| // Make sure connections to all servers are up. |
| for si := 0; si < backendCount; si++ { |
| var connected bool |
| for i := 0; i < 1000; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| if p.Addr.String() == test.addresses[si] { |
| connected = true |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| if !connected { |
| t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) |
| } |
| } |
| |
| for i := 0; i < 3*backendCount; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| if p.Addr.String() != test.addresses[i%backendCount] { |
| t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
| } |
| } |
| } |
| |
| func TestAddressesRemoved(t *testing.T) { |
| defer leakcheck.Check(t) |
| r, cleanup := manual.GenerateAndRegisterManualResolver() |
| defer cleanup() |
| |
| test, err := startTestServers(1) |
| if err != nil { |
| t.Fatalf("failed to start servers: %v", err) |
| } |
| defer test.cleanup() |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
| if err != nil { |
| t.Fatalf("failed to dial: %v", err) |
| } |
| defer cc.Close() |
| testc := testpb.NewTestServiceClient(cc) |
| // The first RPC should fail because there's no address. |
| ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
| } |
| |
| r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
| // The second RPC should succeed. |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| |
| r.NewAddress([]resolver.Address{}) |
| for i := 0; i < 1000; i++ { |
| ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded { |
| return |
| } |
| time.Sleep(time.Millisecond) |
| } |
| t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded") |
| } |
| |
| func TestCloseWithPendingRPC(t *testing.T) { |
| defer leakcheck.Check(t) |
| r, cleanup := manual.GenerateAndRegisterManualResolver() |
| defer cleanup() |
| |
| test, err := startTestServers(1) |
| if err != nil { |
| t.Fatalf("failed to start servers: %v", err) |
| } |
| defer test.cleanup() |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
| if err != nil { |
| t.Fatalf("failed to dial: %v", err) |
| } |
| testc := testpb.NewTestServiceClient(cc) |
| |
| var wg sync.WaitGroup |
| for i := 0; i < 3; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| // This RPC blocks until cc is closed. |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { |
| t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing") |
| } |
| cancel() |
| }() |
| } |
| cc.Close() |
| wg.Wait() |
| } |
| |
| func TestNewAddressWhileBlocking(t *testing.T) { |
| defer leakcheck.Check(t) |
| r, cleanup := manual.GenerateAndRegisterManualResolver() |
| defer cleanup() |
| |
| test, err := startTestServers(1) |
| if err != nil { |
| t.Fatalf("failed to start servers: %v", err) |
| } |
| defer test.cleanup() |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) |
| if err != nil { |
| t.Fatalf("failed to dial: %v", err) |
| } |
| defer cc.Close() |
| testc := testpb.NewTestServiceClient(cc) |
| // The first RPC should fail because there's no address. |
| ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
| } |
| |
| r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
| // The second RPC should succeed. |
| ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, nil", err) |
| } |
| |
| r.NewAddress([]resolver.Address{}) |
| |
| var wg sync.WaitGroup |
| for i := 0; i < 3; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| // This RPC blocks until NewAddress is called. |
| testc.EmptyCall(context.Background(), &testpb.Empty{}) |
| }() |
| } |
| time.Sleep(50 * time.Millisecond) |
| r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) |
| wg.Wait() |
| } |
| |
| func TestOneServerDown(t *testing.T) { |
| defer leakcheck.Check(t) |
| r, cleanup := manual.GenerateAndRegisterManualResolver() |
| defer cleanup() |
| |
| backendCount := 3 |
| test, err := startTestServers(backendCount) |
| if err != nil { |
| t.Fatalf("failed to start servers: %v", err) |
| } |
| defer test.cleanup() |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) |
| if err != nil { |
| t.Fatalf("failed to dial: %v", err) |
| } |
| defer cc.Close() |
| testc := testpb.NewTestServiceClient(cc) |
| // The first RPC should fail because there's no address. |
| ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
| } |
| |
| var resolvedAddrs []resolver.Address |
| for i := 0; i < backendCount; i++ { |
| resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) |
| } |
| |
| r.NewAddress(resolvedAddrs) |
| var p peer.Peer |
| // Make sure connections to all servers are up. |
| for si := 0; si < backendCount; si++ { |
| var connected bool |
| for i := 0; i < 1000; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| if p.Addr.String() == test.addresses[si] { |
| connected = true |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| if !connected { |
| t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) |
| } |
| } |
| |
| for i := 0; i < 3*backendCount; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| if p.Addr.String() != test.addresses[i%backendCount] { |
| t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
| } |
| } |
| |
| // Stop one server, RPCs should roundrobin among the remaining servers. |
| backendCount-- |
| test.servers[backendCount].Stop() |
| // Loop until see server[backendCount-1] twice without seeing server[backendCount]. |
| var targetSeen int |
| for i := 0; i < 1000; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| targetSeen = 0 |
| t.Logf("EmptyCall() = _, %v, want _, <nil>", err) |
| // Due to a race, this RPC could possibly get the connection that |
| // was closing, and this RPC may fail. Keep trying when this |
| // happens. |
| continue |
| } |
| switch p.Addr.String() { |
| case test.addresses[backendCount-1]: |
| targetSeen++ |
| case test.addresses[backendCount]: |
| // Reset targetSeen if peer is server[backendCount]. |
| targetSeen = 0 |
| } |
| // Break to make sure the last picked address is server[-1], so the following for loop won't be flaky. |
| if targetSeen >= 2 { |
| break |
| } |
| } |
| if targetSeen != 2 { |
| t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]") |
| } |
| for i := 0; i < 3*backendCount; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| if p.Addr.String() != test.addresses[i%backendCount] { |
| t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
| } |
| } |
| } |
| |
| func TestAllServersDown(t *testing.T) { |
| defer leakcheck.Check(t) |
| r, cleanup := manual.GenerateAndRegisterManualResolver() |
| defer cleanup() |
| |
| backendCount := 3 |
| test, err := startTestServers(backendCount) |
| if err != nil { |
| t.Fatalf("failed to start servers: %v", err) |
| } |
| defer test.cleanup() |
| |
| cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) |
| if err != nil { |
| t.Fatalf("failed to dial: %v", err) |
| } |
| defer cc.Close() |
| testc := testpb.NewTestServiceClient(cc) |
| // The first RPC should fail because there's no address. |
| ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) |
| defer cancel() |
| if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) |
| } |
| |
| var resolvedAddrs []resolver.Address |
| for i := 0; i < backendCount; i++ { |
| resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) |
| } |
| |
| r.NewAddress(resolvedAddrs) |
| var p peer.Peer |
| // Make sure connections to all servers are up. |
| for si := 0; si < backendCount; si++ { |
| var connected bool |
| for i := 0; i < 1000; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| if p.Addr.String() == test.addresses[si] { |
| connected = true |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| if !connected { |
| t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) |
| } |
| } |
| |
| for i := 0; i < 3*backendCount; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { |
| t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) |
| } |
| if p.Addr.String() != test.addresses[i%backendCount] { |
| t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) |
| } |
| } |
| |
| // All servers are stopped, failfast RPC should fail with unavailable. |
| for i := 0; i < backendCount; i++ { |
| test.servers[i].Stop() |
| } |
| time.Sleep(100 * time.Millisecond) |
| for i := 0; i < 1000; i++ { |
| if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable { |
| return |
| } |
| time.Sleep(time.Millisecond) |
| } |
| t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped") |
| } |