Add started RPC metric for client and server side (#1283)
* Add started RPC metric for client and server side
diff --git a/plugin/ocgrpc/client_metrics.go b/plugin/ocgrpc/client_metrics.go
index 49fde3d..fb3c19d 100644
--- a/plugin/ocgrpc/client_metrics.go
+++ b/plugin/ocgrpc/client_metrics.go
@@ -28,6 +28,7 @@
ClientReceivedMessagesPerRPC = stats.Int64("grpc.io/client/received_messages_per_rpc", "Number of response messages received per RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless)
ClientReceivedBytesPerRPC = stats.Int64("grpc.io/client/received_bytes_per_rpc", "Total bytes received across all response messages per RPC.", stats.UnitBytes)
ClientRoundtripLatency = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds)
+ ClientStartedRPCs = stats.Int64("grpc.io/client/started_rpcs", "Number of started client RPCs.", stats.UnitDimensionless)
ClientServerLatency = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds)
)
@@ -70,6 +71,14 @@
Aggregation: view.Count(),
}
+ ClientStartedRPCsView = &view.View{ // view over the ClientStartedRPCs measure
+ Measure: ClientStartedRPCs,
+ Name: "grpc.io/client/started_rpcs",
+ Description: "Number of started client RPCs.",
+ TagKeys: []tag.Key{KeyClientMethod}, // data is grouped by the KeyClientMethod tag
+ Aggregation: view.Count(), // count of recorded measurements, not their sum
+ }
+
ClientSentMessagesPerRPCView = &view.View{
Measure: ClientSentMessagesPerRPC,
Name: "grpc.io/client/sent_messages_per_rpc",
diff --git a/plugin/ocgrpc/end_to_end_test.go b/plugin/ocgrpc/end_to_end_test.go
index 8715079..d106f43 100644
--- a/plugin/ocgrpc/end_to_end_test.go
+++ b/plugin/ocgrpc/end_to_end_test.go
@@ -40,6 +40,8 @@
ocgrpc.ClientReceivedMessagesPerRPCView,
ocgrpc.ServerSentMessagesPerRPCView,
ocgrpc.ClientSentMessagesPerRPCView,
+ ocgrpc.ServerStartedRPCsView,
+ ocgrpc.ClientStartedRPCsView,
}
view.Register(extraViews...)
defer view.Unregister(extraViews...)
@@ -63,10 +65,14 @@
if err != nil {
t.Fatal(err)
}
+ checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
+ checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)
_, _ = client.Single(ctx, &testpb.FooRequest{Fail: true})
+ checkCount(t, ocgrpc.ClientStartedRPCsView, 2, clientMethodTag)
+ checkCount(t, ocgrpc.ServerStartedRPCsView, 2, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, serverStatusUnknownTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, clientStatusUnknownTag)
@@ -101,6 +107,7 @@
func TestEndToEnd_Stream(t *testing.T) {
view.Register(ocgrpc.DefaultClientViews...)
defer view.Unregister(ocgrpc.DefaultClientViews...)
+
view.Register(ocgrpc.DefaultServerViews...)
defer view.Unregister(ocgrpc.DefaultServerViews...)
@@ -109,6 +116,8 @@
ocgrpc.ClientReceivedMessagesPerRPCView,
ocgrpc.ServerSentMessagesPerRPCView,
ocgrpc.ClientSentMessagesPerRPCView,
+ ocgrpc.ClientStartedRPCsView,
+ ocgrpc.ServerStartedRPCsView,
}
view.Register(extraViews...)
defer view.Unregister(extraViews...)
@@ -146,6 +155,8 @@
t.Fatal(err)
}
+ checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
+ checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)
@@ -183,6 +194,7 @@
return 0, false
}
}
+
rows, err := view.RetrieveData(v.Name)
if err != nil {
t.Fatal(err)
diff --git a/plugin/ocgrpc/server_metrics.go b/plugin/ocgrpc/server_metrics.go
index b205982..fe0e971 100644
--- a/plugin/ocgrpc/server_metrics.go
+++ b/plugin/ocgrpc/server_metrics.go
@@ -27,6 +27,7 @@
ServerReceivedBytesPerRPC = stats.Int64("grpc.io/server/received_bytes_per_rpc", "Total bytes received across all messages per RPC.", stats.UnitBytes)
ServerSentMessagesPerRPC = stats.Int64("grpc.io/server/sent_messages_per_rpc", "Number of messages sent in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless)
ServerSentBytesPerRPC = stats.Int64("grpc.io/server/sent_bytes_per_rpc", "Total bytes sent in across all response messages per RPC.", stats.UnitBytes)
+ ServerStartedRPCs = stats.Int64("grpc.io/server/started_rpcs", "Number of started server RPCs.", stats.UnitDimensionless)
ServerLatency = stats.Float64("grpc.io/server/server_latency", "Time between first byte of request received to last byte of response sent, or terminal error.", stats.UnitMilliseconds)
)
@@ -73,6 +74,14 @@
Aggregation: view.Count(),
}
+ ServerStartedRPCsView = &view.View{ // view over the ServerStartedRPCs measure
+ Measure: ServerStartedRPCs,
+ Name: "grpc.io/server/started_rpcs",
+ Description: "Number of started server RPCs.",
+ TagKeys: []tag.Key{KeyServerMethod}, // data is grouped by the KeyServerMethod tag
+ Aggregation: view.Count(), // count of recorded measurements, not their sum
+ }
+
ServerReceivedMessagesPerRPCView = &view.View{
Name: "grpc.io/server/received_messages_per_rpc",
Description: "Distribution of messages received count per RPC, by method.",
diff --git a/plugin/ocgrpc/stats_common.go b/plugin/ocgrpc/stats_common.go
index 89cac9c..9cb2732 100644
--- a/plugin/ocgrpc/stats_common.go
+++ b/plugin/ocgrpc/stats_common.go
@@ -82,8 +82,10 @@
// statsHandleRPC processes the RPC events.
func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
switch st := s.(type) {
- case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
+ case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
// do nothing for client
+ case *stats.Begin:
+ handleRPCBegin(ctx, st)
case *stats.OutPayload:
handleRPCOutPayload(ctx, st)
case *stats.InPayload:
@@ -95,6 +97,25 @@
}
}
+// handleRPCBegin records one "started RPC" measurement when an RPC begins.
+//
+// The method name used for the tag is read from the *rpcData stored in ctx
+// under rpcDataKey. If that value is missing, the condition is logged at
+// verbosity level 2 and nothing is recorded.
+//
+// Client-side events record ClientStartedRPCs tagged with KeyClientMethod;
+// server-side events record ServerStartedRPCs tagged with KeyServerMethod.
+func handleRPCBegin(ctx context.Context, s *stats.Begin) {
+	d, ok := ctx.Value(rpcDataKey).(*rpcData)
+	if !ok {
+		if grpclog.V(2) {
+			grpclog.Infoln("Failed to retrieve *rpcData from context.")
+		}
+		// d is nil here; reading d.method below would panic.
+		return
+	}
+
+	if s.IsClient() {
+		ocstats.RecordWithOptions(ctx,
+			ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
+			ocstats.WithMeasurements(ClientStartedRPCs.M(1)))
+		return
+	}
+
+	// Server side: tag with the server method key, which is the key
+	// ServerStartedRPCsView groups by.
+	ocstats.RecordWithOptions(ctx,
+		ocstats.WithTags(tag.Upsert(KeyServerMethod, methodName(d.method))),
+		ocstats.WithMeasurements(ServerStartedRPCs.M(1)))
+}
+
func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
d, ok := ctx.Value(rpcDataKey).(*rpcData)
if !ok {
diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go
index fe8e35f..af81fbb 100644
--- a/stats/view/worker_test.go
+++ b/stats/view/worker_test.go
@@ -180,14 +180,14 @@
// Format is Resource.Labels encoded as string, then
wantPartialData := map[string][]*Row{
- makeKey(nil, count.Name): []*Row{
+ makeKey(nil, count.Name): {
{[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
- makeKey(nil, sum.Name): []*Row{
+ makeKey(nil, sum.Name): {
{nil, &SumData{Value: 7.5}},
},
- makeKey(&extraResource, count.Name): []*Row{
+ makeKey(&extraResource, count.Name): {
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
}