blob: 51ed977b8fea7530b5d12ee0a301ed05fd5e7219 [file] [log] [blame]
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using server-side application."""
import grpc
import threading
# requests_pb2 is a semantic dependency of this module.
from tests.testing import _application_common
from tests.testing.proto import requests_pb2 # pylint: disable=unused-import
from tests.testing.proto import services_pb2
from tests.testing.proto import services_pb2_grpc
class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
"""Services RPCs."""
def __init__(self):
self._abort_lock = threading.RLock()
self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE
def UnUn(self, request, context):
if request == _application_common.UNARY_UNARY_REQUEST:
return _application_common.UNARY_UNARY_RESPONSE
elif request == _application_common.ABORT_REQUEST:
with self._abort_lock:
try:
context.abort(grpc.StatusCode.PERMISSION_DENIED,
"Denying permission to test abort.")
except Exception as e: # pylint: disable=broad-except
self._abort_response = _application_common.ABORT_SUCCESS_RESPONSE
else:
self._abort_status = _application_common.ABORT_FAILURE_RESPONSE
return None # NOTE: For the linter.
elif request == _application_common.ABORT_SUCCESS_QUERY:
with self._abort_lock:
return self._abort_response
else:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!')
return services_pb2.Down()
def UnStre(self, request, context):
if _application_common.UNARY_STREAM_REQUEST != request:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!')
return
yield services_pb2.Strange() # pylint: disable=unreachable
def StreUn(self, request_iterator, context):
context.send_initial_metadata(((
'server_application_metadata_key',
'Hi there!',
),))
for request in request_iterator:
if request != _application_common.STREAM_UNARY_REQUEST:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!')
return services_pb2.Strange()
elif not context.is_active():
return services_pb2.Strange()
else:
return _application_common.STREAM_UNARY_RESPONSE
def StreStre(self, request_iterator, context):
valid_requests = (_application_common.STREAM_STREAM_REQUEST,
_application_common.STREAM_STREAM_MUTATING_REQUEST)
for request in request_iterator:
if request not in valid_requests:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!')
return
elif not context.is_active():
return
elif request == _application_common.STREAM_STREAM_REQUEST:
yield _application_common.STREAM_STREAM_RESPONSE
yield _application_common.STREAM_STREAM_RESPONSE
elif request == _application_common.STREAM_STREAM_MUTATING_REQUEST:
response = services_pb2.Bottom()
for i in range(
_application_common.STREAM_STREAM_MUTATING_COUNT):
response.first_bottom_field = i
yield response