blob: b47e04c718666bd8fcef21de39f5f25774179746 [file] [log] [blame]
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import threading
import grpc
from grpc_testing import _common
_CLIENT_INACTIVE = object()
class Handler(_common.ServerRpcHandler):
@abc.abstractmethod
def initial_metadata(self):
raise NotImplementedError()
@abc.abstractmethod
def add_request(self, request):
raise NotImplementedError()
@abc.abstractmethod
def take_response(self):
raise NotImplementedError()
@abc.abstractmethod
def requests_closed(self):
raise NotImplementedError()
@abc.abstractmethod
def cancel(self):
raise NotImplementedError()
@abc.abstractmethod
def unary_response_termination(self):
raise NotImplementedError()
@abc.abstractmethod
def stream_response_termination(self):
raise NotImplementedError()
class _Handler(Handler):
def __init__(self, requests_closed):
self._condition = threading.Condition()
self._requests = []
self._requests_closed = requests_closed
self._initial_metadata = None
self._responses = []
self._trailing_metadata = None
self._code = None
self._details = None
self._unary_response = None
self._expiration_future = None
self._termination_callbacks = []
def send_initial_metadata(self, initial_metadata):
with self._condition:
self._initial_metadata = initial_metadata
self._condition.notify_all()
def take_request(self):
with self._condition:
while True:
if self._code is None:
if self._requests:
request = self._requests.pop(0)
self._condition.notify_all()
return _common.ServerRpcRead(request, False, False)
elif self._requests_closed:
return _common.REQUESTS_CLOSED
else:
self._condition.wait()
else:
return _common.TERMINATED
def is_active(self):
with self._condition:
return self._code is None
def add_response(self, response):
with self._condition:
self._responses.append(response)
self._condition.notify_all()
def send_termination(self, trailing_metadata, code, details):
with self._condition:
self._trailing_metadata = trailing_metadata
self._code = code
self._details = details
if self._expiration_future is not None:
self._expiration_future.cancel()
self._condition.notify_all()
def add_termination_callback(self, termination_callback):
with self._condition:
if self._code is None:
self._termination_callbacks.append(termination_callback)
return True
else:
return False
def initial_metadata(self):
with self._condition:
while True:
if self._initial_metadata is None:
if self._code is None:
self._condition.wait()
else:
raise ValueError(
'No initial metadata despite status code!')
else:
return self._initial_metadata
def add_request(self, request):
with self._condition:
self._requests.append(request)
self._condition.notify_all()
def take_response(self):
with self._condition:
while True:
if self._responses:
response = self._responses.pop(0)
self._condition.notify_all()
return response
elif self._code is None:
self._condition.wait()
else:
raise ValueError('No more responses!')
def requests_closed(self):
with self._condition:
self._requests_closed = True
self._condition.notify_all()
def cancel(self):
with self._condition:
if self._code is None:
self._code = _CLIENT_INACTIVE
termination_callbacks = self._termination_callbacks
self._termination_callbacks = None
if self._expiration_future is not None:
self._expiration_future.cancel()
self._condition.notify_all()
for termination_callback in termination_callbacks:
termination_callback()
def unary_response_termination(self):
with self._condition:
while True:
if self._code is _CLIENT_INACTIVE:
raise ValueError('Huh? Cancelled but wanting status?')
elif self._code is None:
self._condition.wait()
else:
if self._unary_response is None:
if self._responses:
self._unary_response = self._responses.pop(0)
return (
self._unary_response, self._trailing_metadata,
self._code, self._details,)
def stream_response_termination(self):
with self._condition:
while True:
if self._code is _CLIENT_INACTIVE:
raise ValueError('Huh? Cancelled but wanting status?')
elif self._code is None:
self._condition.wait()
else:
return self._trailing_metadata, self._code, self._details,
def expire(self):
with self._condition:
if self._code is None:
if self._initial_metadata is None:
self._initial_metadata = _common.FUSSED_EMPTY_METADATA
self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
self._code = grpc.StatusCode.DEADLINE_EXCEEDED
self._details = 'Took too much time!'
termination_callbacks = self._termination_callbacks
self._termination_callbacks = None
self._condition.notify_all()
for termination_callback in termination_callbacks:
termination_callback()
def set_expiration_future(self, expiration_future):
with self._condition:
self._expiration_future = expiration_future
def handler_without_deadline(requests_closed):
return _Handler(requests_closed)
def handler_with_deadline(requests_closed, time, deadline):
handler = _Handler(requests_closed)
expiration_future = time.call_at(handler.expire, deadline)
handler.set_expiration_future(expiration_future)
return handler