blob: 4a6f6240e10b56b105e587e215c96838d19a827d [file] [log] [blame]
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Mapping
from grpc_observability import _measures
from grpc_observability._cyobservability import MetricsName
from opencensus.stats import aggregation
from opencensus.stats import view as view_module
from opencensus.tags.tag_key import TagKey
# Lookup table from each `MetricsName` member to the corresponding measure
# constant defined in `_measures`. Client-side entries come first, followed by
# the server-side entries.
METRICS_NAME_TO_MEASURE = {
MetricsName.CLIENT_STARTED_RPCS: _measures.CLIENT_STARTED_RPCS_MEASURE,
MetricsName.CLIENT_ROUNDTRIP_LATENCY: _measures.CLIENT_ROUNDTRIP_LATENCY_MEASURE,
MetricsName.CLIENT_COMPLETED_RPC: _measures.CLIENT_COMPLETED_RPCS_MEASURE,
MetricsName.CLIENT_API_LATENCY: _measures.CLIENT_API_LATENCY_MEASURE,
MetricsName.CLIENT_SEND_BYTES_PER_RPC: _measures.CLIENT_SEND_BYTES_PER_RPC_MEASURE,
MetricsName.CLIENT_RECEIVED_BYTES_PER_RPC: _measures.CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE,
MetricsName.SERVER_STARTED_RPCS: _measures.SERVER_STARTED_RPCS_MEASURE,
MetricsName.SERVER_SENT_BYTES_PER_RPC: _measures.SERVER_SENT_BYTES_PER_RPC_MEASURE,
MetricsName.SERVER_RECEIVED_BYTES_PER_RPC: _measures.SERVER_RECEIVED_BYTES_PER_RPC_MEASURE,
MetricsName.SERVER_SERVER_LATENCY: _measures.SERVER_SERVER_LATENCY_MEASURE,
MetricsName.SERVER_COMPLETED_RPC: _measures.SERVER_COMPLETED_RPCS_MEASURE,
}
# These measure definitions should be kept in sync across opencensus
# implementations--see
# https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java.
def client_method_tag_key():
    """Return a new `TagKey` named ``grpc_client_method``."""
    method_key = TagKey("grpc_client_method")
    return method_key
def client_status_tag_key():
    """Return a new `TagKey` named ``grpc_client_status``."""
    status_key = TagKey("grpc_client_status")
    return status_key
def server_method_tag_key():
    """Return a new `TagKey` named ``grpc_server_method``."""
    method_key = TagKey("grpc_server_method")
    return method_key
def server_status_tag_key():
    """Return a new `TagKey` named ``grpc_server_status``.

    The key string follows the same ``grpc_<side>_<dimension>`` pattern as the
    other tag-key helpers in this module (``grpc_client_method``,
    ``grpc_client_status``, ``grpc_server_method``).
    """
    # The key used to be the helper's own name ("server_status_tag_key"),
    # which looked like a copy/paste slip and put the server status under a
    # column name inconsistent with every other tag key defined here.
    return TagKey("grpc_server_status")
def count_distribution_aggregation() -> aggregation.DistributionAggregation:
    """Build a distribution aggregation with exponentially growing boundaries.

    The boundaries come from `_get_exponential_boundaries(17, 1.0, 2.0)`:
    17 values starting at 1.0, each one twice the previous.
    """
    return aggregation.DistributionAggregation(
        _get_exponential_boundaries(17, 1.0, 2.0)
    )
def bytes_distribution_aggregation() -> aggregation.DistributionAggregation:
    """Build the distribution aggregation used by the per-RPC byte-size views.

    The bucket boundaries are a fixed list running from 1024 up to 4294967296.
    """
    # Kept as explicit literals so the list can be compared at a glance.
    size_boundaries = [
        1024, 2048, 4096,
        16384, 65536, 262144,
        1048576, 4194304, 16777216,
        67108864, 268435456, 1073741824,
        4294967296,
    ]
    return aggregation.DistributionAggregation(size_boundaries)
