Add stats test for client streaming and server streaming RPCs (#1140)
diff --git a/stats/grpc_testing/test.pb.go b/stats/grpc_testing/test.pb.go
index b24dcd8..5730004 100644
--- a/stats/grpc_testing/test.pb.go
+++ b/stats/grpc_testing/test.pb.go
@@ -34,7 +34,6 @@
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
-// Unary request.
type SimpleRequest struct {
Id int32 `protobuf:"varint,2,opt,name=id" json:"id,omitempty"`
}
@@ -44,7 +43,13 @@
func (*SimpleRequest) ProtoMessage() {}
func (*SimpleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
-// Unary response, as configured by the request.
+func (m *SimpleRequest) GetId() int32 {
+ if m != nil {
+ return m.Id
+ }
+ return 0
+}
+
type SimpleResponse struct {
Id int32 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"`
}
@@ -54,6 +59,13 @@
func (*SimpleResponse) ProtoMessage() {}
func (*SimpleResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+func (m *SimpleResponse) GetId() int32 {
+ if m != nil {
+ return m.Id
+ }
+ return 0
+}
+
func init() {
proto.RegisterType((*SimpleRequest)(nil), "grpc.testing.SimpleRequest")
proto.RegisterType((*SimpleResponse)(nil), "grpc.testing.SimpleResponse")
@@ -77,6 +89,10 @@
// As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing.
FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (TestService_FullDuplexCallClient, error)
+ // Client stream
+ ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error)
+ // Server stream
+ ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error)
}
type testServiceClient struct {
@@ -127,6 +143,72 @@
return m, nil
}
+func (c *testServiceClient) ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error) {
+ stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[1], c.cc, "/grpc.testing.TestService/ClientStreamCall", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &testServiceClientStreamCallClient{stream}
+ return x, nil
+}
+
+type TestService_ClientStreamCallClient interface {
+ Send(*SimpleRequest) error
+ CloseAndRecv() (*SimpleResponse, error)
+ grpc.ClientStream
+}
+
+type testServiceClientStreamCallClient struct {
+ grpc.ClientStream
+}
+
+func (x *testServiceClientStreamCallClient) Send(m *SimpleRequest) error {
+ return x.ClientStream.SendMsg(m)
+}
+
+func (x *testServiceClientStreamCallClient) CloseAndRecv() (*SimpleResponse, error) {
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ m := new(SimpleResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func (c *testServiceClient) ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error) {
+ stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[2], c.cc, "/grpc.testing.TestService/ServerStreamCall", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &testServiceServerStreamCallClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type TestService_ServerStreamCallClient interface {
+ Recv() (*SimpleResponse, error)
+ grpc.ClientStream
+}
+
+type testServiceServerStreamCallClient struct {
+ grpc.ClientStream
+}
+
+func (x *testServiceServerStreamCallClient) Recv() (*SimpleResponse, error) {
+ m := new(SimpleResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
// Server API for TestService service
type TestServiceServer interface {
@@ -137,6 +219,10 @@
// As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing.
FullDuplexCall(TestService_FullDuplexCallServer) error
+ // Client stream
+ ClientStreamCall(TestService_ClientStreamCallServer) error
+ // Server stream
+ ServerStreamCall(*SimpleRequest, TestService_ServerStreamCallServer) error
}
func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) {
@@ -187,6 +273,53 @@
return m, nil
}
+func _TestService_ClientStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error {
+ return srv.(TestServiceServer).ClientStreamCall(&testServiceClientStreamCallServer{stream})
+}
+
+type TestService_ClientStreamCallServer interface {
+ SendAndClose(*SimpleResponse) error
+ Recv() (*SimpleRequest, error)
+ grpc.ServerStream
+}
+
+type testServiceClientStreamCallServer struct {
+ grpc.ServerStream
+}
+
+func (x *testServiceClientStreamCallServer) SendAndClose(m *SimpleResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+func (x *testServiceClientStreamCallServer) Recv() (*SimpleRequest, error) {
+ m := new(SimpleRequest)
+ if err := x.ServerStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func _TestService_ServerStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(SimpleRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(TestServiceServer).ServerStreamCall(m, &testServiceServerStreamCallServer{stream})
+}
+
+type TestService_ServerStreamCallServer interface {
+ Send(*SimpleResponse) error
+ grpc.ServerStream
+}
+
+type testServiceServerStreamCallServer struct {
+ grpc.ServerStream
+}
+
+func (x *testServiceServerStreamCallServer) Send(m *SimpleResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
var _TestService_serviceDesc = grpc.ServiceDesc{
ServiceName: "grpc.testing.TestService",
HandlerType: (*TestServiceServer)(nil),
@@ -203,6 +336,16 @@
ServerStreams: true,
ClientStreams: true,
},
+ {
+ StreamName: "ClientStreamCall",
+ Handler: _TestService_ClientStreamCall_Handler,
+ ClientStreams: true,
+ },
+ {
+ StreamName: "ServerStreamCall",
+ Handler: _TestService_ServerStreamCall_Handler,
+ ServerStreams: true,
+ },
},
Metadata: "test.proto",
}
@@ -210,16 +353,18 @@
func init() { proto.RegisterFile("test.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
- // 167 bytes of a gzipped FileDescriptorProto
+ // 196 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x09, 0x64,
0xe6, 0xa5, 0x2b, 0xc9, 0x73, 0xf1, 0x06, 0x67, 0xe6, 0x16, 0xe4, 0xa4, 0x06, 0xa5, 0x16, 0x96,
0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xb0, 0x06,
0x31, 0x65, 0xa6, 0x28, 0x29, 0x70, 0xf1, 0xc1, 0x14, 0x14, 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x42,
- 0x55, 0x30, 0xc3, 0x54, 0x18, 0x2d, 0x63, 0xe4, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d,
+ 0x55, 0x30, 0xc3, 0x54, 0x18, 0x9d, 0x60, 0xe2, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d,
0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0x72, 0xe3, 0xe2, 0x0c, 0xcd, 0x4b, 0x2c, 0xaa, 0x74, 0x4e, 0xcc,
0xc9, 0x11, 0x92, 0xd6, 0x43, 0xb6, 0x4e, 0x0f, 0xc5, 0x2e, 0x29, 0x19, 0xec, 0x92, 0x50, 0x7b,
0xfc, 0xb9, 0xf8, 0xdc, 0x4a, 0x73, 0x72, 0x5c, 0x4a, 0x0b, 0x72, 0x52, 0x2b, 0x28, 0x34, 0x4c,
- 0x83, 0xd1, 0x80, 0x31, 0x89, 0x0d, 0x1c, 0x00, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d,
- 0x82, 0x5b, 0xdd, 0x0e, 0x01, 0x00, 0x00,
+ 0x83, 0xd1, 0x80, 0x51, 0xc8, 0x9f, 0x4b, 0xc0, 0x39, 0x27, 0x33, 0x35, 0xaf, 0x24, 0xb8, 0xa4,
+ 0x28, 0x35, 0x31, 0x97, 0x62, 0x23, 0x41, 0x06, 0x82, 0x3c, 0x9d, 0x5a, 0x44, 0x15, 0x03, 0x0d,
+ 0x18, 0x93, 0xd8, 0xc0, 0x51, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x61, 0x49, 0x59, 0xe6,
+ 0xb0, 0x01, 0x00, 0x00,
}
diff --git a/stats/grpc_testing/test.proto b/stats/grpc_testing/test.proto
index 54e6f74..bea8c4c 100644
--- a/stats/grpc_testing/test.proto
+++ b/stats/grpc_testing/test.proto
@@ -20,4 +20,10 @@
// As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing.
rpc FullDuplexCall(stream SimpleRequest) returns (stream SimpleResponse);
+
+ // Client stream
+ rpc ClientStreamCall(stream SimpleRequest) returns (SimpleResponse);
+
+ // Server stream
+ rpc ServerStreamCall(SimpleRequest) returns (stream SimpleResponse);
}
diff --git a/stats/stats_test.go b/stats/stats_test.go
index 35d60a4..467d6a5 100644
--- a/stats/stats_test.go
+++ b/stats/stats_test.go
@@ -120,6 +120,51 @@
}
}
+func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
+ md, ok := metadata.FromContext(stream.Context())
+ if ok {
+ if err := stream.SendHeader(md); err != nil {
+ return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
+ }
+ stream.SetTrailer(testTrailerMetadata)
+ }
+ for {
+ in, err := stream.Recv()
+ if err == io.EOF {
+ // read done.
+ return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
+ }
+ if err != nil {
+ return err
+ }
+
+ if in.Id == errorID {
+ return fmt.Errorf("got error id: %v", in.Id)
+ }
+ }
+}
+
+func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
+ md, ok := metadata.FromContext(stream.Context())
+ if ok {
+ if err := stream.SendHeader(md); err != nil {
+ return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
+ }
+ stream.SetTrailer(testTrailerMetadata)
+ }
+
+ if in.Id == errorID {
+ return fmt.Errorf("got error id: %v", in.Id)
+ }
+
+ for i := 0; i < 5; i++ {
+ if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
// test is an end-to-end test. It should be created with the newTest
// func, modified as needed, and then started with its startServer method.
// It should be cleaned up with the tearDown method.
@@ -218,12 +263,21 @@
return te.cc
}
+type rpcType int
+
+const (
+ unaryRPC rpcType = iota
+ clientStreamRPC
+ serverStreamRPC
+ fullDuplexStreamRPC
+)
+
type rpcConfig struct {
count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error.
failfast bool
- streaming bool // Whether the rpc should be a streaming RPC.
- noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
+ callType rpcType // Type of RPC.
+ noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
}
func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
@@ -289,6 +343,64 @@
return reqs, resps, nil
}
+func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
+ var (
+ reqs []*testpb.SimpleRequest
+ resp *testpb.SimpleResponse
+ err error
+ )
+ tc := testpb.NewTestServiceClient(te.clientConn())
+ stream, err := tc.ClientStreamCall(metadata.NewContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
+ if err != nil {
+ return reqs, resp, err
+ }
+ var startID int32
+ if !c.success {
+ startID = errorID
+ }
+ for i := 0; i < c.count; i++ {
+ req := &testpb.SimpleRequest{
+ Id: int32(i) + startID,
+ }
+ reqs = append(reqs, req)
+ if err = stream.Send(req); err != nil {
+ return reqs, resp, err
+ }
+ }
+ resp, err = stream.CloseAndRecv()
+ return reqs, resp, err
+}
+
+func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
+ var (
+ req *testpb.SimpleRequest
+ resps []*testpb.SimpleResponse
+ err error
+ )
+
+ tc := testpb.NewTestServiceClient(te.clientConn())
+
+ var startID int32
+ if !c.success {
+ startID = errorID
+ }
+ req = &testpb.SimpleRequest{Id: startID}
+ stream, err := tc.ServerStreamCall(metadata.NewContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast))
+ if err != nil {
+ return req, resps, err
+ }
+ for {
+ var resp *testpb.SimpleResponse
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ return req, resps, nil
+ } else if err != nil {
+ return req, resps, err
+ }
+ resps = append(resps, resp)
+ }
+}
+
type expectedData struct {
method string
serverAddr string
@@ -672,16 +784,35 @@
defer te.tearDown()
var (
- reqs []*testpb.SimpleRequest
- resps []*testpb.SimpleResponse
- err error
+ reqs []*testpb.SimpleRequest
+ resps []*testpb.SimpleResponse
+ err error
+ method string
+
+ req *testpb.SimpleRequest
+ resp *testpb.SimpleResponse
+ e error
)
- if !cc.streaming {
- req, resp, e := te.doUnaryCall(cc)
+
+ switch cc.callType {
+ case unaryRPC:
+ method = "/grpc.testing.TestService/UnaryCall"
+ req, resp, e = te.doUnaryCall(cc)
reqs = []*testpb.SimpleRequest{req}
resps = []*testpb.SimpleResponse{resp}
err = e
- } else {
+ case clientStreamRPC:
+ method = "/grpc.testing.TestService/ClientStreamCall"
+ reqs, resp, e = te.doClientStreamCall(cc)
+ resps = []*testpb.SimpleResponse{resp}
+ err = e
+ case serverStreamRPC:
+ method = "/grpc.testing.TestService/ServerStreamCall"
+ req, resps, e = te.doServerStreamCall(cc)
+ reqs = []*testpb.SimpleRequest{req}
+ err = e
+ case fullDuplexStreamRPC:
+ method = "/grpc.testing.TestService/FullDuplexCall"
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
}
if cc.success != (err == nil) {
@@ -713,22 +844,18 @@
expect := &expectedData{
serverAddr: te.srvAddr,
compression: tc.compress,
+ method: method,
requests: reqs,
responses: resps,
err: err,
}
- if !cc.streaming {
- expect.method = "/grpc.testing.TestService/UnaryCall"
- } else {
- expect.method = "/grpc.testing.TestService/FullDuplexCall"
- }
checkConnStats(t, h.gotConn)
checkServerStats(t, h.gotRPC, expect, checkFuncs)
}
func TestServerStatsUnaryRPC(t *testing.T) {
- testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true}, []func(t *testing.T, d *gotData, e *expectedData){
+ testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkInPayload,
@@ -740,7 +867,7 @@
}
func TestServerStatsUnaryRPCError(t *testing.T) {
- testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false}, []func(t *testing.T, d *gotData, e *expectedData){
+ testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkInPayload,
@@ -750,7 +877,73 @@
})
}
-func TestServerStatsStreamingRPC(t *testing.T) {
+func TestServerStatsClientStreamRPC(t *testing.T) {
+ count := 5
+ checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+ checkInHeader,
+ checkBegin,
+ checkOutHeader,
+ }
+ ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+ checkInPayload,
+ }
+ for i := 0; i < count; i++ {
+ checkFuncs = append(checkFuncs, ioPayFuncs...)
+ }
+ checkFuncs = append(checkFuncs,
+ checkOutPayload,
+ checkOutTrailer,
+ checkEnd,
+ )
+ testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
+}
+
+func TestServerStatsClientStreamRPCError(t *testing.T) {
+ count := 1
+ testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
+ checkInHeader,
+ checkBegin,
+ checkOutHeader,
+ checkInPayload,
+ checkOutTrailer,
+ checkEnd,
+ })
+}
+
+func TestServerStatsServerStreamRPC(t *testing.T) {
+ count := 5
+ checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+ checkInHeader,
+ checkBegin,
+ checkInPayload,
+ checkOutHeader,
+ }
+ ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+ checkOutPayload,
+ }
+ for i := 0; i < count; i++ {
+ checkFuncs = append(checkFuncs, ioPayFuncs...)
+ }
+ checkFuncs = append(checkFuncs,
+ checkOutTrailer,
+ checkEnd,
+ )
+ testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
+}
+
+func TestServerStatsServerStreamRPCError(t *testing.T) {
+ count := 5
+ testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
+ checkInHeader,
+ checkBegin,
+ checkInPayload,
+ checkOutHeader,
+ checkOutTrailer,
+ checkEnd,
+ })
+}
+
+func TestServerStatsFullDuplexRPC(t *testing.T) {
count := 5
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
@@ -768,12 +961,12 @@
checkOutTrailer,
checkEnd,
)
- testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, streaming: true}, checkFuncs)
+ testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
}
-func TestServerStatsStreamingRPCError(t *testing.T) {
+func TestServerStatsFullDuplexRPCError(t *testing.T) {
count := 5
- testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, streaming: true}, []func(t *testing.T, d *gotData, e *expectedData){
+ testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkOutHeader,
@@ -880,16 +1073,34 @@
defer te.tearDown()
var (
- reqs []*testpb.SimpleRequest
- resps []*testpb.SimpleResponse
- err error
+ reqs []*testpb.SimpleRequest
+ resps []*testpb.SimpleResponse
+ method string
+ err error
+
+ req *testpb.SimpleRequest
+ resp *testpb.SimpleResponse
+ e error
)
- if !cc.streaming {
- req, resp, e := te.doUnaryCall(cc)
+ switch cc.callType {
+ case unaryRPC:
+ method = "/grpc.testing.TestService/UnaryCall"
+ req, resp, e = te.doUnaryCall(cc)
reqs = []*testpb.SimpleRequest{req}
resps = []*testpb.SimpleResponse{resp}
err = e
- } else {
+ case clientStreamRPC:
+ method = "/grpc.testing.TestService/ClientStreamCall"
+ reqs, resp, e = te.doClientStreamCall(cc)
+ resps = []*testpb.SimpleResponse{resp}
+ err = e
+ case serverStreamRPC:
+ method = "/grpc.testing.TestService/ServerStreamCall"
+ req, resps, e = te.doServerStreamCall(cc)
+ reqs = []*testpb.SimpleRequest{req}
+ err = e
+ case fullDuplexStreamRPC:
+ method = "/grpc.testing.TestService/FullDuplexCall"
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
}
if cc.success != (err == nil) {
@@ -925,23 +1136,19 @@
expect := &expectedData{
serverAddr: te.srvAddr,
compression: tc.compress,
+ method: method,
requests: reqs,
responses: resps,
failfast: cc.failfast,
err: err,
}
- if !cc.streaming {
- expect.method = "/grpc.testing.TestService/UnaryCall"
- } else {
- expect.method = "/grpc.testing.TestService/FullDuplexCall"
- }
checkConnStats(t, h.gotConn)
checkClientStats(t, h.gotRPC, expect, checkFuncs)
}
func TestClientStatsUnaryRPC(t *testing.T) {
- testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false}, map[int]*checkFuncWithCount{
+ testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1},
@@ -953,7 +1160,7 @@
}
func TestClientStatsUnaryRPCError(t *testing.T) {
- testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false}, map[int]*checkFuncWithCount{
+ testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1},
@@ -963,23 +1170,59 @@
})
}
-func TestClientStatsStreamingRPC(t *testing.T) {
+func TestClientStatsClientStreamRPC(t *testing.T) {
count := 5
- testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true}, map[int]*checkFuncWithCount{
+ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
- outPayload: {checkOutPayload, count},
inHeader: {checkInHeader, 1},
- inPayload: {checkInPayload, count},
+ outPayload: {checkOutPayload, count},
inTrailer: {checkInTrailer, 1},
+ inPayload: {checkInPayload, 1},
end: {checkEnd, 1},
})
}
-// If the user doesn't call the last recv() on clientSteam.
-func TestClientStatsStreamingRPCNotCallingLastRecv(t *testing.T) {
+func TestClientStatsClientStreamRPCError(t *testing.T) {
count := 1
- testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true, noLastRecv: true}, map[int]*checkFuncWithCount{
+ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
+ begin: {checkBegin, 1},
+ outHeader: {checkOutHeader, 1},
+ inHeader: {checkInHeader, 1},
+ outPayload: {checkOutPayload, 1},
+ inTrailer: {checkInTrailer, 1},
+ end: {checkEnd, 1},
+ })
+}
+
+func TestClientStatsServerStreamRPC(t *testing.T) {
+ count := 5
+ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
+ begin: {checkBegin, 1},
+ outHeader: {checkOutHeader, 1},
+ outPayload: {checkOutPayload, 1},
+ inHeader: {checkInHeader, 1},
+ inPayload: {checkInPayload, count},
+ inTrailer: {checkInTrailer, 1},
+ end: {checkEnd, 1},
+ })
+}
+
+func TestClientStatsServerStreamRPCError(t *testing.T) {
+ count := 5
+ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
+ begin: {checkBegin, 1},
+ outHeader: {checkOutHeader, 1},
+ outPayload: {checkOutPayload, 1},
+ inHeader: {checkInHeader, 1},
+ inTrailer: {checkInTrailer, 1},
+ end: {checkEnd, 1},
+ })
+}
+
+func TestClientStatsFullDuplexRPC(t *testing.T) {
+ count := 5
+ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, count},
@@ -990,9 +1233,9 @@
})
}
-func TestClientStatsStreamingRPCError(t *testing.T) {
+func TestClientStatsFullDuplexRPCError(t *testing.T) {
count := 5
- testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, streaming: true}, map[int]*checkFuncWithCount{
+ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1},
@@ -1001,3 +1244,17 @@
end: {checkEnd, 1},
})
}
+
+// If the user doesn't call the last recv() on clientStream.
+func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) {
+ count := 1
+ testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC, noLastRecv: true}, map[int]*checkFuncWithCount{
+ begin: {checkBegin, 1},
+ outHeader: {checkOutHeader, 1},
+ outPayload: {checkOutPayload, count},
+ inHeader: {checkInHeader, 1},
+ inPayload: {checkInPayload, count},
+ inTrailer: {checkInTrailer, 1},
+ end: {checkEnd, 1},
+ })
+}