Add stats test for client streaming and server streaming RPCs (#1140)

diff --git a/stats/grpc_testing/test.pb.go b/stats/grpc_testing/test.pb.go
index b24dcd8..5730004 100644
--- a/stats/grpc_testing/test.pb.go
+++ b/stats/grpc_testing/test.pb.go
@@ -34,7 +34,6 @@
 // proto package needs to be updated.
 const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
 
-// Unary request.
 type SimpleRequest struct {
 	Id int32 `protobuf:"varint,2,opt,name=id" json:"id,omitempty"`
 }
@@ -44,7 +43,13 @@
 func (*SimpleRequest) ProtoMessage()               {}
 func (*SimpleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
 
-// Unary response, as configured by the request.
+func (m *SimpleRequest) GetId() int32 {
+	if m != nil {
+		return m.Id
+	}
+	return 0
+}
+
 type SimpleResponse struct {
 	Id int32 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"`
 }
@@ -54,6 +59,13 @@
 func (*SimpleResponse) ProtoMessage()               {}
 func (*SimpleResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
 
+func (m *SimpleResponse) GetId() int32 {
+	if m != nil {
+		return m.Id
+	}
+	return 0
+}
+
 func init() {
 	proto.RegisterType((*SimpleRequest)(nil), "grpc.testing.SimpleRequest")
 	proto.RegisterType((*SimpleResponse)(nil), "grpc.testing.SimpleResponse")
@@ -77,6 +89,10 @@
 	// As one request could lead to multiple responses, this interface
 	// demonstrates the idea of full duplexing.
 	FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (TestService_FullDuplexCallClient, error)
+	// Client stream
+	ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error)
+	// Server stream
+	ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error)
 }
 
 type testServiceClient struct {
@@ -127,6 +143,72 @@
 	return m, nil
 }
 
+func (c *testServiceClient) ClientStreamCall(ctx context.Context, opts ...grpc.CallOption) (TestService_ClientStreamCallClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[1], c.cc, "/grpc.testing.TestService/ClientStreamCall", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &testServiceClientStreamCallClient{stream}
+	return x, nil
+}
+
+type TestService_ClientStreamCallClient interface {
+	Send(*SimpleRequest) error
+	CloseAndRecv() (*SimpleResponse, error)
+	grpc.ClientStream
+}
+
+type testServiceClientStreamCallClient struct {
+	grpc.ClientStream
+}
+
+func (x *testServiceClientStreamCallClient) Send(m *SimpleRequest) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *testServiceClientStreamCallClient) CloseAndRecv() (*SimpleResponse, error) {
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	m := new(SimpleResponse)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+func (c *testServiceClient) ServerStreamCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (TestService_ServerStreamCallClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[2], c.cc, "/grpc.testing.TestService/ServerStreamCall", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &testServiceServerStreamCallClient{stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+type TestService_ServerStreamCallClient interface {
+	Recv() (*SimpleResponse, error)
+	grpc.ClientStream
+}
+
+type testServiceServerStreamCallClient struct {
+	grpc.ClientStream
+}
+
+func (x *testServiceServerStreamCallClient) Recv() (*SimpleResponse, error) {
+	m := new(SimpleResponse)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
 // Server API for TestService service
 
 type TestServiceServer interface {
@@ -137,6 +219,10 @@
 	// As one request could lead to multiple responses, this interface
 	// demonstrates the idea of full duplexing.
 	FullDuplexCall(TestService_FullDuplexCallServer) error
+	// Client stream
+	ClientStreamCall(TestService_ClientStreamCallServer) error
+	// Server stream
+	ServerStreamCall(*SimpleRequest, TestService_ServerStreamCallServer) error
 }
 
 func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) {
@@ -187,6 +273,53 @@
 	return m, nil
 }
 
+func _TestService_ClientStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(TestServiceServer).ClientStreamCall(&testServiceClientStreamCallServer{stream})
+}
+
+type TestService_ClientStreamCallServer interface {
+	SendAndClose(*SimpleResponse) error
+	Recv() (*SimpleRequest, error)
+	grpc.ServerStream
+}
+
+type testServiceClientStreamCallServer struct {
+	grpc.ServerStream
+}
+
+func (x *testServiceClientStreamCallServer) SendAndClose(m *SimpleResponse) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *testServiceClientStreamCallServer) Recv() (*SimpleRequest, error) {
+	m := new(SimpleRequest)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+func _TestService_ServerStreamCall_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(SimpleRequest)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(TestServiceServer).ServerStreamCall(m, &testServiceServerStreamCallServer{stream})
+}
+
+type TestService_ServerStreamCallServer interface {
+	Send(*SimpleResponse) error
+	grpc.ServerStream
+}
+
+type testServiceServerStreamCallServer struct {
+	grpc.ServerStream
+}
+
+func (x *testServiceServerStreamCallServer) Send(m *SimpleResponse) error {
+	return x.ServerStream.SendMsg(m)
+}
+
 var _TestService_serviceDesc = grpc.ServiceDesc{
 	ServiceName: "grpc.testing.TestService",
 	HandlerType: (*TestServiceServer)(nil),
@@ -203,6 +336,16 @@
 			ServerStreams: true,
 			ClientStreams: true,
 		},
+		{
+			StreamName:    "ClientStreamCall",
+			Handler:       _TestService_ClientStreamCall_Handler,
+			ClientStreams: true,
+		},
+		{
+			StreamName:    "ServerStreamCall",
+			Handler:       _TestService_ServerStreamCall_Handler,
+			ServerStreams: true,
+		},
 	},
 	Metadata: "test.proto",
 }