def millis_distribution_aggregation() -> aggregation.DistributionAggregation:
    """Build the distribution aggregation used by the latency views.

    The bucket boundaries are a fixed, increasing list from 0.01 to 100000.
    """
    # Grouped by order of magnitude for readability; values and order are
    # exactly those of the flat list this replaces.
    latency_boundaries = [
        0.01, 0.05,
        0.1, 0.3, 0.6, 0.8,
        1, 2, 3, 4, 5, 6, 8,
        10, 13, 16, 20, 25, 30, 40, 50, 65, 80,
        100, 130, 160, 200, 250, 300, 400, 500, 650, 800,
        1000, 2000, 5000,
        10000, 20000, 50000,
        100000,
    ]
    return aggregation.DistributionAggregation(latency_boundaries)
# Client
def client_started_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the ``grpc.io/client/started_rpcs`` count view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.CLIENT_STARTED_RPCS_MEASURE` using a count
        aggregation, with the label keys plus the client method tag key as
        columns.
    """
    # The description used to be the server view's text ("received at the
    # server"), which is wrong for a client-side metric.
    description = (
        "The count of RPCs ever started by the client, including RPCs"
        " that have not completed."
    )
    columns = [TagKey(label) for label in labels]
    columns.append(client_method_tag_key())
    return view_module.View(
        "grpc.io/client/started_rpcs",
        description,
        columns,
        _measures.CLIENT_STARTED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
def client_completed_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the ``grpc.io/client/completed_rpcs`` count view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.CLIENT_COMPLETED_RPCS_MEASURE` using a count
        aggregation, with the label keys plus the client method and client
        status tag keys as columns.
    """
    description = (
        "The total count of RPCs completed, for example, when a response"
        " is sent by the server."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([client_method_tag_key(), client_status_tag_key()])
    return view_module.View(
        "grpc.io/client/completed_rpcs",
        description,
        columns,
        _measures.CLIENT_COMPLETED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
def client_roundtrip_latency(labels: Mapping[str, str]) -> view_module.View:
    """Build the ``grpc.io/client/roundtrip_latency`` distribution view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.CLIENT_ROUNDTRIP_LATENCY_MEASURE` aggregated
        with `millis_distribution_aggregation()`, with the label keys plus
        the client method tag key as columns.
    """
    description = (
        "End-to-end time taken to complete an RPC attempt including the time"
        " it takes to pick a subchannel."
    )
    columns = [TagKey(label) for label in labels]
    columns.append(client_method_tag_key())
    return view_module.View(
        "grpc.io/client/roundtrip_latency",
        description,
        columns,
        _measures.CLIENT_ROUNDTRIP_LATENCY_MEASURE,
        millis_distribution_aggregation(),
    )
def client_api_latency(labels: Mapping[str, str]) -> view_module.View:
    """Build the ``grpc.io/client/api_latency`` distribution view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.CLIENT_API_LATENCY_MEASURE` aggregated with
        `millis_distribution_aggregation()`, with the label keys plus the
        client method and client status tag keys as columns.
    """
    description = (
        "The total time taken by the gRPC library to complete an RPC from"
        " the application's perspective."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([client_method_tag_key(), client_status_tag_key()])
    return view_module.View(
        "grpc.io/client/api_latency",
        description,
        columns,
        _measures.CLIENT_API_LATENCY_MEASURE,
        millis_distribution_aggregation(),
    )
def client_sent_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the ``grpc.io/client/sent_compressed_message_bytes_per_rpc`` view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.CLIENT_SEND_BYTES_PER_RPC_MEASURE` aggregated
        with `bytes_distribution_aggregation()`, with the label keys plus the
        client method and client status tag keys as columns.
    """
    description = (
        "The total bytes (compressed, not encrypted) sent across all"
        " request messages per RPC attempt."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([client_method_tag_key(), client_status_tag_key()])
    return view_module.View(
        "grpc.io/client/sent_compressed_message_bytes_per_rpc",
        description,
        columns,
        _measures.CLIENT_SEND_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
def client_received_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the ``grpc.io/client/received_compressed_message_bytes_per_rpc`` view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE`
        aggregated with `bytes_distribution_aggregation()`, with the label
        keys plus the client method and client status tag keys as columns.
    """
    description = (
        "The total bytes (compressed, not encrypted) received across"
        " all response messages per RPC attempt."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([client_method_tag_key(), client_status_tag_key()])
    return view_module.View(
        "grpc.io/client/received_compressed_message_bytes_per_rpc",
        description,
        columns,
        _measures.CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
# Server
def server_started_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the ``grpc.io/server/started_rpcs`` count view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.SERVER_STARTED_RPCS_MEASURE` using a count
        aggregation, with the label keys plus the server method tag key as
        columns.
    """
    description = (
        "The count of RPCs ever received at the server, including RPCs"
        " that have not completed."
    )
    columns = [TagKey(label) for label in labels]
    columns.append(server_method_tag_key())
    return view_module.View(
        "grpc.io/server/started_rpcs",
        description,
        columns,
        _measures.SERVER_STARTED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
def server_completed_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the ``grpc.io/server/completed_rpcs`` count view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.SERVER_COMPLETED_RPCS_MEASURE` using a count
        aggregation, with the label keys plus the server method and server
        status tag keys as columns.
    """
    description = (
        "The total count of RPCs completed, for example, when a response"
        " is sent by the server."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([server_method_tag_key(), server_status_tag_key()])
    return view_module.View(
        "grpc.io/server/completed_rpcs",
        description,
        columns,
        _measures.SERVER_COMPLETED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
def server_sent_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the ``grpc.io/server/sent_compressed_message_bytes_per_rpc`` view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.SERVER_SENT_BYTES_PER_RPC_MEASURE` aggregated
        with `bytes_distribution_aggregation()`, with the label keys plus the
        server method and server status tag keys as columns.
    """
    description = (
        "The total bytes (compressed not encrypted) sent across all response"
        " messages per RPC."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([server_method_tag_key(), server_status_tag_key()])
    return view_module.View(
        "grpc.io/server/sent_compressed_message_bytes_per_rpc",
        description,
        columns,
        _measures.SERVER_SENT_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
def server_received_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the ``grpc.io/server/received_compressed_message_bytes_per_rpc`` view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.SERVER_RECEIVED_BYTES_PER_RPC_MEASURE`
        aggregated with `bytes_distribution_aggregation()`, with the label
        keys plus the server method and server status tag keys as columns.
    """
    description = (
        "The total bytes (compressed not encrypted) received across all"
        " request messages per RPC."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([server_method_tag_key(), server_status_tag_key()])
    return view_module.View(
        "grpc.io/server/received_compressed_message_bytes_per_rpc",
        description,
        columns,
        _measures.SERVER_RECEIVED_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
def server_server_latency(labels: Mapping[str, str]) -> view_module.View:
    """Build the ``grpc.io/server/server_latency`` distribution view.

    Args:
        labels: Mapping whose keys each become an extra tag-key column on the
            view. The values are not read here.

    Returns:
        A view over `_measures.SERVER_SERVER_LATENCY_MEASURE` aggregated with
        `millis_distribution_aggregation()`, with the label keys plus the
        server method and server status tag keys as columns.
    """
    description = (
        "The total time taken by an RPC from server transport's"
        " (HTTP2 / inproc / cronet) perspective."
    )
    columns = [TagKey(label) for label in labels]
    columns.extend([server_method_tag_key(), server_status_tag_key()])
    return view_module.View(
        "grpc.io/server/server_latency",
        description,
        columns,
        _measures.SERVER_SERVER_LATENCY_MEASURE,
        millis_distribution_aggregation(),
    )
def _get_exponential_boundaries(
num_finite_buckets: int, scale: float, grrowth_factor: float
) -> list:
boundaries = []
upper_bound = scale
for _ in range(num_finite_buckets):
boundaries.append(upper_bound)
upper_bound *= grrowth_factor
return boundaries