| # Copyright 2024 The gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import argparse |
| from concurrent import futures |
| import logging |
| import socket |
| |
| import grpc |
| from grpc_csm_observability import CsmOpenTelemetryPlugin |
| from opentelemetry.exporter.prometheus import PrometheusMetricReader |
| from opentelemetry.sdk.metrics import MeterProvider |
| from prometheus_client import start_http_server |
| |
| from src.proto.grpc.testing import messages_pb2 |
| from src.proto.grpc.testing import test_pb2_grpc |
| |
# Host part of the address the test server listens on (IPv4 wildcard).
_LISTEN_HOST = "0.0.0.0"
# Upper bound on worker threads handed to the gRPC server's executor.
_THREAD_POOL_SIZE = 256

# Attach a console handler to the root logger so every record is printed as
# "<timestamp>: <LEVEL>    <message>".
formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(console_handler)
| |
| |
class TestService(test_pb2_grpc.TestServiceServicer):
    """Test servicer that reports its own identity on every unary call.

    Each response carries the configured server ID and hostname; the hostname
    is additionally sent to the client as initial metadata.
    """

    def __init__(self, server_id, hostname):
        """Remember the identity values echoed back to clients.

        Args:
            server_id: Value copied into the ``server_id`` response field.
            hostname: Value copied into the ``hostname`` response field and
                sent under the ``hostname`` initial-metadata key.
        """
        self._hostname = hostname
        self._server_id = server_id

    def UnaryCall(
        self, request: messages_pb2.SimpleRequest, context: grpc.ServicerContext
    ) -> messages_pb2.SimpleResponse:
        """Answer a unary request, optionally with a filler payload.

        Args:
            request: Incoming request; ``response_size`` selects how many
                bytes of payload body to return.
            context: RPC context used to send the initial metadata.

        Returns:
            A response tagged with this server's ID and hostname. When
            ``request.response_size`` is positive, the payload body consists
            of that many ``b"0"`` bytes; otherwise no payload is set.
        """
        # Metadata goes out before the response message is built.
        initial_metadata = (("hostname", self._hostname),)
        context.send_initial_metadata(initial_metadata)

        requested_size = request.response_size
        if requested_size <= 0:
            reply = messages_pb2.SimpleResponse()
        else:
            filler = messages_pb2.Payload(body=b"0" * requested_size)
            reply = messages_pb2.SimpleResponse(payload=filler)

        reply.server_id = self._server_id
        reply.hostname = self._hostname
        logger.info("Sending response to client")
        return reply
| |
| |
def _run(
    port: int,
    secure_mode: bool,
    server_id: str,
    prometheus_endpoint: int,
) -> None:
    """Start the test server and block until it terminates.

    The CSM observability plugin is registered globally before the server is
    created and is always deregistered on the way out, including when server
    setup fails or the blocking wait is interrupted.

    Args:
        port: Port the test server listens on.
        secure_mode: When true, the server port is added with xDS server
            credentials instead of as an insecure port.
        server_id: Identifier returned in every response.
        prometheus_endpoint: Port on which the Prometheus metrics HTTP server
            is started.
    """
    csm_plugin = _prepare_csm_observability_plugin(prometheus_endpoint)
    csm_plugin.register_global()
    try:
        server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)
        )
        _configure_test_server(server, port, secure_mode, server_id)
        server.start()
        logger.info("Test server listening on port %d", port)
        server.wait_for_termination()
    finally:
        # Previously skipped whenever anything above raised (for example a
        # KeyboardInterrupt during the wait), leaving the plugin registered.
        csm_plugin.deregister_global()
| |
| |
def _prepare_csm_observability_plugin(
    prometheus_endpoint: int,
) -> CsmOpenTelemetryPlugin:
    """Build a CSM OpenTelemetry plugin whose metrics feed Prometheus.

    Starts the Prometheus client HTTP server as a side effect.

    Args:
        prometheus_endpoint: Port the Prometheus HTTP server binds to.

    Returns:
        A plugin configured with a meter provider that has a single
        Prometheus metric reader. The plugin is not registered here.
    """
    # Start the HTTP server first, then wire up the reader and provider.
    start_http_server(port=prometheus_endpoint, addr="0.0.0.0")
    provider = MeterProvider(metric_readers=[PrometheusMetricReader()])
    return CsmOpenTelemetryPlugin(meter_provider=provider)
| |
| |
def _configure_test_server(
    server: grpc.Server, port: int, secure_mode: bool, server_id: str
) -> None:
    """Register the test servicer and open the listening port on ``server``.

    Args:
        server: gRPC server to configure; it is not started here.
        port: Port number combined with ``_LISTEN_HOST`` to form the address.
        secure_mode: When false, an insecure port is added. When true, the
            port is added with xDS server credentials that use insecure
            credentials as their fallback.
        server_id: Identifier the servicer returns in responses.
    """
    servicer = TestService(server_id, socket.gethostname())
    test_pb2_grpc.add_TestServiceServicer_to_server(servicer, server)

    listen_address = f"{_LISTEN_HOST}:{port}"
    if not secure_mode:
        server.add_insecure_port(listen_address)
        return

    logger.info("Running with xDS Server credentials")
    xds_creds = grpc.xds_server_credentials(grpc.insecure_server_credentials())
    server.add_secure_port(listen_address, xds_creds)
| |
| |
def bool_arg(arg: str) -> bool:
    """Parse a command-line string into a boolean (argparse ``type`` hook).

    Matching is case-insensitive. ``true``/``yes``/``y`` map to ``True`` and
    ``false``/``no``/``n`` map to ``False``.

    Args:
        arg: Raw string taken from the command line.

    Returns:
        The boolean the string spells.

    Raises:
        argparse.ArgumentTypeError: If ``arg`` is not a recognised spelling.
    """
    spellings = {
        "true": True,
        "yes": True,
        "y": True,
        "false": False,
        "no": False,
        "n": False,
    }
    parsed = spellings.get(arg.lower())
    if parsed is None:
        raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
    return parsed
| |
| |
if __name__ == "__main__":
    # Script entry point: set up logging, parse the flags, run the server.
    logging.basicConfig()
    logger.setLevel(logging.INFO)

    parser = argparse.ArgumentParser(
        description="Run Python CSM Observability Test server."
    )
    # Flags are registered in the order listed. The string default for
    # --secure_mode is passed through bool_arg by argparse, so it parses to
    # False when the flag is omitted.
    cli_flags = (
        (
            "--port",
            {"type": int, "default": 50051, "help": "Port for test server."},
        ),
        (
            "--secure_mode",
            {
                "type": bool_arg,
                "default": "False",
                "help": "If specified, uses xDS to retrieve server credentials.",
            },
        ),
        (
            "--server_id",
            {
                "type": str,
                "default": "python_server",
                "help": "The server ID to return in responses.",
            },
        ),
        (
            "--prometheus_endpoint",
            {
                "type": int,
                "default": 9464,
                "help": "Port for servers besides test server.",
            },
        ),
    )
    for flag, options in cli_flags:
        parser.add_argument(flag, **options)

    args = parser.parse_args()
    _run(
        port=args.port,
        secure_mode=args.secure_mode,
        server_id=args.server_id,
        prometheus_endpoint=args.prometheus_endpoint,
    )