@@ -210,16 +353,18 @@
 func init() { proto.RegisterFile("test.proto", fileDescriptor0) }
 
 var fileDescriptor0 = []byte{
-	// 167 bytes of a gzipped FileDescriptorProto
+	// 196 bytes of a gzipped FileDescriptorProto
 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
 	0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x09, 0x64,
 	0xe6, 0xa5, 0x2b, 0xc9, 0x73, 0xf1, 0x06, 0x67, 0xe6, 0x16, 0xe4, 0xa4, 0x06, 0xa5, 0x16, 0x96,
 	0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xb0, 0x06,
 	0x31, 0x65, 0xa6, 0x28, 0x29, 0x70, 0xf1, 0xc1, 0x14, 0x14, 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x42,
-	0x55, 0x30, 0xc3, 0x54, 0x18, 0x2d, 0x63, 0xe4, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d,
+	0x55, 0x30, 0xc3, 0x54, 0x18, 0x9d, 0x60, 0xe2, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d,
 	0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0x72, 0xe3, 0xe2, 0x0c, 0xcd, 0x4b, 0x2c, 0xaa, 0x74, 0x4e, 0xcc,
 	0xc9, 0x11, 0x92, 0xd6, 0x43, 0xb6, 0x4e, 0x0f, 0xc5, 0x2e, 0x29, 0x19, 0xec, 0x92, 0x50, 0x7b,
 	0xfc, 0xb9, 0xf8, 0xdc, 0x4a, 0x73, 0x72, 0x5c, 0x4a, 0x0b, 0x72, 0x52, 0x2b, 0x28, 0x34, 0x4c,
-	0x83, 0xd1, 0x80, 0x31, 0x89, 0x0d, 0x1c, 0x00, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d,
-	0x82, 0x5b, 0xdd, 0x0e, 0x01, 0x00, 0x00,
+	0x83, 0xd1, 0x80, 0x51, 0xc8, 0x9f, 0x4b, 0xc0, 0x39, 0x27, 0x33, 0x35, 0xaf, 0x24, 0xb8, 0xa4,
+	0x28, 0x35, 0x31, 0x97, 0x62, 0x23, 0x41, 0x06, 0x82, 0x3c, 0x9d, 0x5a, 0x44, 0x15, 0x03, 0x0d,
+	0x18, 0x93, 0xd8, 0xc0, 0x51, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x61, 0x49, 0x59, 0xe6,
+	0xb0, 0x01, 0x00, 0x00,
 }
