| # Copyright 2017 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import random |
| import threading |
| import time |
| import unittest |
| |
| import grpc_testing |
| |
# Base interval from which every delay and sleep in these tests is derived
# (always used as a multiple); expressed in the units of the time object
# under test -- presumably seconds, TODO confirm.
_QUANTUM = 0.3
# Number of events/futures scheduled in each group by test_many.
_MANY = 10000
# Tests that run in real time can either wait for the scheduler to
# eventually run what needs to be run (and risk timing out) or declare
# that the scheduler didn't schedule work reasonably fast enough. We
# choose the latter for this test.
_PATHOLOGICAL_SCHEDULING = 'pathological thread scheduling!'
| |
| |
class _TimeNoter(object):
    """Callable that records the reading of a time object on every call.

    Instances are handed to the time object under test as the behavior to
    run; afterwards the test inspects the recorded readings.
    """

    def __init__(self, time):
        """Remember the time object whose ``time()`` method will be read.

        Args:
          time: An object exposing a ``time()`` method.
        """
        self._time = time
        self._call_times = []
        # Used purely as a mutex guarding _call_times.
        self._condition = threading.Condition()

    def __call__(self):
        """Append the current reading of the time object to the record."""
        with self._condition:
            # The reading is taken while holding the lock so that readings
            # are appended in the order in which they were taken.
            noted = self._time.time()
            self._call_times.append(noted)

    def call_times(self):
        """Return a tuple snapshot of all readings recorded so far."""
        with self._condition:
            snapshot = tuple(self._call_times)
        return snapshot
