| /* |
| * |
| * Copyright 2014 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
 *     http://www.apache.org/licenses/LICENSE-2.0
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package test |
| |
| import ( |
| "bufio" |
| "bytes" |
| "compress/gzip" |
| "context" |
| "crypto/tls" |
| "errors" |
| "flag" |
| "fmt" |
| "io" |
| "math" |
| "net" |
| "net/http" |
| "os" |
| "reflect" |
| "runtime" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "syscall" |
| "testing" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| anypb "github.com/golang/protobuf/ptypes/any" |
| "golang.org/x/net/http2" |
| "golang.org/x/net/http2/hpack" |
| spb "google.golang.org/genproto/googleapis/rpc/status" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/credentials" |
| "google.golang.org/grpc/encoding" |
| _ "google.golang.org/grpc/encoding/gzip" |
| "google.golang.org/grpc/health" |
| healthgrpc "google.golang.org/grpc/health/grpc_health_v1" |
| healthpb "google.golang.org/grpc/health/grpc_health_v1" |
| "google.golang.org/grpc/internal/channelz" |
| "google.golang.org/grpc/internal/grpcsync" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/internal/transport" |
| "google.golang.org/grpc/metadata" |
| "google.golang.org/grpc/peer" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| "google.golang.org/grpc/serviceconfig" |
| "google.golang.org/grpc/stats" |
| "google.golang.org/grpc/status" |
| "google.golang.org/grpc/tap" |
| testpb "google.golang.org/grpc/test/grpc_testing" |
| "google.golang.org/grpc/testdata" |
| ) |
| |
| const defaultHealthService = "grpc.health.v1.Health" |
| |
| func init() { |
| channelz.TurnOn() |
| } |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| var ( |
| // For headers: |
| testMetadata = metadata.MD{ |
| "key1": []string{"value1"}, |
| "key2": []string{"value2"}, |
| "key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})}, |
| } |
| testMetadata2 = metadata.MD{ |
| "key1": []string{"value12"}, |
| "key2": []string{"value22"}, |
| } |
| // For trailers: |
| testTrailerMetadata = metadata.MD{ |
| "tkey1": []string{"trailerValue1"}, |
| "tkey2": []string{"trailerValue2"}, |
| "tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})}, |
| } |
| testTrailerMetadata2 = metadata.MD{ |
| "tkey1": []string{"trailerValue12"}, |
| "tkey2": []string{"trailerValue22"}, |
| } |
	// Capital "Key" is illegal in HTTP/2.
| malformedHTTP2Metadata = metadata.MD{ |
| "Key": []string{"foo"}, |
| } |
| testAppUA = "myApp1/1.0 myApp2/0.9" |
| failAppUA = "fail-this-RPC" |
| detailedError = status.ErrorProto(&spb.Status{ |
| Code: int32(codes.DataLoss), |
| Message: "error for testing: " + failAppUA, |
| Details: []*anypb.Any{{ |
| TypeUrl: "url", |
| Value: []byte{6, 0, 0, 6, 1, 3}, |
| }}, |
| }) |
| ) |
| |
| var raceMode bool // set by race.go in race mode |
| |
| type testServer struct { |
| testpb.UnimplementedTestServiceServer |
| |
	security           string // indicates the authentication protocol used by this server.
| earlyFail bool // whether to error out the execution of a service handler prematurely. |
| setAndSendHeader bool // whether to call setHeader and sendHeader. |
| setHeaderOnly bool // whether to only call setHeader, not sendHeader. |
| multipleSetTrailer bool // whether to call setTrailer multiple times. |
| unaryCallSleepTime time.Duration |
| } |
| |
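// EmptyCall responds with an empty message. If the incoming user-agent is
// missing or carries the failAppUA prefix, it returns detailedError so the
// client side of the test can verify error propagation; otherwise it echoes
// the user-agent values back in a "ua" response header.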
| func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { |
| if md, ok := metadata.FromIncomingContext(ctx); ok { |
		// For testing purposes, return an error if the user-agent is failAppUA,
		// so that the test can verify the client receives the correct error.
| if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { |
| return nil, detailedError |
| } |
| var str []string |
| for _, entry := range md["user-agent"] { |
| str = append(str, "ua", entry) |
| } |
| grpc.SendHeader(ctx, metadata.Pairs(str...)) |
| } |
| return new(testpb.Empty), nil |
| } |
| |
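// newPayload returns a payload of the given type and size; only the
// COMPRESSABLE payload type is supported.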
| func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) { |
| if size < 0 { |
| return nil, fmt.Errorf("requested a response with invalid length %d", size) |
| } |
| body := make([]byte, size) |
| switch t { |
| case testpb.PayloadType_COMPRESSABLE: |
| case testpb.PayloadType_UNCOMPRESSABLE: |
| return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported") |
| default: |
| return nil, fmt.Errorf("unsupported payload type: %d", t) |
| } |
| return &testpb.Payload{ |
| Type: t, |
| Body: body, |
| }, nil |
| } |
| |
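// UnaryCall checks the incoming metadata and peer information, exercises the
// header/trailer knobs configured on the testServer, sleeps for
// unaryCallSleepTime, and returns a payload of the requested size.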
| func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| md, ok := metadata.FromIncomingContext(ctx) |
| if ok { |
| if _, exists := md[":authority"]; !exists { |
| return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) |
| } |
| if s.setAndSendHeader { |
| if err := grpc.SetHeader(ctx, md); err != nil { |
| return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) |
| } |
| if err := grpc.SendHeader(ctx, testMetadata2); err != nil { |
| return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err) |
| } |
| } else if s.setHeaderOnly { |
| if err := grpc.SetHeader(ctx, md); err != nil { |
| return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) |
| } |
| if err := grpc.SetHeader(ctx, testMetadata2); err != nil { |
| return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err) |
| } |
| } else { |
| if err := grpc.SendHeader(ctx, md); err != nil { |
| return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err) |
| } |
| } |
| if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { |
| return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err) |
| } |
| if s.multipleSetTrailer { |
| if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil { |
| return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err) |
| } |
| } |
| } |
| pr, ok := peer.FromContext(ctx) |
| if !ok { |
| return nil, status.Error(codes.DataLoss, "failed to get peer from ctx") |
| } |
| if pr.Addr == net.Addr(nil) { |
| return nil, status.Error(codes.DataLoss, "failed to get peer address") |
| } |
| if s.security != "" { |
| // Check Auth info |
| var authType, serverName string |
| switch info := pr.AuthInfo.(type) { |
| case credentials.TLSInfo: |
| authType = info.AuthType() |
| serverName = info.State.ServerName |
| default: |
| return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type") |
| } |
| if authType != s.security { |
| return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security) |
| } |
| if serverName != "x.test.example.com" { |
| return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName) |
| } |
| } |
| // Simulate some service delay. |
| time.Sleep(s.unaryCallSleepTime) |
| |
| payload, err := newPayload(in.GetResponseType(), in.GetResponseSize()) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &testpb.SimpleResponse{ |
| Payload: payload, |
| }, nil |
| } |
| |
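// StreamingOutputCall validates the incoming metadata and then sends one
// response per requested response parameter, honoring the per-response
// interval.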
| func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { |
| if md, ok := metadata.FromIncomingContext(stream.Context()); ok { |
| if _, exists := md[":authority"]; !exists { |
| return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) |
| } |
		// For testing purposes, return an error if the user-agent is failAppUA,
		// so that the test can verify the client receives the correct error.
| if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { |
| return status.Error(codes.DataLoss, "error for testing: "+failAppUA) |
| } |
| } |
| cs := args.GetResponseParameters() |
| for _, c := range cs { |
| if us := c.GetIntervalUs(); us > 0 { |
| time.Sleep(time.Duration(us) * time.Microsecond) |
| } |
| |
| payload, err := newPayload(args.GetResponseType(), c.GetSize()) |
| if err != nil { |
| return err |
| } |
| |
| if err := stream.Send(&testpb.StreamingOutputCallResponse{ |
| Payload: payload, |
| }); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
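// StreamingInputCall sums the sizes of the received payloads and reports the
// total once the client closes the stream. If earlyFail is set, the RPC fails
// after the first message.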
| func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { |
| var sum int |
| for { |
| in, err := stream.Recv() |
| if err == io.EOF { |
| return stream.SendAndClose(&testpb.StreamingInputCallResponse{ |
| AggregatedPayloadSize: int32(sum), |
| }) |
| } |
| if err != nil { |
| return err |
| } |
| p := in.GetPayload().GetBody() |
| sum += len(p) |
| if s.earlyFail { |
| return status.Error(codes.NotFound, "not found") |
| } |
| } |
| } |
| |
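// FullDuplexCall exercises the header/trailer knobs configured on the
// testServer and then echoes the stream: for every request received, it sends
// the responses described by that request's response parameters.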
| func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { |
| md, ok := metadata.FromIncomingContext(stream.Context()) |
| if ok { |
| if s.setAndSendHeader { |
| if err := stream.SetHeader(md); err != nil { |
| return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) |
| } |
| if err := stream.SendHeader(testMetadata2); err != nil { |
| return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) |
| } |
| } else if s.setHeaderOnly { |
| if err := stream.SetHeader(md); err != nil { |
| return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) |
| } |
| if err := stream.SetHeader(testMetadata2); err != nil { |
| return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) |
| } |
| } else { |
| if err := stream.SendHeader(md); err != nil { |
| return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) |
| } |
| } |
| stream.SetTrailer(testTrailerMetadata) |
| if s.multipleSetTrailer { |
| stream.SetTrailer(testTrailerMetadata2) |
| } |
| } |
| for { |
| in, err := stream.Recv() |
| if err == io.EOF { |
| // read done. |
| return nil |
| } |
| if err != nil { |
| // to facilitate testSvrWriteStatusEarlyWrite |
| if status.Code(err) == codes.ResourceExhausted { |
| return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) |
| } |
| return err |
| } |
| cs := in.GetResponseParameters() |
| for _, c := range cs { |
| if us := c.GetIntervalUs(); us > 0 { |
| time.Sleep(time.Duration(us) * time.Microsecond) |
| } |
| |
| payload, err := newPayload(in.GetResponseType(), c.GetSize()) |
| if err != nil { |
| return err |
| } |
| |
| if err := stream.Send(&testpb.StreamingOutputCallResponse{ |
| Payload: payload, |
| }); err != nil { |
| // to facilitate testSvrWriteStatusEarlyWrite |
| if status.Code(err) == codes.ResourceExhausted { |
| return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) |
| } |
| return err |
| } |
| } |
| } |
| } |
| |
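// HalfDuplexCall buffers all incoming requests until the client closes the
// stream and then sends the requested responses.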
| func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error { |
| var msgBuf []*testpb.StreamingOutputCallRequest |
| for { |
| in, err := stream.Recv() |
| if err == io.EOF { |
| // read done. |
| break |
| } |
| if err != nil { |
| return err |
| } |
| msgBuf = append(msgBuf, in) |
| } |
| for _, m := range msgBuf { |
| cs := m.GetResponseParameters() |
| for _, c := range cs { |
| if us := c.GetIntervalUs(); us > 0 { |
| time.Sleep(time.Duration(us) * time.Microsecond) |
| } |
| |
| payload, err := newPayload(m.GetResponseType(), c.GetSize()) |
| if err != nil { |
| return err |
| } |
| |
| if err := stream.Send(&testpb.StreamingOutputCallResponse{ |
| Payload: payload, |
| }); err != nil { |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
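// env describes a test environment: the network and security protocol to use,
// whether to serve through the http.Handler-based transport, and which
// balancer the client should use.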
| type env struct { |
| name string |
| network string // The type of network such as tcp, unix, etc. |
| security string // The security protocol such as TLS, SSH, etc. |
| httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS |
| balancer string // One of "round_robin", "pick_first", or "". |
| customDialer func(string, string, time.Duration) (net.Conn, error) |
| } |
| |
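// runnable reports whether the environment can run on the current OS; unix
// sockets are not supported on Windows.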
| func (e env) runnable() bool { |
| if runtime.GOOS == "windows" && e.network == "unix" { |
| return false |
| } |
| return true |
| } |
| |
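// dialer dials addr over the environment's network, using the custom dialer
// when one is configured.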
| func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) { |
| if e.customDialer != nil { |
| return e.customDialer(e.network, addr, timeout) |
| } |
| return net.DialTimeout(e.network, addr, timeout) |
| } |
| |
| var ( |
| tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp"} |
| tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls"} |
| tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"} |
| tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"} |
| handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"} |
| noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"} |
| allEnv = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv} |
| ) |
| |
var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear-v1-balancer', 'tcp-tls-v1-balancer', 'tcp-clear', 'tcp-tls', 'handler-tls', or 'no-balancer' to only run the tests for that environment. Empty means all.")
| |
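// listTestEnv returns the environments to run the tests in, honoring the
// --only_env flag when it is set.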
| func listTestEnv() (envs []env) { |
| if *onlyEnv != "" { |
| for _, e := range allEnv { |
| if e.name == *onlyEnv { |
| if !e.runnable() { |
| panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS)) |
| } |
| return []env{e} |
| } |
| } |
| panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv)) |
| } |
| for _, e := range allEnv { |
| if e.runnable() { |
| envs = append(envs, e) |
| } |
| } |
| return envs |
| } |
| |
| // test is an end-to-end test. It should be created with the newTest |
| // func, modified as needed, and then started with its startServer method. |
| // It should be cleaned up with the tearDown method. |
| type test struct { |
	// The following are set up in newTest().
| t *testing.T |
| e env |
| ctx context.Context // valid for life of test, before tearDown |
| cancel context.CancelFunc |
| |
| // The following knobs are for the server-side, and should be set after |
| // calling newTest() and before calling startServer(). |
| |
| // whether or not to expose the server's health via the default health |
| // service implementation. |
| enableHealthServer bool |
| // In almost all cases, one should set the 'enableHealthServer' flag above to |
| // expose the server's health using the default health service |
| // implementation. This should only be used when a non-default health service |
| // implementation is required. |
| healthServer healthpb.HealthServer |
| maxStream uint32 |
| tapHandle tap.ServerInHandle |
| maxServerMsgSize *int |
| maxServerReceiveMsgSize *int |
| maxServerSendMsgSize *int |
| maxServerHeaderListSize *uint32 |
	// Used to test the deprecated server-side APIs RPCCompressor and RPCDecompressor.
| serverCompression bool |
| unknownHandler grpc.StreamHandler |
| unaryServerInt grpc.UnaryServerInterceptor |
| streamServerInt grpc.StreamServerInterceptor |
| serverInitialWindowSize int32 |
| serverInitialConnWindowSize int32 |
| customServerOptions []grpc.ServerOption |
| |
| // The following knobs are for the client-side, and should be set after |
| // calling newTest() and before calling clientConn(). |
| maxClientMsgSize *int |
| maxClientReceiveMsgSize *int |
| maxClientSendMsgSize *int |
| maxClientHeaderListSize *uint32 |
| userAgent string |
| // Used to test the deprecated API WithCompressor and WithDecompressor. |
| clientCompression bool |
| // Used to test the new compressor registration API UseCompressor. |
| clientUseCompression bool |
| // clientNopCompression is set to create a compressor whose type is not supported. |
| clientNopCompression bool |
| unaryClientInt grpc.UnaryClientInterceptor |
| streamClientInt grpc.StreamClientInterceptor |
| sc <-chan grpc.ServiceConfig |
| customCodec encoding.Codec |
| clientInitialWindowSize int32 |
| clientInitialConnWindowSize int32 |
| perRPCCreds credentials.PerRPCCredentials |
| customDialOptions []grpc.DialOption |
| resolverScheme string |
| |
| // All test dialing is blocking by default. Set this to true if dial |
| // should be non-blocking. |
| nonBlockingDial bool |
| |
	// These are set once startServer is called. The common case is to have
| // only one testServer. |
| srv stopper |
| hSrv healthpb.HealthServer |
| srvAddr string |
| |
	// These are set once startServers is called.
| srvs []stopper |
| hSrvs []healthpb.HealthServer |
| srvAddrs []string |
| |
| cc *grpc.ClientConn // nil until requested via clientConn |
| restoreLogs func() // nil unless declareLogNoise is used |
| } |
| |
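// stopper abstracts the shutdown methods shared by *grpc.Server and wrapHS.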
| type stopper interface { |
| Stop() |
| GracefulStop() |
| } |
| |
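// tearDown cancels the test context, closes the client connection, restores
// any silenced log output, and stops all servers started by the test.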
| func (te *test) tearDown() { |
| if te.cancel != nil { |
| te.cancel() |
| te.cancel = nil |
| } |
| |
| if te.cc != nil { |
| te.cc.Close() |
| te.cc = nil |
| } |
| |
| if te.restoreLogs != nil { |
| te.restoreLogs() |
| te.restoreLogs = nil |
| } |
| |
| if te.srv != nil { |
| te.srv.Stop() |
| } |
| for _, s := range te.srvs { |
| s.Stop() |
| } |
| } |
| |
| // newTest returns a new test using the provided testing.T and |
| // environment. It is returned with default values. Tests should |
| // modify it before calling its startServer and clientConn methods. |
| func newTest(t *testing.T, e env) *test { |
| te := &test{ |
| t: t, |
| e: e, |
| maxStream: math.MaxUint32, |
| } |
| te.ctx, te.cancel = context.WithCancel(context.Background()) |
| return te |
| } |
| |
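// listenAndServe builds server options from the test's knobs, creates a
// listener using the provided listen func, and starts serving the given
// TestService implementation (and, if configured, a health service) on it.
// The listener is returned to the caller.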
| func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener { |
| te.t.Helper() |
| te.t.Logf("Running test in %s environment...", te.e.name) |
| sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} |
| if te.maxServerMsgSize != nil { |
| sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize)) |
| } |
| if te.maxServerReceiveMsgSize != nil { |
| sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize)) |
| } |
| if te.maxServerSendMsgSize != nil { |
| sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize)) |
| } |
| if te.maxServerHeaderListSize != nil { |
| sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize)) |
| } |
| if te.tapHandle != nil { |
| sopts = append(sopts, grpc.InTapHandle(te.tapHandle)) |
| } |
| if te.serverCompression { |
| sopts = append(sopts, |
| grpc.RPCCompressor(grpc.NewGZIPCompressor()), |
| grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), |
| ) |
| } |
| if te.unaryServerInt != nil { |
| sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt)) |
| } |
| if te.streamServerInt != nil { |
| sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt)) |
| } |
| if te.unknownHandler != nil { |
| sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler)) |
| } |
| if te.serverInitialWindowSize > 0 { |
| sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) |
| } |
| if te.serverInitialConnWindowSize > 0 { |
| sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) |
| } |
| la := "localhost:0" |
| switch te.e.network { |
| case "unix": |
| la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano()) |
| syscall.Unlink(la) |
| } |
| lis, err := listen(te.e.network, la) |
| if err != nil { |
| te.t.Fatalf("Failed to listen: %v", err) |
| } |
| if te.e.security == "tls" { |
| creds, err := credentials.NewServerTLSFromFile(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem")) |
| if err != nil { |
| te.t.Fatalf("Failed to generate credentials %v", err) |
| } |
| sopts = append(sopts, grpc.Creds(creds)) |
| } |
| sopts = append(sopts, te.customServerOptions...) |
| s := grpc.NewServer(sopts...) |
| if ts != nil { |
| testpb.RegisterTestServiceServer(s, ts) |
| } |
| |
| // Create a new default health server if enableHealthServer is set, or use |
| // the provided one. |
| hs := te.healthServer |
| if te.enableHealthServer { |
| hs = health.NewServer() |
| } |
| if hs != nil { |
| healthgrpc.RegisterHealthServer(s, hs) |
| } |
| |
| addr := la |
| switch te.e.network { |
| case "unix": |
| default: |
| _, port, err := net.SplitHostPort(lis.Addr().String()) |
| if err != nil { |
| te.t.Fatalf("Failed to parse listener address: %v", err) |
| } |
| addr = "localhost:" + port |
| } |
| |
| te.srv = s |
| te.hSrv = hs |
| te.srvAddr = addr |
| |
| if te.e.httpHandler { |
| if te.e.security != "tls" { |
| te.t.Fatalf("unsupported environment settings") |
| } |
| cert, err := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem")) |
| if err != nil { |
| te.t.Fatal("tls.LoadX509KeyPair(server1.pem, server1.key) failed: ", err) |
| } |
| hs := &http.Server{ |
| Handler: s, |
| TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, |
| } |
| if err := http2.ConfigureServer(hs, &http2.Server{MaxConcurrentStreams: te.maxStream}); err != nil { |
| te.t.Fatal("http2.ConfigureServer(_, _) failed: ", err) |
| } |
| te.srv = wrapHS{hs} |
| tlsListener := tls.NewListener(lis, hs.TLSConfig) |
| go hs.Serve(tlsListener) |
| return lis |
| } |
| |
| go s.Serve(lis) |
| return lis |
| } |
| |
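// wrapHS adapts an *http.Server to the stopper interface.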
| type wrapHS struct { |
| s *http.Server |
| } |
| |
| func (w wrapHS) GracefulStop() { |
| w.s.Shutdown(context.Background()) |
| } |
| |
| func (w wrapHS) Stop() { |
| w.s.Close() |
| } |
| |
| func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper { |
| l := te.listenAndServe(ts, listenWithConnControl) |
| return l.(*listenerWrapper) |
| } |
| |
| // startServer starts a gRPC server exposing the provided TestService |
// implementation. Callers should defer a call to te.tearDown to clean up.
| func (te *test) startServer(ts testpb.TestServiceServer) { |
| te.t.Helper() |
| te.listenAndServe(ts, net.Listen) |
| } |
| |
| // startServers starts 'num' gRPC servers exposing the provided TestService. |
| func (te *test) startServers(ts testpb.TestServiceServer, num int) { |
| for i := 0; i < num; i++ { |
| te.startServer(ts) |
| te.srvs = append(te.srvs, te.srv.(*grpc.Server)) |
| te.hSrvs = append(te.hSrvs, te.hSrv) |
| te.srvAddrs = append(te.srvAddrs, te.srvAddr) |
| te.srv = nil |
| te.hSrv = nil |
| te.srvAddr = "" |
| } |
| } |
| |
| // setHealthServingStatus is a helper function to set the health status. |
| func (te *test) setHealthServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) { |
| hs, ok := te.hSrv.(*health.Server) |
| if !ok { |
| panic(fmt.Sprintf("SetServingStatus(%v, %v) called for health server of type %T", service, status, hs)) |
| } |
| hs.SetServingStatus(service, status) |
| } |
| |
| type nopCompressor struct { |
| grpc.Compressor |
| } |
| |
// NewNopCompressor creates a compressor for testing the case where the compressor type is not supported.
| func NewNopCompressor() grpc.Compressor { |
| return &nopCompressor{grpc.NewGZIPCompressor()} |
| } |
| |
| func (c *nopCompressor) Type() string { |
| return "nop" |
| } |
| |
| type nopDecompressor struct { |
| grpc.Decompressor |
| } |
| |
// NewNopDecompressor creates a decompressor for testing the case where the decompressor type is not supported.
| func NewNopDecompressor() grpc.Decompressor { |
| return &nopDecompressor{grpc.NewGZIPDecompressor()} |
| } |
| |
| func (d *nopDecompressor) Type() string { |
| return "nop" |
| } |
| |
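// configDial translates the test's client-side knobs into dial options and
// returns them along with the resolver scheme to prepend to the dial target.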
| func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) { |
| opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent)) |
| |
| if te.sc != nil { |
| opts = append(opts, grpc.WithServiceConfig(te.sc)) |
| } |
| |
| if te.clientCompression { |
| opts = append(opts, |
| grpc.WithCompressor(grpc.NewGZIPCompressor()), |
| grpc.WithDecompressor(grpc.NewGZIPDecompressor()), |
| ) |
| } |
| if te.clientUseCompression { |
| opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) |
| } |
| if te.clientNopCompression { |
| opts = append(opts, |
| grpc.WithCompressor(NewNopCompressor()), |
| grpc.WithDecompressor(NewNopDecompressor()), |
| ) |
| } |
| if te.unaryClientInt != nil { |
| opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt)) |
| } |
| if te.streamClientInt != nil { |
| opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) |
| } |
| if te.maxClientMsgSize != nil { |
| opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize)) |
| } |
| if te.maxClientReceiveMsgSize != nil { |
| opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize))) |
| } |
| if te.maxClientSendMsgSize != nil { |
| opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize))) |
| } |
| if te.maxClientHeaderListSize != nil { |
| opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize)) |
| } |
| switch te.e.security { |
| case "tls": |
| creds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com") |
| if err != nil { |
| te.t.Fatalf("Failed to load credentials: %v", err) |
| } |
| opts = append(opts, grpc.WithTransportCredentials(creds)) |
| case "empty": |
| // Don't add any transport creds option. |
| default: |
| opts = append(opts, grpc.WithInsecure()) |
| } |
| // TODO(bar) switch balancer case "pick_first". |
| var scheme string |
| if te.resolverScheme == "" { |
| scheme = "passthrough:///" |
| } else { |
| scheme = te.resolverScheme + ":///" |
| } |
| if te.e.balancer != "" { |
| opts = append(opts, grpc.WithBalancerName(te.e.balancer)) |
| } |
| if te.clientInitialWindowSize > 0 { |
| opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) |
| } |
| if te.clientInitialConnWindowSize > 0 { |
| opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) |
| } |
| if te.perRPCCreds != nil { |
| opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds)) |
| } |
| if te.customCodec != nil { |
| opts = append(opts, grpc.WithDefaultCallOptions(grpc.ForceCodec(te.customCodec))) |
| } |
| if !te.nonBlockingDial && te.srvAddr != "" { |
| // Only do a blocking dial if server is up. |
| opts = append(opts, grpc.WithBlock()) |
| } |
| if te.srvAddr == "" { |
| te.srvAddr = "client.side.only.test" |
| } |
| opts = append(opts, te.customDialOptions...) |
| return opts, scheme |
| } |
| |
| func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) { |
| if te.cc != nil { |
| return te.cc, nil |
| } |
| opts, scheme := te.configDial() |
| dw := &dialerWrapper{} |
	// Overwrite the dialer set in configDial with the wrapper's dialer.
| opts = append(opts, grpc.WithDialer(dw.dialer)) |
| var err error |
| te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) |
| if err != nil { |
| te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) |
| } |
| return te.cc, dw |
| } |
| |
| func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn { |
| if te.cc != nil { |
| return te.cc |
| } |
| var scheme string |
| opts, scheme = te.configDial(opts...) |
| var err error |
| te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) |
| if err != nil { |
| te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) |
| } |
| return te.cc |
| } |
| |
| func (te *test) declareLogNoise(phrases ...string) { |
| te.restoreLogs = declareLogNoise(te.t, phrases...) |
| } |
| |
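// withServerTester dials the test server directly (over TLS when configured),
// performs the HTTP/2 handshake, and passes the resulting serverTester to fn.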
| func (te *test) withServerTester(fn func(st *serverTester)) { |
| c, err := te.e.dialer(te.srvAddr, 10*time.Second) |
| if err != nil { |
| te.t.Fatal(err) |
| } |
| defer c.Close() |
| if te.e.security == "tls" { |
| c = tls.Client(c, &tls.Config{ |
| InsecureSkipVerify: true, |
| NextProtos: []string{http2.NextProtoTLS}, |
| }) |
| } |
| st := newServerTesterFromConn(te.t, c) |
| st.greet() |
| fn(st) |
| } |
| |
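// lazyConn is a net.Conn whose writes are delayed by one second once beLazy
// is set; it is used to make RPC deadlines expire.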
| type lazyConn struct { |
| net.Conn |
| beLazy int32 |
| } |
| |
| func (l *lazyConn) Write(b []byte) (int, error) { |
| if atomic.LoadInt32(&(l.beLazy)) == 1 { |
| time.Sleep(time.Second) |
| } |
| return l.Conn.Write(b) |
| } |
| |
| func (s) TestContextDeadlineNotIgnored(t *testing.T) { |
| e := noBalancerEnv |
| var lc *lazyConn |
| e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) { |
| conn, err := net.DialTimeout(network, addr, timeout) |
| if err != nil { |
| return nil, err |
| } |
| lc = &lazyConn{Conn: conn} |
| return lc, nil |
| } |
| |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| atomic.StoreInt32(&(lc.beLazy), 1) |
| ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
| defer cancel() |
| t1 := time.Now() |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err) |
| } |
| if time.Since(t1) > 2*time.Second { |
| t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline") |
| } |
| } |
| |
| func (s) TestTimeoutOnDeadServer(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testTimeoutOnDeadServer(t, e) |
| } |
| } |
| |
| func testTimeoutOnDeadServer(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| te.srv.Stop() |
| |
| // Wait for the client to notice the connection is gone. |
| ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) |
| state := cc.GetState() |
| for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { |
| } |
| cancel() |
| if state == connectivity.Ready { |
| t.Fatalf("Timed out waiting for non-ready state") |
| } |
| ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond) |
| _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)) |
| cancel() |
| if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded { |
		// If e.balancer == "", the addrConn will stop reconnecting because the dialer
		// returns a non-temporary error, and the RPC will fail with an internal error instead.
| t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded) |
| } |
| awaitNewConnLogOutput() |
| } |
| |
| func (s) TestServerGracefulStopIdempotent(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testServerGracefulStopIdempotent(t, e) |
| } |
| } |
| |
| func testServerGracefulStopIdempotent(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| for i := 0; i < 3; i++ { |
| te.srv.GracefulStop() |
| } |
| } |
| |
| func (s) TestServerGoAway(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testServerGoAway(t, e) |
| } |
| } |
| |
| func testServerGoAway(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| // Finish an RPC to make sure the connection is good. |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| defer cancel() |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| ch := make(chan struct{}) |
| go func() { |
| te.srv.GracefulStop() |
| close(ch) |
| }() |
| // Loop until the server side GoAway signal is propagated to the client. |
| for { |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded { |
| cancel() |
| break |
| } |
| cancel() |
| } |
| // A new RPC should fail. |
| ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) |
| defer cancel() |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal) |
| } |
| <-ch |
| awaitNewConnLogOutput() |
| } |
| |
| func (s) TestServerGoAwayPendingRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testServerGoAwayPendingRPC(t, e) |
| } |
| } |
| |
| func testServerGoAwayPendingRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| // Finish an RPC to make sure the connection is good. |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) |
| } |
| ch := make(chan struct{}) |
| go func() { |
| te.srv.GracefulStop() |
| close(ch) |
| }() |
| // Loop until the server side GoAway signal is propagated to the client. |
| start := time.Now() |
| errored := false |
| for time.Since(start) < time.Second { |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) |
| _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)) |
| cancel() |
| if err != nil { |
| errored = true |
| break |
| } |
| } |
| if !errored { |
| t.Fatalf("GoAway never received by client") |
| } |
| respParam := []*testpb.ResponseParameters{{Size: 1}} |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: payload, |
| } |
	// The existing RPC should still be able to proceed.
| if err := stream.Send(req); err != nil { |
| t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err) |
| } |
| if _, err := stream.Recv(); err != nil { |
| t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) |
| } |
| // The RPC will run until canceled. |
| cancel() |
| <-ch |
| awaitNewConnLogOutput() |
| } |
| |
| func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testServerMultipleGoAwayPendingRPC(t, e) |
| } |
| } |
| |
| func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| ctx, cancel := context.WithCancel(context.Background()) |
| stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| // Finish an RPC to make sure the connection is good. |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) |
| } |
| ch1 := make(chan struct{}) |
| go func() { |
| te.srv.GracefulStop() |
| close(ch1) |
| }() |
| ch2 := make(chan struct{}) |
| go func() { |
| te.srv.GracefulStop() |
| close(ch2) |
| }() |
| // Loop until the server side GoAway signal is propagated to the client. |
| for { |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| cancel() |
| break |
| } |
| cancel() |
| } |
| select { |
| case <-ch1: |
| t.Fatal("GracefulStop() terminated early") |
| case <-ch2: |
| t.Fatal("GracefulStop() terminated early") |
| default: |
| } |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: 1, |
| }, |
| } |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: payload, |
| } |
	// The existing RPC should still be able to proceed.
| if err := stream.Send(req); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) |
| } |
| if _, err := stream.Recv(); err != nil { |
| t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) |
| } |
| if err := stream.CloseSend(); err != nil { |
| t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err) |
| } |
| <-ch1 |
| <-ch2 |
| cancel() |
| awaitNewConnLogOutput() |
| } |
| |
| func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testConcurrentClientConnCloseAndServerGoAway(t, e) |
| } |
| } |
| |
| func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) |
| } |
| ch := make(chan struct{}) |
| // Close ClientConn and Server concurrently. |
| go func() { |
| te.srv.GracefulStop() |
| close(ch) |
| }() |
| go func() { |
| cc.Close() |
| }() |
| <-ch |
| } |
| |
| func (s) TestConcurrentServerStopAndGoAway(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testConcurrentServerStopAndGoAway(t, e) |
| } |
| } |
| |
| func testConcurrentServerStopAndGoAway(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| // Finish an RPC to make sure the connection is good. |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) |
| } |
| ch := make(chan struct{}) |
| go func() { |
| te.srv.GracefulStop() |
| close(ch) |
| }() |
| // Loop until the server side GoAway signal is propagated to the client. |
| for { |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { |
| cancel() |
| break |
| } |
| cancel() |
| } |
| // Stop the server and close all the connections. |
| te.srv.Stop() |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: 1, |
| }, |
| } |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: payload, |
| } |
| sendStart := time.Now() |
| for { |
| if err := stream.Send(req); err == io.EOF { |
			// stream.Send should eventually return io.EOF.
| break |
| } else if err != nil { |
| // Send should never return a transport-level error. |
| t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err) |
| } |
| if time.Since(sendStart) > 2*time.Second { |
| t.Fatalf("stream.Send(_) did not return io.EOF after 2s") |
| } |
| time.Sleep(time.Millisecond) |
| } |
| if _, err := stream.Recv(); err == nil || err == io.EOF { |
| t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err) |
| } |
| <-ch |
| awaitNewConnLogOutput() |
| } |
| |
| func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testClientConnCloseAfterGoAwayWithActiveStream(t, e) |
| } |
| } |
| |
| func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| if _, err := tc.FullDuplexCall(ctx); err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err) |
| } |
| done := make(chan struct{}) |
| go func() { |
| te.srv.GracefulStop() |
| close(done) |
| }() |
| time.Sleep(50 * time.Millisecond) |
| cc.Close() |
| timeout := time.NewTimer(time.Second) |
| select { |
| case <-done: |
| case <-timeout.C: |
| t.Fatalf("Test timed-out.") |
| } |
| } |
| |
| func (s) TestFailFast(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testFailFast(t, e) |
| } |
| } |
| |
| func testFailFast(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| defer cancel() |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| // Stop the server and tear down all the existing connections. |
| te.srv.Stop() |
| // Loop until the server teardown is propagated to the client. |
| for { |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| _, err := tc.EmptyCall(ctx, &testpb.Empty{}) |
| cancel() |
| if status.Code(err) == codes.Unavailable { |
| break |
| } |
| t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err) |
| time.Sleep(10 * time.Millisecond) |
| } |
	// The client keeps reconnecting, and ongoing fail-fast RPCs should fail with codes.Unavailable.
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { |
| t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable) |
| } |
| if _, err := tc.StreamingInputCall(context.Background()); status.Code(err) != codes.Unavailable { |
| t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable) |
| } |
| |
| awaitNewConnLogOutput() |
| } |
| |
| func testServiceConfigSetup(t *testing.T, e env) *test { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| "Failed to dial : context canceled; please retry.", |
| ) |
| return te |
| } |
| |
| func newBool(b bool) (a *bool) { |
| return &b |
| } |
| |
| func newInt(b int) (a *int) { |
| return &b |
| } |
| |
| func newDuration(b time.Duration) (a *time.Duration) { |
| a = new(time.Duration) |
| *a = b |
| return |
| } |
| |
| func (s) TestGetMethodConfig(t *testing.T) { |
| te := testServiceConfigSetup(t, tcpClearRREnv) |
| defer te.tearDown() |
| r := manual.NewBuilderWithScheme("whatever") |
| |
| te.resolverScheme = r.Scheme() |
| cc := te.clientConn(grpc.WithResolvers(r)) |
| addrs := []resolver.Address{{Addr: te.srvAddr}} |
| r.UpdateState(resolver.State{ |
| Addresses: addrs, |
| ServiceConfig: parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "EmptyCall" |
| } |
| ], |
| "waitForReady": true, |
| "timeout": ".001s" |
| }, |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService" |
| } |
| ], |
| "waitForReady": false |
| } |
| ] |
| }`)}) |
| |
| tc := testpb.NewTestServiceClient(cc) |
| |
| // Make sure service config has been processed by grpc. |
| for { |
| if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. |
| var err error |
| if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) |
| } |
| |
| r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "UnaryCall" |
| } |
| ], |
| "waitForReady": true, |
| "timeout": ".001s" |
| }, |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService" |
| } |
| ], |
| "waitForReady": false |
| } |
| ] |
| }`)}) |
| |
| // Make sure service config has been processed by grpc. |
| for { |
| if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| // The following RPCs are expected to become fail-fast. |
| if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) |
| } |
| } |
| |
| func (s) TestServiceConfigWaitForReady(t *testing.T) { |
| te := testServiceConfigSetup(t, tcpClearRREnv) |
| defer te.tearDown() |
| r := manual.NewBuilderWithScheme("whatever") |
| |
	// Case 1: The client API sets failfast to false and the service config sets wait_for_ready to false. The client API should win, and the RPC waits until the deadline is exceeded.
| te.resolverScheme = r.Scheme() |
| cc := te.clientConn(grpc.WithResolvers(r)) |
| addrs := []resolver.Address{{Addr: te.srvAddr}} |
| r.UpdateState(resolver.State{ |
| Addresses: addrs, |
| ServiceConfig: parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "EmptyCall" |
| }, |
| { |
| "service": "grpc.testing.TestService", |
| "method": "FullDuplexCall" |
| } |
| ], |
| "waitForReady": false, |
| "timeout": ".001s" |
| } |
| ] |
| }`)}) |
| |
| tc := testpb.NewTestServiceClient(cc) |
| |
| // Make sure service config has been processed by grpc. |
| for { |
| if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. |
| var err error |
| if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) |
| } |
| if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) |
| } |
| |
| // Generate a service config update. |
	// Case 2: The client API sets failfast to false and the service config sets wait_for_ready to true; the RPC waits until the deadline is exceeded.
| r.UpdateState(resolver.State{ |
| Addresses: addrs, |
| ServiceConfig: parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "EmptyCall" |
| }, |
| { |
| "service": "grpc.testing.TestService", |
| "method": "FullDuplexCall" |
| } |
| ], |
| "waitForReady": true, |
| "timeout": ".001s" |
| } |
| ] |
| }`)}) |
| |
| // Wait for the new service config to take effect. |
| for { |
| if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) |
| } |
| if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) |
| } |
| } |
| |
| func (s) TestServiceConfigTimeout(t *testing.T) { |
| te := testServiceConfigSetup(t, tcpClearRREnv) |
| defer te.tearDown() |
| r := manual.NewBuilderWithScheme("whatever") |
| |
	// Case 1: The client API sets the timeout to 1ns and the service config sets it to 1hr. The effective timeout should be 1ns (the minimum of the two), and the RPC waits until the deadline is exceeded.
| te.resolverScheme = r.Scheme() |
| cc := te.clientConn(grpc.WithResolvers(r)) |
| addrs := []resolver.Address{{Addr: te.srvAddr}} |
| r.UpdateState(resolver.State{ |
| Addresses: addrs, |
| ServiceConfig: parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "EmptyCall" |
| }, |
| { |
| "service": "grpc.testing.TestService", |
| "method": "FullDuplexCall" |
| } |
| ], |
| "waitForReady": true, |
| "timeout": "3600s" |
| } |
| ] |
| }`)}) |
| |
| tc := testpb.NewTestServiceClient(cc) |
| |
| // Make sure service config has been processed by grpc. |
| for { |
| if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. |
| var err error |
| ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) |
| if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) |
| } |
| cancel() |
| |
| ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) |
| if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) |
| } |
| cancel() |
| |
| // Generate a service config update. |
	// Case 2: The client API sets the timeout to 1hr and the service config sets it to 1ns. The effective timeout should be 1ns (the minimum of the two), and the RPC waits until the deadline is exceeded.
| r.UpdateState(resolver.State{ |
| Addresses: addrs, |
| ServiceConfig: parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "EmptyCall" |
| }, |
| { |
| "service": "grpc.testing.TestService", |
| "method": "FullDuplexCall" |
| } |
| ], |
| "waitForReady": true, |
| "timeout": ".000000001s" |
| } |
| ] |
| }`)}) |
| |
| // Wait for the new service config to take effect. |
| for { |
| if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| ctx, cancel = context.WithTimeout(context.Background(), time.Hour) |
| if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) |
| } |
| cancel() |
| |
| ctx, cancel = context.WithTimeout(context.Background(), time.Hour) |
| if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { |
| t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) |
| } |
| cancel() |
| } |
| |
| func (s) TestServiceConfigMaxMsgSize(t *testing.T) { |
| e := tcpClearRREnv |
| r := manual.NewBuilderWithScheme("whatever") |
| |
| // Setting up values and objects shared across all test cases. |
| const smallSize = 1 |
| const largeSize = 1024 |
| const extraLargeSize = 2048 |
| |
| smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
	// Case 1: The service config sets maxReqSize to 2048 (send) and maxRespSize to 2048 (recv).
| te1 := testServiceConfigSetup(t, e) |
| defer te1.tearDown() |
| |
| te1.resolverScheme = r.Scheme() |
| te1.nonBlockingDial = true |
| te1.startServer(&testServer{security: e.security}) |
| cc1 := te1.clientConn(grpc.WithResolvers(r)) |
| |
| addrs := []resolver.Address{{Addr: te1.srvAddr}} |
| sc := parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "UnaryCall" |
| }, |
| { |
| "service": "grpc.testing.TestService", |
| "method": "FullDuplexCall" |
| } |
| ], |
| "maxRequestMessageBytes": 2048, |
| "maxResponseMessageBytes": 2048 |
| } |
| ] |
| }`) |
| r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc}) |
| tc := testpb.NewTestServiceClient(cc1) |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: int32(extraLargeSize), |
| Payload: smallPayload, |
| } |
| |
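	// Wait for the service config to take effect before issuing RPCs.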
| for { |
| if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| // Test for unary RPC recv. |
| if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for unary RPC send. |
| req.Payload = extraLargePayload |
| req.ResponseSize = int32(smallSize) |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC recv. |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: int32(extraLargeSize), |
| }, |
| } |
| sreq := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: smallPayload, |
| } |
| stream, err := tc.FullDuplexCall(te1.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err = stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC send. |
| respParam[0].Size = int32(smallSize) |
| sreq.Payload = extraLargePayload |
| stream, err = tc.FullDuplexCall(te1.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) |
| } |
| |
	// Case2: Client API sets maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
| te2 := testServiceConfigSetup(t, e) |
| te2.resolverScheme = r.Scheme() |
| te2.nonBlockingDial = true |
| te2.maxClientReceiveMsgSize = newInt(1024) |
| te2.maxClientSendMsgSize = newInt(1024) |
| |
| te2.startServer(&testServer{security: e.security}) |
| defer te2.tearDown() |
| cc2 := te2.clientConn(grpc.WithResolvers(r)) |
| r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc}) |
| tc = testpb.NewTestServiceClient(cc2) |
| |
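	// Wait for the service config to take effect before issuing RPCs.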
| for { |
| if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| // Test for unary RPC recv. |
| req.Payload = smallPayload |
| req.ResponseSize = int32(largeSize) |
| |
| if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for unary RPC send. |
| req.Payload = largePayload |
| req.ResponseSize = int32(smallSize) |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC recv. |
| stream, err = tc.FullDuplexCall(te2.ctx) |
| respParam[0].Size = int32(largeSize) |
| sreq.Payload = smallPayload |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err = stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC send. |
| respParam[0].Size = int32(smallSize) |
| sreq.Payload = largePayload |
| stream, err = tc.FullDuplexCall(te2.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) |
| } |
| |
	// Case3: Client API sets maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
| te3 := testServiceConfigSetup(t, e) |
| te3.resolverScheme = r.Scheme() |
| te3.nonBlockingDial = true |
| te3.maxClientReceiveMsgSize = newInt(4096) |
| te3.maxClientSendMsgSize = newInt(4096) |
| |
| te3.startServer(&testServer{security: e.security}) |
| defer te3.tearDown() |
| |
| cc3 := te3.clientConn(grpc.WithResolvers(r)) |
| r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: sc}) |
| tc = testpb.NewTestServiceClient(cc3) |
| |
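	// Wait for the service config to take effect before issuing RPCs.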
| for { |
| if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| // Test for unary RPC recv. |
| req.Payload = smallPayload |
| req.ResponseSize = int32(largeSize) |
| |
| if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) |
| } |
| |
| req.ResponseSize = int32(extraLargeSize) |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for unary RPC send. |
| req.Payload = largePayload |
| req.ResponseSize = int32(smallSize) |
| if _, err := tc.UnaryCall(context.Background(), req); err != nil { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) |
| } |
| |
| req.Payload = extraLargePayload |
| if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC recv. |
| stream, err = tc.FullDuplexCall(te3.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| respParam[0].Size = int32(largeSize) |
| sreq.Payload = smallPayload |
| |
| if err = stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err = stream.Recv(); err != nil { |
| t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err) |
| } |
| |
| respParam[0].Size = int32(extraLargeSize) |
| |
| if err = stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC send. |
| respParam[0].Size = int32(smallSize) |
| sreq.Payload = largePayload |
| stream, err = tc.FullDuplexCall(te3.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| sreq.Payload = extraLargePayload |
| if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) |
| } |
| } |
| |
// Reading from a streaming RPC may fail with context canceled if the timeout
// was set by the service config (https://github.com/grpc/grpc-go/issues/1818).
// This test makes sure reads from a streaming RPC don't fail in this case.
| func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) { |
| te := testServiceConfigSetup(t, tcpClearRREnv) |
| te.startServer(&testServer{security: tcpClearRREnv.security}) |
| defer te.tearDown() |
| r := manual.NewBuilderWithScheme("whatever") |
| |
| te.resolverScheme = r.Scheme() |
| te.nonBlockingDial = true |
| cc := te.clientConn(grpc.WithResolvers(r)) |
| tc := testpb.NewTestServiceClient(cc) |
| |
| r.UpdateState(resolver.State{ |
| Addresses: []resolver.Address{{Addr: te.srvAddr}}, |
| ServiceConfig: parseCfg(r, `{ |
| "methodConfig": [ |
| { |
| "name": [ |
| { |
| "service": "grpc.testing.TestService", |
| "method": "FullDuplexCall" |
| } |
| ], |
| "waitForReady": true, |
| "timeout": "10s" |
| } |
| ] |
| }`)}) |
| // Make sure service config has been processed by grpc. |
| for { |
| if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) |
| if err != nil { |
| t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) |
| } |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0) |
| if err != nil { |
		t.Fatalf("newPayload failed: %v", err)
| } |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: []*testpb.ResponseParameters{{Size: 0}}, |
| Payload: payload, |
| } |
| if err := stream.Send(req); err != nil { |
| t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err) |
| } |
| stream.CloseSend() |
	// Sleep for a second before the Recv to make sure the final status has been
	// received before the Recv.
	time.Sleep(time.Second)
| if _, err := stream.Recv(); err != nil { |
| t.Fatalf("stream.Recv = _, %v, want _, <nil>", err) |
| } |
| // Keep reading to drain the stream. |
| for { |
| if _, err := stream.Recv(); err != nil { |
| break |
| } |
| } |
| } |
| |
| func (s) TestPreloaderClientSend(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testPreloaderClientSend(t, e) |
| } |
| } |
| |
| func testPreloaderClientSend(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| "Failed to dial : context canceled; please retry.", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
	// Start a full-duplex stream with gzip compression to exercise PreparedMsg
	// sends.
| stream, err := tc.FullDuplexCall(te.ctx, grpc.UseCompressor("gzip")) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| var index int |
| for index < len(reqSizes) { |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: int32(respSizes[index]), |
| }, |
| } |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: payload, |
| } |
| preparedMsg := &grpc.PreparedMsg{} |
| err = preparedMsg.Encode(stream, req) |
| if err != nil { |
			t.Fatalf("PreparedMsg.Encode failed for size %d: %v", reqSizes[index], err)
| } |
| if err := stream.SendMsg(preparedMsg); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) |
| } |
| reply, err := stream.Recv() |
| if err != nil { |
| t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) |
| } |
| pt := reply.GetPayload().GetType() |
| if pt != testpb.PayloadType_COMPRESSABLE { |
| t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) |
| } |
| size := len(reply.GetPayload().GetBody()) |
| if size != int(respSizes[index]) { |
| t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) |
| } |
| index++ |
| } |
| if err := stream.CloseSend(); err != nil { |
| t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
		t.Fatalf("%v failed to complete the ping pong test: %v", stream, err)
| } |
| } |
| |
| func (s) TestMaxMsgSizeClientDefault(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMaxMsgSizeClientDefault(t, e) |
| } |
| } |
| |
| func testMaxMsgSizeClientDefault(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| "Failed to dial : context canceled; please retry.", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const smallSize = 1 |
| const largeSize = 4 * 1024 * 1024 |
| smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: int32(largeSize), |
| Payload: smallPayload, |
| } |
| // Test for unary RPC recv. |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: int32(largeSize), |
| }, |
| } |
| sreq := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: smallPayload, |
| } |
| |
| // Test for streaming RPC recv. |
| stream, err := tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| } |
| |
| func (s) TestMaxMsgSizeClientAPI(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMaxMsgSizeClientAPI(t, e) |
| } |
| } |
| |
| func testMaxMsgSizeClientAPI(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
	// Use a large server send limit so that errors are triggered only by the client-side limits.
| te.maxServerSendMsgSize = newInt(5 * 1024 * 1024) |
| te.maxClientReceiveMsgSize = newInt(1024) |
| te.maxClientSendMsgSize = newInt(1024) |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| "Failed to dial : context canceled; please retry.", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const smallSize = 1 |
| const largeSize = 1024 |
| smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: int32(largeSize), |
| Payload: smallPayload, |
| } |
| // Test for unary RPC recv. |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for unary RPC send. |
| req.Payload = largePayload |
| req.ResponseSize = int32(smallSize) |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: int32(largeSize), |
| }, |
| } |
| sreq := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: smallPayload, |
| } |
| |
| // Test for streaming RPC recv. |
| stream, err := tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC send. |
| respParam[0].Size = int32(smallSize) |
| sreq.Payload = largePayload |
| stream, err = tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) |
| } |
| } |
| |
| func (s) TestMaxMsgSizeServerAPI(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMaxMsgSizeServerAPI(t, e) |
| } |
| } |
| |
| func testMaxMsgSizeServerAPI(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.maxServerReceiveMsgSize = newInt(1024) |
| te.maxServerSendMsgSize = newInt(1024) |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| "Failed to dial : context canceled; please retry.", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const smallSize = 1 |
| const largeSize = 1024 |
| smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: int32(largeSize), |
| Payload: smallPayload, |
| } |
| // Test for unary RPC send. |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Test for unary RPC recv. |
| req.Payload = largePayload |
| req.ResponseSize = int32(smallSize) |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: int32(largeSize), |
| }, |
| } |
| sreq := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: smallPayload, |
| } |
| |
| // Test for streaming RPC send. |
| stream, err := tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| |
| // Test for streaming RPC recv. |
| respParam[0].Size = int32(smallSize) |
| sreq.Payload = largePayload |
| stream, err = tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| } |
| |
| func (s) TestTap(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testTap(t, e) |
| } |
| } |
| |
| type myTap struct { |
| cnt int |
| } |
| |
| func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) { |
| if info != nil { |
| if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" { |
| t.cnt++ |
| } else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" { |
| return nil, fmt.Errorf("tap error") |
| } |
| } |
| return ctx, nil |
| } |
| |
| func testTap(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| ttap := &myTap{} |
| te.tapHandle = ttap.handle |
| te.declareLogNoise( |
| "transport: http2Client.notifyError got notified that the client transport was broken EOF", |
| "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", |
| "grpc: addrConn.resetTransport failed to create client transport: connection error", |
| ) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| if ttap.cnt != 1 { |
		t.Fatalf("Got the count in ttap %d, want 1", ttap.cnt)
| } |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: 45, |
| Payload: payload, |
| } |
| if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) |
| } |
| } |
| |
| // healthCheck is a helper function to make a unary health check RPC and return |
| // the response. |
| func healthCheck(d time.Duration, cc *grpc.ClientConn, service string) (*healthpb.HealthCheckResponse, error) { |
| ctx, cancel := context.WithTimeout(context.Background(), d) |
| defer cancel() |
| hc := healthgrpc.NewHealthClient(cc) |
| return hc.Check(ctx, &healthpb.HealthCheckRequest{Service: service}) |
| } |
| |
| // verifyHealthCheckStatus is a helper function to verify that the current |
| // health status of the service matches the one passed in 'wantStatus'. |
| func verifyHealthCheckStatus(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantStatus healthpb.HealthCheckResponse_ServingStatus) { |
| t.Helper() |
| resp, err := healthCheck(d, cc, service) |
| if err != nil { |
| t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) |
| } |
| if resp.Status != wantStatus { |
| t.Fatalf("Got the serving status %v, want %v", resp.Status, wantStatus) |
| } |
| } |
| |
| // verifyHealthCheckErrCode is a helper function to verify that a unary health |
| // check RPC returns an error with a code set to 'wantCode'. |
| func verifyHealthCheckErrCode(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantCode codes.Code) { |
| t.Helper() |
| if _, err := healthCheck(d, cc, service); status.Code(err) != wantCode { |
| t.Fatalf("Health/Check() got errCode %v, want %v", status.Code(err), wantCode) |
| } |
| } |
| |
| // newHealthCheckStream is a helper function to start a health check streaming |
| // RPC, and returns the stream. |
| func newHealthCheckStream(t *testing.T, cc *grpc.ClientConn, service string) (healthgrpc.Health_WatchClient, context.CancelFunc) { |
| t.Helper() |
| ctx, cancel := context.WithCancel(context.Background()) |
| hc := healthgrpc.NewHealthClient(cc) |
| stream, err := hc.Watch(ctx, &healthpb.HealthCheckRequest{Service: service}) |
| if err != nil { |
| t.Fatalf("hc.Watch(_, %v) failed: %v", service, err) |
| } |
| return stream, cancel |
| } |
| |
| // healthWatchChecker is a helper function to verify that the next health |
| // status returned on the given stream matches the one passed in 'wantStatus'. |
| func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, wantStatus healthpb.HealthCheckResponse_ServingStatus) { |
| t.Helper() |
| response, err := stream.Recv() |
| if err != nil { |
| t.Fatalf("stream.Recv() failed: %v", err) |
| } |
| if response.Status != wantStatus { |
| t.Fatalf("got servingStatus %v, want %v", response.Status, wantStatus) |
| } |
| } |
| |
| // TestHealthCheckSuccess invokes the unary Check() RPC on the health server in |
| // a successful case. |
| func (s) TestHealthCheckSuccess(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthCheckSuccess(t, e) |
| } |
| } |
| |
| func testHealthCheckSuccess(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| defer te.tearDown() |
| |
| verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.OK) |
| } |
| |
| // TestHealthCheckFailure invokes the unary Check() RPC on the health server |
| // with an expired context and expects the RPC to fail. |
| func (s) TestHealthCheckFailure(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthCheckFailure(t, e) |
| } |
| } |
| |
| func testHealthCheckFailure(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.declareLogNoise( |
| "Failed to dial ", |
| "grpc: the client connection is closing; please retry", |
| ) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| defer te.tearDown() |
| |
| verifyHealthCheckErrCode(t, 0*time.Second, te.clientConn(), defaultHealthService, codes.DeadlineExceeded) |
| awaitNewConnLogOutput() |
| } |
| |
| // TestHealthCheckOff makes a unary Check() RPC on the health server where the |
| // health status of the defaultHealthService is not set, and therefore expects |
| // an error code 'codes.NotFound'. |
| func (s) TestHealthCheckOff(t *testing.T) { |
| for _, e := range listTestEnv() { |
| // TODO(bradfitz): Temporarily skip this env due to #619. |
| if e.name == "handler-tls" { |
| continue |
| } |
| testHealthCheckOff(t, e) |
| } |
| } |
| |
| func testHealthCheckOff(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.NotFound) |
| } |
| |
| // TestHealthWatchMultipleClients makes a streaming Watch() RPC on the health |
| // server with multiple clients and expects the same status on both streams. |
| func (s) TestHealthWatchMultipleClients(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthWatchMultipleClients(t, e) |
| } |
| } |
| |
| func testHealthWatchMultipleClients(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| stream1, cf1 := newHealthCheckStream(t, cc, defaultHealthService) |
| defer cf1() |
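	// No serving status has been set for defaultHealthService yet, so watchers
	// initially see SERVICE_UNKNOWN.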
| healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) |
| |
| stream2, cf2 := newHealthCheckStream(t, cc, defaultHealthService) |
| defer cf2() |
| healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) |
| |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) |
| healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) |
| healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING) |
| } |
| |
// TestHealthWatchSameStatus makes a streaming Watch() RPC on the health server
| // and makes sure that the health status of the server is as expected after |
| // multiple calls to SetServingStatus with the same status. |
| func (s) TestHealthWatchSameStatus(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthWatchSameStatus(t, e) |
| } |
| } |
| |
| func testHealthWatchSameStatus(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) |
| defer cf() |
| |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING) |
| } |
| |
| // TestHealthWatchServiceStatusSetBeforeStartingServer starts a health server |
| // on which the health status for the defaultService is set before the gRPC |
| // server is started, and expects the correct health status to be returned. |
| func (s) TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthWatchSetServiceStatusBeforeStartingServer(t, e) |
| } |
| } |
| |
| func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) { |
| hs := health.NewServer() |
| te := newTest(t, e) |
| te.healthServer = hs |
| hs.SetServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) |
| defer cf() |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) |
| } |
| |
| // TestHealthWatchDefaultStatusChange verifies the simple case where the |
| // service starts off with a SERVICE_UNKNOWN status (because SetServingStatus |
| // hasn't been called yet) and then moves to SERVING after SetServingStatus is |
| // called. |
| func (s) TestHealthWatchDefaultStatusChange(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthWatchDefaultStatusChange(t, e) |
| } |
| } |
| |
| func testHealthWatchDefaultStatusChange(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) |
| defer cf() |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) |
| } |
| |
| // TestHealthWatchSetServiceStatusBeforeClientCallsWatch verifies the case |
| // where the health status is set to SERVING before the client calls Watch(). |
| func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e) |
| } |
| } |
| |
| func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| defer te.tearDown() |
| |
| stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) |
| defer cf() |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) |
| } |
| |
| // TestHealthWatchOverallServerHealthChange verifies setting the overall status |
| // of the server by using the empty service name. |
| func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthWatchOverallServerHealthChange(t, e) |
| } |
| } |
| |
| func testHealthWatchOverallServerHealthChange(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| stream, cf := newHealthCheckStream(t, te.clientConn(), "") |
| defer cf() |
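	// The overall server health (empty service name) defaults to SERVING.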
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) |
| te.setHealthServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) |
| healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING) |
| } |
| |
| // TestUnknownHandler verifies that an expected error is returned (by setting |
| // the unknownHandler on the server) for a service which is not exposed to the |
| // client. |
| func (s) TestUnknownHandler(t *testing.T) { |
	// An example unknownHandler that returns a different error code, making sure
	// that we do not expose which methods are implemented to a client that is
	// not authenticated.
| unknownHandler := func(srv interface{}, stream grpc.ServerStream) error { |
| return status.Error(codes.Unauthenticated, "user unauthenticated") |
| } |
| for _, e := range listTestEnv() { |
| // TODO(bradfitz): Temporarily skip this env due to #619. |
| if e.name == "handler-tls" { |
| continue |
| } |
| testUnknownHandler(t, e, unknownHandler) |
| } |
| } |
| |
| func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) { |
| te := newTest(t, e) |
| te.unknownHandler = unknownHandler |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), "", codes.Unauthenticated) |
| } |
| |
| // TestHealthCheckServingStatus makes a streaming Watch() RPC on the health |
| // server and verifies a bunch of health status transitions. |
| func (s) TestHealthCheckServingStatus(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testHealthCheckServingStatus(t, e) |
| } |
| } |
| |
| func testHealthCheckServingStatus(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.enableHealthServer = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| verifyHealthCheckStatus(t, 1*time.Second, cc, "", healthpb.HealthCheckResponse_SERVING) |
| verifyHealthCheckErrCode(t, 1*time.Second, cc, defaultHealthService, codes.NotFound) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_SERVING) |
| te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) |
| verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) |
| } |
| |
| func (s) TestEmptyUnaryWithUserAgent(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testEmptyUnaryWithUserAgent(t, e) |
| } |
| } |
| |
| func testEmptyUnaryWithUserAgent(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| var header metadata.MD |
| reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) |
| if err != nil || !proto.Equal(&testpb.Empty{}, reply) { |
| t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{}) |
| } |
| if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) { |
| t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA) |
| } |
| |
| te.srv.Stop() |
| } |
| |
| func (s) TestFailedEmptyUnary(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| // This test covers status details, but |
			// Grpc-Status-Details-Bin is not supported in handler_server.
| continue |
| } |
| testFailedEmptyUnary(t, e) |
| } |
| } |
| |
| func testFailedEmptyUnary(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = failAppUA |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| wantErr := detailedError |
| if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr) |
| } |
| } |
| |
| func (s) TestLargeUnary(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testLargeUnary(t, e) |
| } |
| } |
| |
| func testLargeUnary(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const argSize = 271828 |
| const respSize = 314159 |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| reply, err := tc.UnaryCall(context.Background(), req) |
| if err != nil { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| pt := reply.GetPayload().GetType() |
| ps := len(reply.GetPayload().GetBody()) |
| if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize { |
| t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize) |
| } |
| } |
| |
| // Test backward-compatibility API for setting msg size limit. |
| func (s) TestExceedMsgLimit(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testExceedMsgLimit(t, e) |
| } |
| } |
| |
| func testExceedMsgLimit(t *testing.T, e env) { |
| te := newTest(t, e) |
| maxMsgSize := 1024 |
| te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| largeSize := int32(maxMsgSize + 1) |
| const smallSize = 1 |
| |
| largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| // Make sure the server cannot receive a unary RPC of largeSize. |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: smallSize, |
| Payload: largePayload, |
| } |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| // Make sure the client cannot receive a unary RPC of largeSize. |
| req.ResponseSize = largeSize |
| req.Payload = smallPayload |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } |
| |
| // Make sure the server cannot receive a streaming RPC of largeSize. |
| stream, err := tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: 1, |
| }, |
| } |
| |
| sreq := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: largePayload, |
| } |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| |
| // Test on client side for streaming RPC. |
| stream, err = tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| respParam[0].Size = largeSize |
| sreq.Payload = smallPayload |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) |
| } |
| } |
| |
| func (s) TestPeerClientSide(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testPeerClientSide(t, e) |
| } |
| } |
| |
| func testPeerClientSide(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = testAppUA |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| peer := new(peer.Peer) |
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| pa := peer.Addr.String() |
| if e.network == "unix" { |
| if pa != te.srvAddr { |
| t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr) |
| } |
| return |
| } |
| _, pp, err := net.SplitHostPort(pa) |
| if err != nil { |
| t.Fatalf("Failed to parse address from peer.") |
| } |
| _, sp, err := net.SplitHostPort(te.srvAddr) |
| if err != nil { |
| t.Fatalf("Failed to parse address of test server.") |
| } |
| if pp != sp { |
| t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp) |
| } |
| } |
| |
// TestPeerNegative tests that if a call fails, setting the peer
// doesn't cause a segmentation fault.
| // issue#1141 https://github.com/grpc/grpc-go/issues/1141 |
| func (s) TestPeerNegative(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testPeerNegative(t, e) |
| } |
| } |
| |
| func testPeerNegative(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| peer := new(peer.Peer) |
| ctx, cancel := context.WithCancel(context.Background()) |
| cancel() |
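	// The context is already canceled, so the RPC fails; this only verifies that
	// passing grpc.Peer on a failed call does not panic.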
| tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)) |
| } |
| |
| func (s) TestPeerFailedRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testPeerFailedRPC(t, e) |
| } |
| } |
| |
| func testPeerFailedRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.maxServerReceiveMsgSize = newInt(1 * 1024) |
| te.startServer(&testServer{security: e.security}) |
| |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
	// First, make a successful request to the server.
| if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { |
| t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| |
	// Make a second request that will be rejected by the server.
| const largeSize = 5 * 1024 |
| largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| Payload: largePayload, |
| } |
| |
| peer := new(peer.Peer) |
| if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) |
| } else { |
| pa := peer.Addr.String() |
| if e.network == "unix" { |
| if pa != te.srvAddr { |
| t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr) |
| } |
| return |
| } |
| _, pp, err := net.SplitHostPort(pa) |
| if err != nil { |
| t.Fatalf("Failed to parse address from peer.") |
| } |
| _, sp, err := net.SplitHostPort(te.srvAddr) |
| if err != nil { |
| t.Fatalf("Failed to parse address of test server.") |
| } |
| if pp != sp { |
| t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp) |
| } |
| } |
| } |
| |
| func (s) TestMetadataUnaryRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMetadataUnaryRPC(t, e) |
| } |
| } |
| |
| func testMetadataUnaryRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const argSize = 2718 |
| const respSize = 314 |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| var header, trailer metadata.MD |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil { |
| t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) |
| } |
| // Ignore optional response headers that Servers may set: |
| if header != nil { |
		delete(header, "trailer") // RFC 2616 says the server SHOULD (but is not required to) declare trailers
| delete(header, "date") // the Date header is also optional |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| } |
| if !reflect.DeepEqual(header, testMetadata) { |
| t.Fatalf("Received header metadata %v, want %v", header, testMetadata) |
| } |
| if !reflect.DeepEqual(trailer, testTrailerMetadata) { |
| t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata) |
| } |
| } |
| |
| func (s) TestMetadataOrderUnaryRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMetadataOrderUnaryRPC(t, e) |
| } |
| } |
| |
| func testMetadataOrderUnaryRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value2") |
| ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value3") |
| |
	// Use Join to build the expected metadata instead of FromOutgoingContext.
| newMetadata := metadata.Join(testMetadata, metadata.Pairs("key1", "value2", "key1", "value3")) |
| |
| var header metadata.MD |
| if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.Header(&header)); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Ignore optional response headers that Servers may set: |
| if header != nil { |
		delete(header, "trailer") // RFC 2616 says the server SHOULD (but is not required to) declare trailers
| delete(header, "date") // the Date header is also optional |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| } |
| |
| if !reflect.DeepEqual(header, newMetadata) { |
| t.Fatalf("Received header metadata %v, want %v", header, newMetadata) |
| } |
| } |
| |
| func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMultipleSetTrailerUnaryRPC(t, e) |
| } |
| } |
| |
| func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const ( |
| argSize = 1 |
| respSize = 1 |
| ) |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| var trailer metadata.MD |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) |
| } |
| expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2) |
| if !reflect.DeepEqual(trailer, expectedTrailer) { |
| t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer) |
| } |
| } |
| |
| func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMultipleSetTrailerStreamingRPC(t, e) |
| } |
| } |
| |
| func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.CloseSend(); err != nil { |
| t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
		t.Fatalf("%v failed to complete the FullDuplexCall: %v", stream, err)
| } |
| |
| trailer := stream.Trailer() |
| expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2) |
| if !reflect.DeepEqual(trailer, expectedTrailer) { |
| t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer) |
| } |
| } |
| |
| func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testSetAndSendHeaderUnaryRPC(t, e) |
| } |
| } |
| |
// Tests that header metadata is sent on SendHeader().
| func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, setAndSendHeader: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const ( |
| argSize = 1 |
| respSize = 1 |
| ) |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| var header metadata.MD |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) |
| } |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| expectedHeader := metadata.Join(testMetadata, testMetadata2) |
| if !reflect.DeepEqual(header, expectedHeader) { |
| t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) |
| } |
| } |
| |
| func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testMultipleSetHeaderUnaryRPC(t, e) |
| } |
| } |
| |
// Tests that header metadata is sent when sending the response.
| func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, setHeaderOnly: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const ( |
| argSize = 1 |
| respSize = 1 |
| ) |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| |
| var header metadata.MD |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil { |
| t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) |
| } |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| expectedHeader := metadata.Join(testMetadata, testMetadata2) |
| if !reflect.DeepEqual(header, expectedHeader) { |
| t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) |
| } |
| } |
| |
| func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testMultipleSetHeaderUnaryRPCError(t, e) |
| } |
| } |
| |
// Tests that header metadata is sent when sending the status.
| func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, setHeaderOnly: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const ( |
| argSize = 1 |
| respSize = -1 // Invalid respSize to make RPC fail. |
| ) |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| var header metadata.MD |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil { |
| t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err) |
| } |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| expectedHeader := metadata.Join(testMetadata, testMetadata2) |
| if !reflect.DeepEqual(header, expectedHeader) { |
| t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) |
| } |
| } |
| |
| func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testSetAndSendHeaderStreamingRPC(t, e) |
| } |
| } |
| |
// Tests that header metadata is sent on SendHeader().
| func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, setAndSendHeader: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| stream, err := tc.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if err := stream.CloseSend(); err != nil { |
| t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
		t.Fatalf("%v failed to complete the FullDuplexCall: %v", stream, err)
| } |
| |
| header, err := stream.Header() |
| if err != nil { |
| t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) |
| } |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| expectedHeader := metadata.Join(testMetadata, testMetadata2) |
| if !reflect.DeepEqual(header, expectedHeader) { |
| t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) |
| } |
| } |
| |
| func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testMultipleSetHeaderStreamingRPC(t, e) |
| } |
| } |
| |
| // Tests that header metadata is sent to the client when the server sends its first response message. |
| func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, setHeaderOnly: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const ( |
| argSize = 1 |
| respSize = 1 |
| ) |
| ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) |
| stream, err := tc.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: []*testpb.ResponseParameters{ |
| {Size: respSize}, |
| }, |
| Payload: payload, |
| } |
| if err := stream.Send(req); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) |
| } |
| if _, err := stream.Recv(); err != nil { |
| t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) |
| } |
| if err := stream.CloseSend(); err != nil { |
| t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
| t.Fatalf("%v failed to complete the FullDuplexCall: %v", stream, err) |
| } |
| |
| header, err := stream.Header() |
| if err != nil { |
| t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) |
| } |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| expectedHeader := metadata.Join(testMetadata, testMetadata2) |
| if !reflect.DeepEqual(header, expectedHeader) { |
| t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) |
| } |
| } |
| |
| func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testMultipleSetHeaderStreamingRPCError(t, e) |
| } |
| } |
| |
| // Tests that header metadata is sent to the client when the server sends the status for a failed streaming RPC. |
| func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, setHeaderOnly: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const ( |
| argSize = 1 |
| respSize = -1 |
| ) |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| ctx = metadata.NewOutgoingContext(ctx, testMetadata) |
| stream, err := tc.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: []*testpb.ResponseParameters{ |
| {Size: respSize}, |
| }, |
| Payload: payload, |
| } |
| if err := stream.Send(req); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) |
| } |
| if _, err := stream.Recv(); err == nil { |
| t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err) |
| } |
| |
| header, err := stream.Header() |
| if err != nil { |
| t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) |
| } |
| delete(header, "user-agent") |
| delete(header, "content-type") |
| expectedHeader := metadata.Join(testMetadata, testMetadata2) |
| if !reflect.DeepEqual(header, expectedHeader) { |
| t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) |
| } |
| if err := stream.CloseSend(); err != nil { |
| t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) |
| } |
| } |
| |
| // TestMalformedHTTP2Metadata verifies the error returned when the client |
| // sends illegal HTTP/2 metadata. |
| func (s) TestMalformedHTTP2Metadata(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| // Skipped because the server stops accepting new RPCs when the client sends |
| // an illegal HTTP/2 header, which fails this test with "server stops accepting new RPCs". |
| continue |
| } |
| testMalformedHTTP2Metadata(t, e) |
| } |
| } |
| |
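| // testMalformedHTTP2Metadata verifies that a unary RPC carrying metadata with an |
| // uppercase key fails with codes.Internal. |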
| func testMalformedHTTP2Metadata(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: 314, |
| Payload: payload, |
| } |
| ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata) |
| if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal { |
| t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal) |
| } |
| } |
| |
| func (s) TestTransparentRetry(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| // Fails with RST_STREAM / FLOW_CONTROL_ERROR |
| continue |
| } |
| testTransparentRetry(t, e) |
| } |
| } |
| |
| // This test makes sure RPCs are transparently retried when they receive a |
| // RST_STREAM with the REFUSED_STREAM error code, which the InTapHandle provokes. |
| func testTransparentRetry(t *testing.T, e env) { |
| te := newTest(t, e) |
| attempts := 0 |
| successAttempt := 2 |
| te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) { |
| attempts++ |
| if attempts < successAttempt { |
| return nil, errors.New("not now") |
| } |
| return ctx, nil |
| } |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tsc := testpb.NewTestServiceClient(cc) |
| testCases := []struct { |
| successAttempt int |
| failFast bool |
| errCode codes.Code |
| }{{ |
| successAttempt: 1, |
| }, { |
| successAttempt: 2, |
| }, { |
| successAttempt: 3, |
| errCode: codes.Unavailable, |
| }, { |
| successAttempt: 1, |
| failFast: true, |
| }, { |
| successAttempt: 2, |
| failFast: true, |
| }, { |
| successAttempt: 3, |
| failFast: true, |
| errCode: codes.Unavailable, |
| }} |
| for _, tc := range testCases { |
| attempts = 0 |
| successAttempt = tc.successAttempt |
| |
| ctx, cancel := context.WithTimeout(context.Background(), time.Second) |
| _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!tc.failFast)) |
| cancel() |
| if status.Code(err) != tc.errCode { |
| t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode) |
| } |
| } |
| } |
| |
| func (s) TestCancel(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testCancel(t, e) |
| } |
| } |
| |
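| // testCancel verifies that a unary RPC fails with codes.Canceled when the |
| // client cancels its context while the server is still handling the call. |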
| func testCancel(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.declareLogNoise("grpc: the client connection is closing; please retry") |
| te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| |
| const argSize = 2718 |
| const respSize = 314 |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| ctx, cancel := context.WithCancel(context.Background()) |
| time.AfterFunc(1*time.Millisecond, cancel) |
| if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled { |
| t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled) |
| } |
| awaitNewConnLogOutput() |
| } |
| |
| func (s) TestCancelNoIO(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testCancelNoIO(t, e) |
| } |
| } |
| |
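| // testCancelNoIO verifies that canceling an in-flight stream releases its slot |
| // under a MaxConcurrentStreams limit of 1, allowing a blocked RPC to proceed. |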
| func testCancelNoIO(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken") |
| te.maxStream = 1 // Only allows 1 live stream per server transport. |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| |
| // Start one blocked RPC for which we'll never send streaming |
| // input. This will consume the 1 maximum concurrent streams, |
| // causing future RPCs to hang. |
| ctx, cancelFirst := context.WithCancel(context.Background()) |
| _, err := tc.StreamingInputCall(ctx) |
| if err != nil { |
| t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) |
| } |
| |
| // Loop until the ClientConn receives the initial settings |
| // frame from the server, notifying it about the maximum |
| // concurrent streams. We know when it's received it because |
| // an RPC will fail with codes.DeadlineExceeded instead of |
| // succeeding. |
| // TODO(bradfitz): add internal test hook for this (Issue 534) |
| for { |
| ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond) |
| _, err := tc.StreamingInputCall(ctx) |
| cancelSecond() |
| if err == nil { |
| continue |
| } |
| if status.Code(err) == codes.DeadlineExceeded { |
| break |
| } |
| t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) |
| } |
| // If there are any RPCs in flight before the client receives |
| // the max streams setting, let them be expired. |
| // TODO(bradfitz): add internal test hook for this (Issue 534) |
| time.Sleep(50 * time.Millisecond) |
| |
| go func() { |
| time.Sleep(50 * time.Millisecond) |
| cancelFirst() |
| }() |
| |
| // This should be blocked until the 1st is canceled, then succeed. |
| ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond) |
| if _, err := tc.StreamingInputCall(ctx); err != nil { |
| t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) |
| } |
| cancelThird() |
| } |
| |
| // The following tests the gRPC streaming RPC implementations. |
| // TODO(zhaoq): Have better coverage on error cases. |
| var ( |
| reqSizes = []int{27182, 8, 1828, 45904} |
| respSizes = []int{31415, 9, 2653, 58979} |
| ) |
| |
| func (s) TestNoService(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testNoService(t, e) |
| } |
| } |
| |
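| // testNoService verifies that an RPC against a server with no registered |
| // service fails with codes.Unimplemented. |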
| func testNoService(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(nil) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| |
| stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true)) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented { |
| t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented) |
| } |
| } |
| |
| func (s) TestPingPong(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testPingPong(t, e) |
| } |
| } |
| |
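| // testPingPong exercises a full-duplex stream, alternating one request and one |
| // response per iteration using the sizes in reqSizes and respSizes. |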
| func testPingPong(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| stream, err := tc.FullDuplexCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| var index int |
| for index < len(reqSizes) { |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: int32(respSizes[index]), |
| }, |
| } |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: payload, |
| } |
| if err := stream.Send(req); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) |
| } |
| reply, err := stream.Recv() |
| if err != nil { |
| t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) |
| } |
| pt := reply.GetPayload().GetType() |
| if pt != testpb.PayloadType_COMPRESSABLE { |
| t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) |
| } |
| size := len(reply.GetPayload().GetBody()) |
| if size != int(respSizes[index]) { |
| t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) |
| } |
| index++ |
| } |
| if err := stream.CloseSend(); err != nil { |
| t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
| t.Fatalf("%v failed to complete the ping pong test: %v", stream, err) |
| } |
| } |
| |
| func (s) TestMetadataStreamingRPC(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testMetadataStreamingRPC(t, e) |
| } |
| } |
| |
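| // testMetadataStreamingRPC verifies that header and trailer metadata are |
| // delivered on a full-duplex stream and that repeated Header() calls return the cached value. |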
| func testMetadataStreamingRPC(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) |
| stream, err := tc.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| go func() { |
| headerMD, err := stream.Header() |
| if e.security == "tls" { |
| delete(headerMD, "transport_security_type") |
| } |
| delete(headerMD, "trailer") // ignore if present |
| delete(headerMD, "user-agent") |
| delete(headerMD, "content-type") |
| if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { |
| t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) |
| } |
| // test the cached value. |
| headerMD, err = stream.Header() |
| delete(headerMD, "trailer") // ignore if present |
| delete(headerMD, "user-agent") |
| delete(headerMD, "content-type") |
| if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { |
| t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) |
| } |
| err = func() error { |
| for index := 0; index < len(reqSizes); index++ { |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: int32(respSizes[index]), |
| }, |
| } |
| |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) |
| if err != nil { |
| return err |
| } |
| |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: payload, |
| } |
| if err := stream.Send(req); err != nil { |
| return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err) |
| } |
| } |
| return nil |
| }() |
| // Tell the server we're done sending args. |
| stream.CloseSend() |
| if err != nil { |
| t.Error(err) |
| } |
| }() |
| for { |
| if _, err := stream.Recv(); err != nil { |
| break |
| } |
| } |
| trailerMD := stream.Trailer() |
| if !reflect.DeepEqual(testTrailerMetadata, trailerMD) { |
| t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata) |
| } |
| } |
| |
| func (s) TestServerStreaming(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testServerStreaming(t, e) |
| } |
| } |
| |
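| // testServerStreaming verifies that a server-streaming RPC delivers one reply |
| // per requested size, in order, and ends with io.EOF. |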
| func testServerStreaming(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| respParam := make([]*testpb.ResponseParameters, len(respSizes)) |
| for i, s := range respSizes { |
| respParam[i] = &testpb.ResponseParameters{ |
| Size: int32(s), |
| } |
| } |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| } |
| stream, err := tc.StreamingOutputCall(context.Background(), req) |
| if err != nil { |
| t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) |
| } |
| var rpcStatus error |
| var respCnt int |
| var index int |
| for { |
| reply, err := stream.Recv() |
| if err != nil { |
| rpcStatus = err |
| break |
| } |
| pt := reply.GetPayload().GetType() |
| if pt != testpb.PayloadType_COMPRESSABLE { |
| t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) |
| } |
| size := len(reply.GetPayload().GetBody()) |
| if size != int(respSizes[index]) { |
| t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) |
| } |
| index++ |
| respCnt++ |
| } |
| if rpcStatus != io.EOF { |
| t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus) |
| } |
| if respCnt != len(respSizes) { |
| t.Fatalf("Got %d replies, want %d", respCnt, len(respSizes)) |
| } |
| } |
| |
| func (s) TestFailedServerStreaming(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testFailedServerStreaming(t, e) |
| } |
| } |
| |
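| // testFailedServerStreaming verifies that a server-streaming RPC surfaces the |
| // detailed DataLoss error the server returns for the fail-this-RPC user agent. |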
| func testFailedServerStreaming(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.userAgent = failAppUA |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| respParam := make([]*testpb.ResponseParameters, len(respSizes)) |
| for i, s := range respSizes { |
| respParam[i] = &testpb.ResponseParameters{ |
| Size: int32(s), |
| } |
| } |
| req := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| } |
| ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) |
| stream, err := tc.StreamingOutputCall(ctx, req) |
| if err != nil { |
| t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) |
| } |
| wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA) |
| if _, err := stream.Recv(); !equalError(err, wantErr) { |
| t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr) |
| } |
| } |
| |
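| // equalError reports whether x and y are both nil or have identical error strings. |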
| func equalError(x, y error) bool { |
| return x == y || (x != nil && y != nil && x.Error() == y.Error()) |
| } |
| |
| // concurrentSendServer is a TestServiceServer whose |
| // StreamingOutputCall makes ten serial Send calls, sending payloads |
| // "0".."9", inclusive. TestServerStreamingConcurrent verifies they |
| // were received in the correct order, and that there were no races. |
| // |
| // All other TestServiceServer methods crash if called. |
| type concurrentSendServer struct { |
| testpb.TestServiceServer |
| } |
| |
| func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { |
| for i := 0; i < 10; i++ { |
| stream.Send(&testpb.StreamingOutputCallResponse{ |
| Payload: &testpb.Payload{ |
| Body: []byte{'0' + uint8(i)}, |
| }, |
| }) |
| } |
| return nil |
| } |
| |
| // Tests doing a bunch of concurrent streaming output calls. |
| func (s) TestServerStreamingConcurrent(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testServerStreamingConcurrent(t, e) |
| } |
| } |
| |
| func testServerStreamingConcurrent(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(concurrentSendServer{}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| |
| doStreamingCall := func() { |
| req := &testpb.StreamingOutputCallRequest{} |
| stream, err := tc.StreamingOutputCall(context.Background(), req) |
| if err != nil { |
| t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) |
| return |
| } |
| var ngot int |
| var buf bytes.Buffer |
| for { |
| reply, err := stream.Recv() |
| if err == io.EOF { |
| break |
| } |
| if err != nil { |
| t.Fatal(err) |
| } |
| ngot++ |
| if buf.Len() > 0 { |
| buf.WriteByte(',') |
| } |
| buf.Write(reply.GetPayload().GetBody()) |
| } |
| if want := 10; ngot != want { |
| t.Errorf("Got %d replies, want %d", ngot, want) |
| } |
| if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want { |
| t.Errorf("Got replies %q; want %q", got, want) |
| } |
| } |
| |
| var wg sync.WaitGroup |
| for i := 0; i < 20; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| doStreamingCall() |
| }() |
| } |
| wg.Wait() |
| } |
| |
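| // generatePayloadSizes returns the request-size sets used by the client-streaming |
| // tests: a small mixed set, 1024 payloads of 8 KiB, and 8 payloads of 2 MiB. |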
| func generatePayloadSizes() [][]int { |
| reqSizes := [][]int{ |
| {27182, 8, 1828, 45904}, |
| } |
| |
| num8KPayloads := 1024 |
| eightKPayloads := []int{} |
| for i := 0; i < num8KPayloads; i++ { |
| eightKPayloads = append(eightKPayloads, (1 << 13)) |
| } |
| reqSizes = append(reqSizes, eightKPayloads) |
| |
| num2MPayloads := 8 |
| twoMPayloads := []int{} |
| for i := 0; i < num2MPayloads; i++ { |
| twoMPayloads = append(twoMPayloads, (1 << 21)) |
| } |
| reqSizes = append(reqSizes, twoMPayloads) |
| |
| return reqSizes |
| } |
| |
| func (s) TestClientStreaming(t *testing.T) { |
| for _, s := range generatePayloadSizes() { |
| for _, e := range listTestEnv() { |
| testClientStreaming(t, e, s) |
| } |
| } |
| } |
| |
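| // testClientStreaming sends the given payload sizes on a client stream and |
| // checks that the server reports the matching aggregated payload size. |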
| func testClientStreaming(t *testing.T, e env, sizes []int) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| ctx, cancel := context.WithTimeout(te.ctx, time.Second*30) |
| defer cancel() |
| stream, err := tc.StreamingInputCall(ctx) |
| if err != nil { |
| t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) |
| } |
| |
| var sum int |
| for _, s := range sizes { |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.StreamingInputCallRequest{ |
| Payload: payload, |
| } |
| if err := stream.Send(req); err != nil { |
| t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err) |
| } |
| sum += s |
| } |
| reply, err := stream.CloseAndRecv() |
| if err != nil { |
| t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) |
| } |
| if reply.GetAggregatedPayloadSize() != int32(sum) { |
| t.Fatalf("%v.CloseAndRecv().GetAggregatedPayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum) |
| } |
| } |
| |
| func (s) TestClientStreamingError(t *testing.T) { |
| for _, e := range listTestEnv() { |
| if e.name == "handler-tls" { |
| continue |
| } |
| testClientStreamingError(t, e) |
| } |
| } |
| |
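| // testClientStreamingError verifies that when the server fails the stream after |
| // the first message, further Sends eventually return io.EOF and CloseAndRecv reports codes.NotFound. |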
| func testClientStreamingError(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.startServer(&testServer{security: e.security, earlyFail: true}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| stream, err := tc.StreamingInputCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) |
| } |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &testpb.StreamingInputCallRequest{ |
| Payload: payload, |
| } |
| // The 1st request should go through. |
| if err := stream.Send(req); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) |
| } |
| for { |
| if err := stream.Send(req); err != io.EOF { |
| continue |
| } |
| if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound { |
| t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound) |
| } |
| break |
| } |
| } |
| |
| func (s) TestExceedMaxStreamsLimit(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testExceedMaxStreamsLimit(t, e) |
| } |
| } |
| |
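| // testExceedMaxStreamsLimit verifies that with MaxConcurrentStreams set to 1, a |
| // second concurrent RPC blocks until its deadline expires. |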
| func testExceedMaxStreamsLimit(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.declareLogNoise( |
| "http2Client.notifyError got notified that the client transport was broken", |
| "Conn.resetTransport failed to create client transport", |
| "grpc: the connection is closing", |
| ) |
| te.maxStream = 1 // Only allows 1 live stream per server transport. |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| |
| _, err := tc.StreamingInputCall(te.ctx) |
| if err != nil { |
| t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) |
| } |
| // Loop until receiving the new max stream setting from the server. |
| for { |
| ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
| defer cancel() |
| _, err := tc.StreamingInputCall(ctx) |
| if err == nil { |
| time.Sleep(50 * time.Millisecond) |
| continue |
| } |
| if status.Code(err) == codes.DeadlineExceeded { |
| break |
| } |
| t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) |
| } |
| } |
| |
| func (s) TestStreamsQuotaRecovery(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testStreamsQuotaRecovery(t, e) |
| } |
| } |
| |
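| // testStreamsQuotaRecovery verifies that stream quota is returned when an RPC is |
| // canceled, so a subsequent RPC can be started. |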
| func testStreamsQuotaRecovery(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.declareLogNoise( |
| "http2Client.notifyError got notified that the client transport was broken", |
| "Conn.resetTransport failed to create client transport", |
| "grpc: the connection is closing", |
| ) |
| te.maxStream = 1 // Allows 1 live stream. |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| |
| cc := te.clientConn() |
| tc := testpb.NewTestServiceClient(cc) |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| if _, err := tc.StreamingInputCall(ctx); err != nil { |
| t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err) |
| } |
| // Loop until the new max stream setting is effective. |
| for { |
| ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
| _, err := tc.StreamingInputCall(ctx) |
| cancel() |
| if err == nil { |
| time.Sleep(5 * time.Millisecond) |
| continue |
| } |
| if status.Code(err) == codes.DeadlineExceeded { |
| break |
| } |
| t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded) |
| } |
| |
| var wg sync.WaitGroup |
| for i := 0; i < 10; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314) |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: 1592, |
| Payload: payload, |
| } |
| // No RPC should go through because of the max concurrent streams limit. |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) |
| defer cancel() |
| if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { |
| t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) |
| } |
| }() |
| } |
| wg.Wait() |
| |
| cancel() |
| // A new stream should be allowed after canceling the first one. |
| ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) |
| defer cancel() |
| if _, err := tc.StreamingInputCall(ctx); err != nil { |
| t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil) |
| } |
| } |
| |
| func (s) TestCompressServerHasNoSupport(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testCompressServerHasNoSupport(t, e) |
| } |
| } |
| |
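| // testCompressServerHasNoSupport verifies that RPCs fail with codes.Unimplemented |
| // when the client advertises a compressor the server does not support. |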
| func testCompressServerHasNoSupport(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.serverCompression = false |
| te.clientCompression = false |
| te.clientNopCompression = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| const argSize = 271828 |
| const respSize = 314159 |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.Unimplemented { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented) |
| } |
| // Streaming RPC |
| stream, err := tc.FullDuplexCall(context.Background()) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Unimplemented { |
| t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented) |
| } |
| } |
| |
| func (s) TestCompressOK(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testCompressOK(t, e) |
| } |
| } |
| |
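| // testCompressOK verifies that unary and full-duplex RPCs succeed when both |
| // client and server enable compression. |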
| func testCompressOK(t *testing.T, e env) { |
| te := newTest(t, e) |
| te.serverCompression = true |
| te.clientCompression = true |
| te.startServer(&testServer{security: e.security}) |
| defer te.tearDown() |
| tc := testpb.NewTestServiceClient(te.clientConn()) |
| |
| // Unary call |
| const argSize = 271828 |
| const respSize = 314159 |
| payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := &testpb.SimpleRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseSize: respSize, |
| Payload: payload, |
| } |
| ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) |
| if _, err := tc.UnaryCall(ctx, req); err != nil { |
| t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) |
| } |
| // Streaming RPC |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| stream, err := tc.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) |
| } |
| respParam := []*testpb.ResponseParameters{ |
| { |
| Size: 31415, |
| }, |
| } |
| payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| sreq := &testpb.StreamingOutputCallRequest{ |
| ResponseType: testpb.PayloadType_COMPRESSABLE, |
| ResponseParameters: respParam, |
| Payload: payload, |
| } |
| if err := stream.Send(sreq); err != nil { |
| t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) |
| } |
| stream.CloseSend() |
| if _, err := stream.Recv(); err != nil { |
| t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
| t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err) |
| } |
| } |
| |
| func (s) TestIdentityEncoding(t *testing.T) { |
| for _, e := range listTestEnv() { |
| testIdentityEncoding(t, e) |
|