diff --git a/stats/grpc_testing/test.proto b/stats/grpc_testing/test.proto
index 54e6f74..bea8c4c 100644
--- a/stats/grpc_testing/test.proto
+++ b/stats/grpc_testing/test.proto
@@ -20,4 +20,10 @@
   // As one request could lead to multiple responses, this interface
   // demonstrates the idea of full duplexing.
   rpc FullDuplexCall(stream SimpleRequest) returns (stream SimpleResponse);
+
+  // Client stream 
+  rpc ClientStreamCall(stream SimpleRequest) returns (SimpleResponse);
+
+  // Server stream
+  rpc ServerStreamCall(SimpleRequest) returns (stream SimpleResponse);
 }
diff --git a/stats/stats_test.go b/stats/stats_test.go
index 35d60a4..467d6a5 100644
--- a/stats/stats_test.go
+++ b/stats/stats_test.go
@@ -120,6 +120,51 @@
 	}
 }
 
+func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
+	md, ok := metadata.FromContext(stream.Context())
+	if ok {
+		if err := stream.SendHeader(md); err != nil {
+			return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
+		}
+		stream.SetTrailer(testTrailerMetadata)
+	}
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			// read done.
+			return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
+		}
+		if err != nil {
+			return err
+		}
+
+		if in.Id == errorID {
+			return fmt.Errorf("got error id: %v", in.Id)
+		}
+	}
+}
+
+func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
+	md, ok := metadata.FromContext(stream.Context())
+	if ok {
+		if err := stream.SendHeader(md); err != nil {
+			return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
+		}
+		stream.SetTrailer(testTrailerMetadata)
+	}
+
+	if in.Id == errorID {
+		return fmt.Errorf("got error id: %v", in.Id)
+	}
+
+	for i := 0; i < 5; i++ {
+		if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // test is an end-to-end test. It should be created with the newTest
 // func, modified as needed, and then started with its startServer method.
 // It should be cleaned up with the tearDown method.
@@ -218,12 +263,21 @@
 	return te.cc
 }
 
+type rpcType int
+
+const (
+	unaryRPC rpcType = iota
+	clientStreamRPC
+	serverStreamRPC
+	fullDuplexStreamRPC
+)
+
 type rpcConfig struct {
 	count      int  // Number of requests and responses for streaming RPCs.
 	success    bool // Whether the RPC should succeed or return error.
 	failfast   bool
-	streaming  bool // Whether the rpc should be a streaming RPC.
-	noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
+	callType   rpcType // Type of RPC.
+	noLastRecv bool    // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
 }
 
 func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
@@ -289,6 +343,64 @@
 	return reqs, resps, nil
 }
 