| |
| |
class TimeTest(object):
    """Test methods shared by every time implementation under test.

    This class is a mixin: it relies on the assertion methods of
    unittest.TestCase and on a ``self._time`` attribute. Both are supplied
    by the concrete subclasses in this module, which assign ``self._time``
    in their ``setUp``.
    """

    def test_sleep_for(self):
        """sleep_for must not return before the requested duration passes."""
        start_time = self._time.time()
        self._time.sleep_for(_QUANTUM)
        end_time = self._time.time()

        self.assertLessEqual(start_time + _QUANTUM, end_time)

    def test_sleep_until(self):
        """sleep_until must not return before the requested time."""
        start_time = self._time.time()
        self._time.sleep_until(start_time + _QUANTUM)
        end_time = self._time.time()

        self.assertLessEqual(start_time + _QUANTUM, end_time)

    def test_call_in(self):
        """A behavior passed to call_in runs, and not before its delay."""
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        self._time.call_in(time_noter, _QUANTUM)
        # Sleep for twice the delay to give the behavior room to run.
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])

    def test_call_at(self):
        """A behavior passed to call_at runs, and not before its time."""
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        # The clock is read again here, so the scheduled time is never
        # earlier than start_time + _QUANTUM.
        self._time.call_at(time_noter, self._time.time() + _QUANTUM)
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])

    def test_cancel(self):
        """Cancelling a future before its time prevents the behavior."""
        time_noter = _TimeNoter(self._time)

        future = self._time.call_in(time_noter, _QUANTUM * 2)
        # Cancel halfway through the delay, then sleep well past the
        # originally scheduled time.
        self._time.sleep_for(_QUANTUM)
        cancelled = future.cancel()
        self._time.sleep_for(_QUANTUM * 2)
        call_times = time_noter.call_times()

        self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING)
        self.assertTrue(cancelled)
        self.assertTrue(future.cancelled())

    def test_many(self):
        """Schedule many behaviors and cancel a random subset of them.

        _MANY events are scheduled to be set between two and three quanta
        from now, alongside _MANY "background noise" futures with delays of
        up to a thousand quanta. After one quantum roughly half of the
        event futures are cancelled; after three more quanta every event
        whose future was successfully cancelled must be unset and every
        other event must be set.
        """
        test_events = tuple(threading.Event() for _ in range(_MANY))
        possibly_cancelled_futures = {}
        background_noise_futures = []

        try:
            for test_event in test_events:
                possibly_cancelled_futures[test_event] = self._time.call_in(
                    test_event.set, _QUANTUM * (2 + random.random()))
            for _ in range(_MANY):
                background_noise_futures.append(
                    self._time.call_in(threading.Event().set,
                                       _QUANTUM * 1000 * random.random()))
            self._time.sleep_for(_QUANTUM)
            cancelled = set()
            for test_event, test_future in possibly_cancelled_futures.items():
                # Only events whose cancellation was reported successful
                # are expected to remain unset.
                if bool(random.randint(0, 1)) and test_future.cancel():
                    cancelled.add(test_event)
            self._time.sleep_for(_QUANTUM * 3)

            for test_event in test_events:
                if test_event in cancelled:
                    self.assertFalse(test_event.is_set())
                else:
                    self.assertTrue(test_event.is_set())
        finally:
            # Always cancel the long-delay background futures, even when an
            # assertion above fails or scheduling raises, so that they are
            # not left scheduled on the time object after this test.
            for background_noise_future in background_noise_futures:
                background_noise_future.cancel()

    def test_same_behavior_used_several_times(self):
        """One behavior object may back several independent futures.

        The same callable is scheduled twice at one quantum and twice at
        three quanta. After two quanta the first three futures are
        cancelled, so only the first of the three-quanta futures is
        cancelled before it runs. The behavior must therefore run exactly
        three times. cancel() must report False for the futures that had
        already run and True, repeatedly, for the one cancelled in time.
        """
        time_noter = _TimeNoter(self._time)

        start_time = self._time.time()
        first_future_at_one = self._time.call_in(time_noter, _QUANTUM)
        second_future_at_one = self._time.call_in(time_noter, _QUANTUM)
        first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
        second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3)
        self._time.sleep_for(_QUANTUM * 2)
        first_future_at_one_cancelled = first_future_at_one.cancel()
        second_future_at_one_cancelled = second_future_at_one.cancel()
        first_future_at_three_cancelled = first_future_at_three.cancel()
        self._time.sleep_for(_QUANTUM * 2)
        second_future_at_three_cancelled = second_future_at_three.cancel()
        first_future_at_three_cancelled_again = first_future_at_three.cancel()
        call_times = time_noter.call_times()

        self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING)
        self.assertFalse(first_future_at_one_cancelled)
        self.assertFalse(second_future_at_one_cancelled)
        self.assertTrue(first_future_at_three_cancelled)
        self.assertFalse(second_future_at_three_cancelled)
        self.assertTrue(first_future_at_three_cancelled_again)
        self.assertLessEqual(start_time + _QUANTUM, call_times[0])
        self.assertLessEqual(start_time + _QUANTUM, call_times[1])
        self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2])
| |
| |
class StrictRealTimeTest(TimeTest, unittest.TestCase):
    """Runs the TimeTest methods against grpc_testing.strict_real_time()."""

    def setUp(self):
        """Create a fresh strict-real-time object for each test."""
        real_time = grpc_testing.strict_real_time()
        self._time = real_time
| |
| |
class StrictFakeTimeTest(TimeTest, unittest.TestCase):
    """Runs the TimeTest methods against grpc_testing.strict_fake_time()."""

    def setUp(self):
        """Create a fresh strict-fake-time object for each test.

        The fake time is constructed with a random integer between zero
        and the current wall-clock time (presumably its starting time --
        confirm against grpc_testing).
        """
        wall_clock_now = int(time.time())
        chosen_value = random.randint(0, wall_clock_now)
        self._time = grpc_testing.strict_fake_time(chosen_value)
| |
| |
# When this file is executed directly as a script, run the test cases
# defined in this module with verbose (level 2) output.
if __name__ == '__main__':
    unittest.main(verbosity=2)