| # Copyright 2017 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import abc |
| import threading |
| |
| import grpc |
| from grpc_testing import _common |
| |
| _CLIENT_INACTIVE = object() |
| |
| |
class Handler(_common.ServerRpcHandler):
    """Abstract handler for a single server-side RPC under test.

    Extends ``_common.ServerRpcHandler`` with the operations declared
    below. Every method is abstract; the concrete implementation in this
    module is ``_Handler``.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Return the initial metadata associated with the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def add_request(self, request):
        """Supply one more request message to the RPC.

        Args:
          request: The request message to make available to the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def take_response(self):
        """Remove and return the next response message of the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Indicate that no further requests will be added to the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancel the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def unary_response_termination(self):
        """Return the terminal values of a response-unary RPC.

        The concrete handler in this module returns a
        ``(response, trailing_metadata, code, details)`` tuple.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_response_termination(self):
        """Return the terminal values of a response-streaming RPC.

        The concrete handler in this module returns a
        ``(trailing_metadata, code, details)`` tuple.
        """
        raise NotImplementedError()
| |
| |
class _Handler(Handler):
    """Condition-guarded state holder for one server-side RPC under test.

    All mutable state is read and written while holding ``self._condition``;
    every state change is followed by ``notify_all()`` so that methods
    blocked in ``wait()`` re-evaluate what they are waiting for.

    ``self._code`` doubles as the liveness flag: ``None`` means the RPC is
    still active, ``_CLIENT_INACTIVE`` means it was cancelled through
    ``cancel()``, and any other value is the status code it terminated with.
    """

    def __init__(self, requests_closed):
        """Initialize the handler.

        Args:
          requests_closed: Truthy if no requests will ever be added (the
            request stream is already closed), falsy otherwise.
        """
        self._condition = threading.Condition()
        self._requests = []
        self._requests_closed = requests_closed
        self._initial_metadata = None
        self._responses = []
        self._trailing_metadata = None
        self._code = None
        self._details = None
        self._unary_response = None
        self._expiration_future = None
        # Replaced by None once cancel() or expire() has consumed the list.
        self._termination_callbacks = []

    def send_initial_metadata(self, initial_metadata):
        """Record the initial metadata and wake any waiters."""
        with self._condition:
            self._initial_metadata = initial_metadata
            self._condition.notify_all()

    def take_request(self):
        """Block until a request, request-closure, or termination is seen.

        Returns:
          ``_common.TERMINATED`` if the RPC already has a code; otherwise a
          ``_common.ServerRpcRead`` wrapping the oldest queued request, or
          ``_common.REQUESTS_CLOSED`` if the queue is empty and the request
          stream has been closed.
        """
        with self._condition:
            while True:
                if self._code is not None:
                    return _common.TERMINATED
                if self._requests:
                    request = self._requests.pop(0)
                    self._condition.notify_all()
                    return _common.ServerRpcRead(request, False, False)
                if self._requests_closed:
                    return _common.REQUESTS_CLOSED
                self._condition.wait()

    def is_active(self):
        """Return True while the RPC has no termination code."""
        with self._condition:
            return self._code is None

    def add_response(self, response):
        """Queue a response message and wake any waiters."""
        with self._condition:
            self._responses.append(response)
            self._condition.notify_all()

    def send_termination(self, trailing_metadata, code, details):
        """Record the RPC's terminal values and wake any waiters.

        Cancels the expiration future, if one was set. The registered
        termination callbacks are not invoked here; only ``cancel()`` and
        ``expire()`` run them.
        """
        with self._condition:
            self._trailing_metadata = trailing_metadata
            self._code = code
            self._details = details
            if self._expiration_future is not None:
                self._expiration_future.cancel()
            self._condition.notify_all()

    def add_termination_callback(self, callback):
        """Register a zero-argument callback to run on cancel or expiry.

        Returns:
          True if the callback was registered; False if the RPC already has
          a code, in which case the callback is dropped.
        """
        with self._condition:
            if self._code is not None:
                return False
            self._termination_callbacks.append(callback)
            return True

    def initial_metadata(self):
        """Block until initial metadata is available and return it.

        Raises:
          ValueError: If the RPC has a code but initial metadata was never
            set.
        """
        with self._condition:
            while self._initial_metadata is None:
                if self._code is not None:
                    raise ValueError(
                        'No initial metadata despite status code!')
                self._condition.wait()
            return self._initial_metadata

    def add_request(self, request):
        """Queue a request message and wake any waiters."""
        with self._condition:
            self._requests.append(request)
            self._condition.notify_all()

    def take_response(self):
        """Block until a response is queued, then remove and return it.

        Raises:
          ValueError: If the response queue is empty and the RPC already
            has a code.
        """
        with self._condition:
            while not self._responses:
                if self._code is not None:
                    raise ValueError('No more responses!')
                self._condition.wait()
            response = self._responses.pop(0)
            self._condition.notify_all()
            return response

    def requests_closed(self):
        """Mark the request stream as closed and wake any waiters."""
        with self._condition:
            self._requests_closed = True
            self._condition.notify_all()

    def cancel(self):
        """Cancel the RPC if it is still active.

        On the first effective call this marks the RPC as
        ``_CLIENT_INACTIVE``, cancels the expiration future (if any), wakes
        waiters, and then runs the registered termination callbacks. If the
        RPC already has a code the call is a no-op.
        """
        # Bound up front so that a call on an already-terminated RPC finds
        # nothing to run rather than raising UnboundLocalError below.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                self._code = _CLIENT_INACTIVE
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                if self._expiration_future is not None:
                    self._expiration_future.cancel()
                self._condition.notify_all()
        # Callbacks run after the lock is released so that they may call
        # back into this handler.
        for termination_callback in termination_callbacks:
            termination_callback()

    def unary_response_termination(self):
        """Block until termination and return the unary outcome.

        Returns:
          A ``(response, trailing_metadata, code, details)`` tuple. The
          response is the first queued response if there is one, otherwise
          ``None``; it is cached so repeated calls return the same value.

        Raises:
          ValueError: If the RPC was cancelled through ``cancel()``.
        """
        with self._condition:
            while self._code is None:
                self._condition.wait()
            if self._code is _CLIENT_INACTIVE:
                raise ValueError('Huh? Cancelled but wanting status?')
            if self._unary_response is None and self._responses:
                self._unary_response = self._responses.pop(0)
            return (
                self._unary_response,
                self._trailing_metadata,
                self._code,
                self._details,
            )

    def stream_response_termination(self):
        """Block until termination and return the streaming outcome.

        Returns:
          A ``(trailing_metadata, code, details)`` tuple.

        Raises:
          ValueError: If the RPC was cancelled through ``cancel()``.
        """
        with self._condition:
            while self._code is None:
                self._condition.wait()
            if self._code is _CLIENT_INACTIVE:
                raise ValueError('Huh? Cancelled but wanting status?')
            return self._trailing_metadata, self._code, self._details

    def expire(self):
        """Terminate the RPC with DEADLINE_EXCEEDED if it is still active.

        Fills in ``_common.FUSSED_EMPTY_METADATA`` for the trailing
        metadata, and for the initial metadata when none was sent, wakes
        waiters, and then runs the registered termination callbacks. If the
        RPC already has a code the call is a no-op.
        """
        # Bound up front so that an expiry arriving after termination finds
        # nothing to run rather than raising UnboundLocalError below.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                if self._initial_metadata is None:
                    self._initial_metadata = _common.FUSSED_EMPTY_METADATA
                self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
                self._code = grpc.StatusCode.DEADLINE_EXCEEDED
                self._details = 'Took too much time!'
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                self._condition.notify_all()
        # Callbacks run after the lock is released so that they may call
        # back into this handler.
        for termination_callback in termination_callbacks:
            termination_callback()

    def set_expiration_future(self, expiration_future):
        """Store the future that triggers ``expire()``.

        Args:
          expiration_future: An object with a ``cancel()`` method; it is
            cancelled when the RPC terminates or is cancelled first.
        """
        with self._condition:
            self._expiration_future = expiration_future
| |
| |
def handler_without_deadline(requests_closed):
    """Create a handler for an RPC that never expires.

    Args:
      requests_closed: Passed through to ``_Handler``; truthy if the request
        stream is already closed.

    Returns:
      A new ``_Handler`` with no expiration future attached.
    """
    handler = _Handler(requests_closed)
    return handler
| |
| |
def handler_with_deadline(requests_closed, time, deadline):
    """Create a handler whose ``expire`` is scheduled at ``deadline``.

    Args:
      requests_closed: Passed through to ``_Handler``; truthy if the request
        stream is already closed.
      time: An object providing ``call_at(callable, deadline)`` that returns
        a cancellable future (presumably a grpc_testing time source; verify
        against callers).
      deadline: The moment, as understood by ``time``, at which the
        handler's ``expire`` should be called.

    Returns:
      A new ``_Handler`` holding the scheduled expiration future.
    """
    deadlined_handler = _Handler(requests_closed)
    deadlined_handler.set_expiration_future(
        time.call_at(deadlined_handler.expire, deadline))
    return deadlined_handler