+func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
+	var (
+		reqs []*testpb.SimpleRequest
+		resp *testpb.SimpleResponse
+		err  error
+	)
+	tc := testpb.NewTestServiceClient(te.clientConn())
+	stream, err := tc.ClientStreamCall(metadata.NewContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
+	if err != nil {
+		return reqs, resp, err
+	}
+	var startID int32
+	if !c.success {
+		startID = errorID
+	}
+	for i := 0; i < c.count; i++ {
+		req := &testpb.SimpleRequest{
+			Id: int32(i) + startID,
+		}
+		reqs = append(reqs, req)
+		if err = stream.Send(req); err != nil {
+			return reqs, resp, err
+		}
+	}
+	resp, err = stream.CloseAndRecv()
+	return reqs, resp, err
+}
+
+func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
+	var (
+		req   *testpb.SimpleRequest
+		resps []*testpb.SimpleResponse
+		err   error
+	)
+
+	tc := testpb.NewTestServiceClient(te.clientConn())
+
+	var startID int32
+	if !c.success {
+		startID = errorID
+	}
+	req = &testpb.SimpleRequest{Id: startID}
+	stream, err := tc.ServerStreamCall(metadata.NewContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast))
+	if err != nil {
+		return req, resps, err
+	}
+	for {
+		var resp *testpb.SimpleResponse
+		resp, err := stream.Recv()
+		if err == io.EOF {
+			return req, resps, nil
+		} else if err != nil {
+			return req, resps, err
+		}
+		resps = append(resps, resp)
+	}
+}
+
 type expectedData struct {
 	method      string
 	serverAddr  string
@@ -672,16 +784,35 @@
 	defer te.tearDown()
 
 	var (
-		reqs  []*testpb.SimpleRequest
-		resps []*testpb.SimpleResponse
-		err   error
+		reqs   []*testpb.SimpleRequest
+		resps  []*testpb.SimpleResponse
+		err    error
+		method string
+
+		req  *testpb.SimpleRequest
+		resp *testpb.SimpleResponse
+		e    error
 	)
-	if !cc.streaming {
-		req, resp, e := te.doUnaryCall(cc)
+
+	switch cc.callType {
+	case unaryRPC:
+		method = "/grpc.testing.TestService/UnaryCall"
+		req, resp, e = te.doUnaryCall(cc)
 		reqs = []*testpb.SimpleRequest{req}
 		resps = []*testpb.SimpleResponse{resp}
 		err = e
-	} else {
+	case clientStreamRPC:
+		method = "/grpc.testing.TestService/ClientStreamCall"
+		reqs, resp, e = te.doClientStreamCall(cc)
+		resps = []*testpb.SimpleResponse{resp}
+		err = e
+	case serverStreamRPC:
+		method = "/grpc.testing.TestService/ServerStreamCall"
+		req, resps, e = te.doServerStreamCall(cc)
+		reqs = []*testpb.SimpleRequest{req}
+		err = e
+	case fullDuplexStreamRPC:
+		method = "/grpc.testing.TestService/FullDuplexCall"
 		reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
 	}
 	if cc.success != (err == nil) {
@@ -713,22 +844,18 @@
 	expect := &expectedData{
 		serverAddr:  te.srvAddr,
 		compression: tc.compress,
+		method:      method,
 		requests:    reqs,
 		responses:   resps,
 		err:         err,
 	}
-	if !cc.streaming {
-		expect.method = "/grpc.testing.TestService/UnaryCall"
-	} else {
-		expect.method = "/grpc.testing.TestService/FullDuplexCall"
-	}
 
 	checkConnStats(t, h.gotConn)
 	checkServerStats(t, h.gotRPC, expect, checkFuncs)
 }
 
 func TestServerStatsUnaryRPC(t *testing.T) {
-	testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true}, []func(t *testing.T, d *gotData, e *expectedData){
+	testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
 		checkInHeader,
 		checkBegin,
 		checkInPayload,
@@ -740,7 +867,7 @@
 }
 
 func TestServerStatsUnaryRPCError(t *testing.T) {
-	testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false}, []func(t *testing.T, d *gotData, e *expectedData){
+	testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
 		checkInHeader,
 		checkBegin,
 		checkInPayload,
@@ -750,7 +877,73 @@
 	})
 }
 
