Add started RPC metric for client and server side (#1283)

* Add started RPC metric for client and server side
diff --git a/plugin/ocgrpc/client_metrics.go b/plugin/ocgrpc/client_metrics.go
index 49fde3d..fb3c19d 100644
--- a/plugin/ocgrpc/client_metrics.go
+++ b/plugin/ocgrpc/client_metrics.go
@@ -28,6 +28,7 @@
 	ClientReceivedMessagesPerRPC = stats.Int64("grpc.io/client/received_messages_per_rpc", "Number of response messages received per RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless)
 	ClientReceivedBytesPerRPC    = stats.Int64("grpc.io/client/received_bytes_per_rpc", "Total bytes received across all response messages per RPC.", stats.UnitBytes)
 	ClientRoundtripLatency       = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds)
+	ClientStartedRPCs            = stats.Int64("grpc.io/client/started_rpcs", "Number of started client RPCs.", stats.UnitDimensionless)
 	ClientServerLatency          = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds)
 )
 
@@ -70,6 +71,14 @@
 		Aggregation: view.Count(),
 	}
 
+	ClientStartedRPCsView = &view.View{
+		Measure:     ClientStartedRPCs,
+		Name:        "grpc.io/client/started_rpcs",
+		Description: "Number of started client RPCs.",
+		TagKeys:     []tag.Key{KeyClientMethod},
+		Aggregation: view.Count(),
+	}
+
 	ClientSentMessagesPerRPCView = &view.View{
 		Measure:     ClientSentMessagesPerRPC,
 		Name:        "grpc.io/client/sent_messages_per_rpc",
diff --git a/plugin/ocgrpc/end_to_end_test.go b/plugin/ocgrpc/end_to_end_test.go
index 8715079..d106f43 100644
--- a/plugin/ocgrpc/end_to_end_test.go
+++ b/plugin/ocgrpc/end_to_end_test.go
@@ -40,6 +40,8 @@
 		ocgrpc.ClientReceivedMessagesPerRPCView,
 		ocgrpc.ServerSentMessagesPerRPCView,
 		ocgrpc.ClientSentMessagesPerRPCView,
+		ocgrpc.ServerStartedRPCsView,
+		ocgrpc.ClientStartedRPCsView,
 	}
 	view.Register(extraViews...)
 	defer view.Unregister(extraViews...)
@@ -63,10 +65,14 @@
 	if err != nil {
 		t.Fatal(err)
 	}
+	checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
+	checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
 	checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
 	checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)
 
 	_, _ = client.Single(ctx, &testpb.FooRequest{Fail: true})
+	checkCount(t, ocgrpc.ClientStartedRPCsView, 2, clientMethodTag)
+	checkCount(t, ocgrpc.ServerStartedRPCsView, 2, serverMethodTag)
 	checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, serverStatusUnknownTag)
 	checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, clientStatusUnknownTag)
 
@@ -101,6 +107,7 @@
 func TestEndToEnd_Stream(t *testing.T) {
 	view.Register(ocgrpc.DefaultClientViews...)
 	defer view.Unregister(ocgrpc.DefaultClientViews...)
+
 	view.Register(ocgrpc.DefaultServerViews...)
 	defer view.Unregister(ocgrpc.DefaultServerViews...)
 
@@ -109,6 +116,8 @@
 		ocgrpc.ClientReceivedMessagesPerRPCView,
 		ocgrpc.ServerSentMessagesPerRPCView,
 		ocgrpc.ClientSentMessagesPerRPCView,
+		ocgrpc.ClientStartedRPCsView,
+		ocgrpc.ServerStartedRPCsView,
 	}
 	view.Register(extraViews...)
 	defer view.Unregister(extraViews...)
@@ -146,6 +155,8 @@
 		t.Fatal(err)
 	}
 
+	checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
+	checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
 	checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
 	checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)
 
@@ -183,6 +194,7 @@
 			return 0, false
 		}
 	}
