blob: 1098d38c83e357eda115fd227350e5f46f538830 [file] [log] [blame]
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_health.v1.health."""
import threading
import time
import unittest
import grpc
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from tests.unit import test_common
from tests.unit import thread_pool
from tests.unit.framework.common import test_constants
from six.moves import queue
_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
_WATCH_SERVICE = 'grpc.test.WatchService'
def _consume_responses(response_iterator, response_queue):
for response in response_iterator:
response_queue.put(response)
class BaseWatchTests(object):
class WatchTests(unittest.TestCase):
def start_server(self, non_blocking=False, thread_pool=None):
self._thread_pool = thread_pool
self._servicer = health.HealthServicer(
experimental_non_blocking=non_blocking,
experimental_thread_pool=thread_pool)
self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
self._servicer.set(_SERVING_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
self._servicer.set(_UNKNOWN_SERVICE,
health_pb2.HealthCheckResponse.UNKNOWN)
self._servicer.set(_NOT_SERVING_SERVICE,
health_pb2.HealthCheckResponse.NOT_SERVING)
self._server = test_common.test_server()
port = self._server.add_insecure_port('[::]:0')
health_pb2_grpc.add_HealthServicer_to_server(
self._servicer, self._server)
self._server.start()
self._channel = grpc.insecure_channel('localhost:%d' % port)
self._stub = health_pb2_grpc.HealthStub(self._channel)
def tearDown(self):
self._server.stop(None)
self._channel.close()
def test_watch_empty_service(self):
request = health_pb2.HealthCheckRequest(service='')
response_queue = queue.Queue()
rendezvous = self._stub.Watch(request)
thread = threading.Thread(
target=_consume_responses, args=(rendezvous, response_queue))
thread.start()
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
response.status)
rendezvous.cancel()
thread.join()
self.assertTrue(response_queue.empty())
if self._thread_pool is not None:
self.assertTrue(self._thread_pool.was_used())
def test_watch_new_service(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
response_queue = queue.Queue()
rendezvous = self._stub.Watch(request)
thread = threading.Thread(
target=_consume_responses, args=(rendezvous, response_queue))
thread.start()
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
response.status)
self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
response.status)
self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.NOT_SERVING)
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
response.status)
rendezvous.cancel()
thread.join()
self.assertTrue(response_queue.empty())
def test_watch_service_isolation(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
response_queue = queue.Queue()
rendezvous = self._stub.Watch(request)
thread = threading.Thread(
target=_consume_responses, args=(rendezvous, response_queue))
thread.start()
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
response.status)
self._servicer.set('some-other-service',
health_pb2.HealthCheckResponse.SERVING)
with self.assertRaises(queue.Empty):
response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
rendezvous.cancel()
thread.join()
self.assertTrue(response_queue.empty())
def test_two_watchers(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
response_queue1 = queue.Queue()
response_queue2 = queue.Queue()
rendezvous1 = self._stub.Watch(request)
rendezvous2 = self._stub.Watch(request)
thread1 = threading.Thread(
target=_consume_responses, args=(rendezvous1, response_queue1))
thread2 = threading.Thread(
target=_consume_responses, args=(rendezvous2, response_queue2))
thread1.start()
thread2.start()
response1 = response_queue1.get(
timeout=test_constants.SHORT_TIMEOUT)
response2 = response_queue2.get(
timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
response1.status)
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
response2.status)
self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
response1 = response_queue1.get(
timeout=test_constants.SHORT_TIMEOUT)
response2 = response_queue2.get(
timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
response1.status)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
response2.status)
rendezvous1.cancel()
rendezvous2.cancel()
thread1.join()
thread2.join()
self.assertTrue(response_queue1.empty())
self.assertTrue(response_queue2.empty())
@unittest.skip("https://github.com/grpc/grpc/issues/18127")
def test_cancelled_watch_removed_from_watch_list(self):
request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
response_queue = queue.Queue()
rendezvous = self._stub.Watch(request)
thread = threading.Thread(
target=_consume_responses, args=(rendezvous, response_queue))
thread.start()
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
response.status)
rendezvous.cancel()
self._servicer.set(_WATCH_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
thread.join()
# Wait, if necessary, for serving thread to process client cancellation
timeout = time.time() + test_constants.TIME_ALLOWANCE
while time.time(
) < timeout and self._servicer._send_response_callbacks[_WATCH_SERVICE]:
time.sleep(1)
self.assertFalse(
self._servicer._send_response_callbacks[_WATCH_SERVICE],
'watch set should be empty')
self.assertTrue(response_queue.empty())
def test_graceful_shutdown(self):
request = health_pb2.HealthCheckRequest(service='')
response_queue = queue.Queue()
rendezvous = self._stub.Watch(request)
thread = threading.Thread(
target=_consume_responses, args=(rendezvous, response_queue))
thread.start()
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
response.status)
self._servicer.enter_graceful_shutdown()
response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
response.status)
# This should be a no-op.
self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
rendezvous.cancel()
thread.join()
self.assertTrue(response_queue.empty())
class HealthServicerTest(BaseWatchTests.WatchTests):
def setUp(self):
self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None)
super(HealthServicerTest, self).start_server(
non_blocking=True, thread_pool=self._thread_pool)
def test_check_empty_service(self):
request = health_pb2.HealthCheckRequest()
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
def test_check_serving_service(self):
request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
def test_check_unknown_serivce(self):
request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
def test_check_not_serving_service(self):
request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
resp.status)
def test_check_not_found_service(self):
request = health_pb2.HealthCheckRequest(service='not-found')
with self.assertRaises(grpc.RpcError) as context:
resp = self._stub.Check(request)
self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
def test_health_service_name(self):
self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')
class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests):
def setUp(self):
super(HealthServicerBackwardsCompatibleWatchTest, self).start_server(
non_blocking=False, thread_pool=None)
if __name__ == '__main__':
unittest.main(verbosity=2)