-func TestServerStatsStreamingRPC(t *testing.T) {
+func TestServerStatsClientStreamRPC(t *testing.T) {
+	count := 5
+	checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+		checkInHeader,
+		checkBegin,
+		checkOutHeader,
+	}
+	ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+		checkInPayload,
+	}
+	for i := 0; i < count; i++ {
+		checkFuncs = append(checkFuncs, ioPayFuncs...)
+	}
+	checkFuncs = append(checkFuncs,
+		checkOutPayload,
+		checkOutTrailer,
+		checkEnd,
+	)
+	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
+}
+
+func TestServerStatsClientStreamRPCError(t *testing.T) {
+	count := 1
+	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
+		checkInHeader,
+		checkBegin,
+		checkOutHeader,
+		checkInPayload,
+		checkOutTrailer,
+		checkEnd,
+	})
+}
+
+func TestServerStatsServerStreamRPC(t *testing.T) {
+	count := 5
+	checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+		checkInHeader,
+		checkBegin,
+		checkInPayload,
+		checkOutHeader,
+	}
+	ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
+		checkOutPayload,
+	}
+	for i := 0; i < count; i++ {
+		checkFuncs = append(checkFuncs, ioPayFuncs...)
+	}
+	checkFuncs = append(checkFuncs,
+		checkOutTrailer,
+		checkEnd,
+	)
+	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
+}
+
+func TestServerStatsServerStreamRPCError(t *testing.T) {
+	count := 5
+	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
+		checkInHeader,
+		checkBegin,
+		checkInPayload,
+		checkOutHeader,
+		checkOutTrailer,
+		checkEnd,
+	})
+}
+
+func TestServerStatsFullDuplexRPC(t *testing.T) {
 	count := 5
 	checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
 		checkInHeader,
@@ -768,12 +961,12 @@
 		checkOutTrailer,
 		checkEnd,
 	)
-	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, streaming: true}, checkFuncs)
+	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
 }
 
-func TestServerStatsStreamingRPCError(t *testing.T) {
+func TestServerStatsFullDuplexRPCError(t *testing.T) {
 	count := 5
-	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, streaming: true}, []func(t *testing.T, d *gotData, e *expectedData){
+	testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
 		checkInHeader,
 		checkBegin,
 		checkOutHeader,
@@ -880,16 +1073,34 @@
 	defer te.tearDown()
 
 	var (
-		reqs  []*testpb.SimpleRequest
-		resps []*testpb.SimpleResponse
-		err   error
+		reqs   []*testpb.SimpleRequest
+		resps  []*testpb.SimpleResponse
+		method string
+		err    error
+
+		req  *testpb.SimpleRequest
+		resp *testpb.SimpleResponse
+		e    error
 	)
-	if !cc.streaming {
-		req, resp, e := te.doUnaryCall(cc)
+	switch cc.callType {
+	case unaryRPC:
+		method = "/grpc.testing.TestService/UnaryCall"
+		req, resp, e = te.doUnaryCall(cc)
 		reqs = []*testpb.SimpleRequest{req}
 		resps = []*testpb.SimpleResponse{resp}
 		err = e
-	} else {
+	case clientStreamRPC:
+		method = "/grpc.testing.TestService/ClientStreamCall"
+		reqs, resp, e = te.doClientStreamCall(cc)
+		resps = []*testpb.SimpleResponse{resp}
+		err = e
+	case serverStreamRPC:
+		method = "/grpc.testing.TestService/ServerStreamCall"
+		req, resps, e = te.doServerStreamCall(cc)
+		reqs = []*testpb.SimpleRequest{req}
+		err = e
+	case fullDuplexStreamRPC:
+		method = "/grpc.testing.TestService/FullDuplexCall"
 		reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
 	}
 	if cc.success != (err == nil) {
@@ -925,23 +1136,19 @@
 	expect := &expectedData{
 		serverAddr:  te.srvAddr,
 		compression: tc.compress,
+		method:      method,
 		requests:    reqs,
 		responses:   resps,
 		failfast:    cc.failfast,
 		err:         err,
 	}
-	if !cc.streaming {
-		expect.method = "/grpc.testing.TestService/UnaryCall"
-	} else {
-		expect.method = "/grpc.testing.TestService/FullDuplexCall"
-	}
 
 	checkConnStats(t, h.gotConn)
 	checkClientStats(t, h.gotRPC, expect, checkFuncs)
 }
 
 func TestClientStatsUnaryRPC(t *testing.T) {
-	testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false}, map[int]*checkFuncWithCount{
+	testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
 		begin:      {checkBegin, 1},
 		outHeader:  {checkOutHeader, 1},
 		outPayload: {checkOutPayload, 1},
@@ -953,7 +1160,7 @@
 }
 
 func TestClientStatsUnaryRPCError(t *testing.T) {
-	testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false}, map[int]*checkFuncWithCount{
+	testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
 		begin:      {checkBegin, 1},
 		outHeader:  {checkOutHeader, 1},
 		outPayload: {checkOutPayload, 1},
@@ -963,23 +1170,59 @@
 	})
 }
 
-func TestClientStatsStreamingRPC(t *testing.T) {
+func TestClientStatsClientStreamRPC(t *testing.T) {
 	count := 5
-	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true}, map[int]*checkFuncWithCount{
+	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
 		begin:      {checkBegin, 1},
 		outHeader:  {checkOutHeader, 1},
-		outPayload: {checkOutPayload, count},
 		inHeader:   {checkInHeader, 1},
-		inPayload:  {checkInPayload, count},
+		outPayload: {checkOutPayload, count},
 		inTrailer:  {checkInTrailer, 1},
+		inPayload:  {checkInPayload, 1},
 		end:        {checkEnd, 1},
 	})
 }
 