+
 	rows, err := view.RetrieveData(v.Name)
 	if err != nil {
 		t.Fatal(err)
diff --git a/plugin/ocgrpc/server_metrics.go b/plugin/ocgrpc/server_metrics.go
index b205982..fe0e971 100644
--- a/plugin/ocgrpc/server_metrics.go
+++ b/plugin/ocgrpc/server_metrics.go
@@ -27,6 +27,7 @@
 	ServerReceivedBytesPerRPC    = stats.Int64("grpc.io/server/received_bytes_per_rpc", "Total bytes received across all messages per RPC.", stats.UnitBytes)
 	ServerSentMessagesPerRPC     = stats.Int64("grpc.io/server/sent_messages_per_rpc", "Number of messages sent in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless)
 	ServerSentBytesPerRPC        = stats.Int64("grpc.io/server/sent_bytes_per_rpc", "Total bytes sent in across all response messages per RPC.", stats.UnitBytes)
+	ServerStartedRPCs            = stats.Int64("grpc.io/server/started_rpcs", "Number of started server RPCs.", stats.UnitDimensionless)
 	ServerLatency                = stats.Float64("grpc.io/server/server_latency", "Time between first byte of request received to last byte of response sent, or terminal error.", stats.UnitMilliseconds)
 )
 
@@ -73,6 +74,14 @@
 		Aggregation: view.Count(),
 	}
 
+	ServerStartedRPCsView = &view.View{
+		Measure:     ServerStartedRPCs,
+		Name:        "grpc.io/server/started_rpcs",
+		Description: "Number of started server RPCs.",
+		TagKeys:     []tag.Key{KeyServerMethod},
+		Aggregation: view.Count(),
+	}
+
 	ServerReceivedMessagesPerRPCView = &view.View{
 		Name:        "grpc.io/server/received_messages_per_rpc",
 		Description: "Distribution of messages received count per RPC, by method.",
diff --git a/plugin/ocgrpc/stats_common.go b/plugin/ocgrpc/stats_common.go
index 89cac9c..9cb2732 100644
--- a/plugin/ocgrpc/stats_common.go
+++ b/plugin/ocgrpc/stats_common.go
@@ -82,8 +82,10 @@
 // statsHandleRPC processes the RPC events.
 func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
 	switch st := s.(type) {
-	case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
+	case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
 		// do nothing for client
+	case *stats.Begin:
+		handleRPCBegin(ctx, st)
 	case *stats.OutPayload:
 		handleRPCOutPayload(ctx, st)
 	case *stats.InPayload:
@@ -95,6 +97,25 @@
 	}
 }
 
+func handleRPCBegin(ctx context.Context, s *stats.Begin) {
+	d, ok := ctx.Value(rpcDataKey).(*rpcData)
+	if !ok {
+		if grpclog.V(2) {
+			grpclog.Infoln("Failed to retrieve *rpcData from context.")
+		}
+	}
+
+	if s.IsClient() {
+		ocstats.RecordWithOptions(ctx,
+			ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
+			ocstats.WithMeasurements(ClientStartedRPCs.M(1)))
+	} else {
+		ocstats.RecordWithOptions(ctx,
+			ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
+			ocstats.WithMeasurements(ServerStartedRPCs.M(1)))
+	}
+}
+
 func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
 	d, ok := ctx.Value(rpcDataKey).(*rpcData)
 	if !ok {
diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go
index fe8e35f..af81fbb 100644
--- a/stats/view/worker_test.go
+++ b/stats/view/worker_test.go
@@ -180,14 +180,14 @@
 
 	// Format is Resource.Labels encoded as string, then
 	wantPartialData := map[string][]*Row{
-		makeKey(nil, count.Name): []*Row{
+		makeKey(nil, count.Name): {
 			{[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
 			{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
 		},
-		makeKey(nil, sum.Name): []*Row{
+		makeKey(nil, sum.Name): {
 			{nil, &SumData{Value: 7.5}},
 		},
-		makeKey(&extraResource, count.Name): []*Row{
+		makeKey(&extraResource, count.Name): {
 			{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
 		},
 	}