-// If the user doesn't call the last recv() on clientSteam.
-func TestClientStatsStreamingRPCNotCallingLastRecv(t *testing.T) {
+func TestClientStatsClientStreamRPCError(t *testing.T) {
 	count := 1
-	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true, noLastRecv: true}, map[int]*checkFuncWithCount{
+	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
+		begin:      {checkBegin, 1},
+		outHeader:  {checkOutHeader, 1},
+		inHeader:   {checkInHeader, 1},
+		outPayload: {checkOutPayload, 1},
+		inTrailer:  {checkInTrailer, 1},
+		end:        {checkEnd, 1},
+	})
+}
+
+func TestClientStatsServerStreamRPC(t *testing.T) {
+	count := 5
+	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
+		begin:      {checkBegin, 1},
+		outHeader:  {checkOutHeader, 1},
+		outPayload: {checkOutPayload, 1},
+		inHeader:   {checkInHeader, 1},
+		inPayload:  {checkInPayload, count},
+		inTrailer:  {checkInTrailer, 1},
+		end:        {checkEnd, 1},
+	})
+}
+
+func TestClientStatsServerStreamRPCError(t *testing.T) {
+	count := 5
+	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
+		begin:      {checkBegin, 1},
+		outHeader:  {checkOutHeader, 1},
+		outPayload: {checkOutPayload, 1},
+		inHeader:   {checkInHeader, 1},
+		inTrailer:  {checkInTrailer, 1},
+		end:        {checkEnd, 1},
+	})
+}
+
+func TestClientStatsFullDuplexRPC(t *testing.T) {
+	count := 5
+	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
 		begin:      {checkBegin, 1},
 		outHeader:  {checkOutHeader, 1},
 		outPayload: {checkOutPayload, count},
@@ -990,9 +1233,9 @@
 	})
 }
 
-func TestClientStatsStreamingRPCError(t *testing.T) {
+func TestClientStatsFullDuplexRPCError(t *testing.T) {
 	count := 5
-	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, streaming: true}, map[int]*checkFuncWithCount{
+	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
 		begin:      {checkBegin, 1},
 		outHeader:  {checkOutHeader, 1},
 		outPayload: {checkOutPayload, 1},
@@ -1001,3 +1244,17 @@
 		end:        {checkEnd, 1},
 	})
 }
+
+// If the user doesn't call the last recv() on clientStream.
+func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) {
+	count := 1
+	testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC, noLastRecv: true}, map[int]*checkFuncWithCount{
+		begin:      {checkBegin, 1},
+		outHeader:  {checkOutHeader, 1},
+		outPayload: {checkOutPayload, count},
+		inHeader:   {checkInHeader, 1},
+		inPayload:  {checkInPayload, count},
+		inTrailer:  {checkInTrailer, 1},
+		end:        {checkEnd, 1},
+	})
+}