| # mypy: disable_error_code="no-untyped-def,no-untyped-call,attr-defined,arg-type,no-any-return,list-item,var-annotated,import,call-overload" |
| # Copyright 2016–2021 Julien Danjou |
| # Copyright 2016 Joshua Harlow |
| # Copyright 2013 Ray Holder |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import datetime |
| import logging |
| import re |
| import sys |
| import time |
| import typing |
| import unittest |
| import warnings |
| from contextlib import contextmanager |
| from copy import copy |
| from fractions import Fraction |
| |
| import pytest |
| |
| import tenacity |
| from tenacity import RetryCallState, RetryError, Retrying, retry |
| |
# Sentinel used to recognise a required argument that was never supplied.
_unset = object()


def _make_unset_exception(func_name, **kwargs):
    """Build the TypeError describing which parameters are still unset.

    ``kwargs`` maps parameter names to the values that were received. Every
    name whose value is the ``_unset`` sentinel is listed, repr-quoted and
    comma-separated, in the message. The exception is returned, not raised.
    """
    unset_names = [name for name, value in kwargs.items() if value is _unset]
    listing = ", ".join(map(repr, unset_names))
    message = func_name + " func missing parameters: " + listing
    return TypeError(message)
| |
| |
def _set_delay_since_start(retry_state, delay):
    """Pin the timestamps of ``retry_state`` so its elapsed time is ``delay``.

    Both timestamps are stored as ``Fraction`` values so that
    ``outcome_timestamp - start_time`` is *exactly* equal to ``delay``,
    which keeps the arithmetic in the tests simple.
    """
    exact_start = Fraction(retry_state.start_time)
    retry_state.start_time = exact_start
    retry_state.outcome_timestamp = exact_start + Fraction(delay)
    # Sanity check: the state must now report precisely the requested delay.
    assert retry_state.seconds_since_start == delay
| |
| |
def make_retry_state(
    previous_attempt_number,
    delay_since_first_attempt,
    last_result=None,
    upcoming_sleep=0,
):
    """Build a RetryCallState for a given attempt number and elapsed delay.

    Test-only helper, hence the extra care taken with timestamp arithmetic.

    ``last_result``, when given, is stored as the state's outcome; otherwise
    the state records a result of ``None``. Raises the TypeError built by
    ``_make_unset_exception`` if either required argument is the ``_unset``
    sentinel.
    """
    if previous_attempt_number is _unset or delay_since_first_attempt is _unset:
        raise _make_unset_exception(
            "wait/stop",
            previous_attempt_number=previous_attempt_number,
            delay_since_first_attempt=delay_since_first_attempt,
        )

    state = RetryCallState(None, None, (), {})
    state.attempt_number = previous_attempt_number
    state.upcoming_sleep = upcoming_sleep

    if last_result is None:
        state.set_result(None)
    else:
        state.outcome = last_result

    # Done last so the exact timestamps are not overwritten by set_result().
    _set_delay_since_start(state, delay_since_first_attempt)
    return state
| |
| |
class TestBase(unittest.TestCase):
    """Smoke tests for the ``repr`` of retrying objects and call states."""

    def test_retrying_repr(self):
        """``repr`` of a minimal BaseRetrying subclass must not raise."""

        class ConcreteRetrying(tenacity.BaseRetrying):
            def __call__(self, fn, *args, **kwargs):
                pass

        instance = ConcreteRetrying()
        repr(instance)

    def test_callstate_repr(self):
        """The repr tail shows attempt number, idle time and last outcome."""
        fresh = RetryCallState(None, None, (), {})
        fresh.idle_for = 1.1111111
        fresh_tail = "attempt #1; slept for 1.11; last result: none yet>"
        assert repr(fresh).endswith(fresh_tail)

        returned = make_retry_state(2, 5)
        returned_tail = "attempt #2; slept for 0.0; last result: returned None>"
        assert repr(returned).endswith(returned_tail)

        failure = tenacity.Future.construct(1, ValueError("aaa"), True)
        failed = make_retry_state(0, 0, last_result=failure)
        failed_tail = (
            "attempt #0; slept for 0.0; last result: failed (ValueError aaa)>"
        )
        assert repr(failed).endswith(failed_tail)
| |
| |
class TestStopConditions(unittest.TestCase):
    """Exercise the stop strategies against hand-built retry states."""

    # (attempt_number, delay, should_stop) when combining
    # stop_after_delay(1) and stop_after_attempt(4) with "any" / ``|``.
    _EITHER_LIMIT_CASES = (
        (1, 0.1, False),
        (2, 0.2, False),
        (2, 0.8, False),
        (4, 0.8, True),
        (3, 1.8, True),
        (4, 1.8, True),
    )

    # Same two strategies combined with "all" / ``&``.
    _BOTH_LIMITS_CASES = (
        (1, 0.1, False),
        (2, 0.2, False),
        (2, 0.8, False),
        (4, 0.8, False),
        (3, 1.8, False),
        (4, 1.8, True),
    )

    def _assert_stop_decisions(self, stop, cases):
        """Check ``stop`` against ``(attempt, delay, should_stop)`` triples.

        Each triple runs in its own labelled ``subTest`` so that a failure
        report names the attempt number and delay that produced it.
        """
        for attempt, delay, should_stop in cases:
            with self.subTest(attempt=attempt, delay=delay):
                decision = stop(make_retry_state(attempt, delay))
                if should_stop:
                    self.assertTrue(decision)
                else:
                    self.assertFalse(decision)

    def test_never_stop(self):
        """A default ``Retrying`` never asks to stop."""
        r = Retrying()
        self.assertFalse(r.stop(make_retry_state(3, 6546)))

    def test_stop_any(self):
        """``stop_any`` stops as soon as either strategy would."""
        stop = tenacity.stop_any(
            tenacity.stop_after_delay(1), tenacity.stop_after_attempt(4)
        )
        self._assert_stop_decisions(stop, self._EITHER_LIMIT_CASES)

    def test_stop_all(self):
        """``stop_all`` stops only once both strategies would."""
        stop = tenacity.stop_all(
            tenacity.stop_after_delay(1), tenacity.stop_after_attempt(4)
        )
        self._assert_stop_decisions(stop, self._BOTH_LIMITS_CASES)

    def test_stop_or(self):
        """The ``|`` operator gives the same decisions as ``stop_any``."""
        stop = tenacity.stop_after_delay(1) | tenacity.stop_after_attempt(4)
        self._assert_stop_decisions(stop, self._EITHER_LIMIT_CASES)

    def test_stop_and(self):
        """The ``&`` operator gives the same decisions as ``stop_all``."""
        stop = tenacity.stop_after_delay(1) & tenacity.stop_after_attempt(4)
        self._assert_stop_decisions(stop, self._BOTH_LIMITS_CASES)

    def test_stop_after_attempt(self):
        """``stop_after_attempt(3)`` stops from the third attempt onwards."""
        r = Retrying(stop=tenacity.stop_after_attempt(3))
        self.assertFalse(r.stop(make_retry_state(2, 6546)))
        self.assertTrue(r.stop(make_retry_state(3, 6546)))
        self.assertTrue(r.stop(make_retry_state(4, 6546)))

    def test_stop_after_delay(self):
        """``stop_after_delay`` accepts a number or a timedelta."""
        for delay in (1, datetime.timedelta(seconds=1)):
            # Label the sub-test so a failure identifies the delay flavour.
            with self.subTest(delay=delay):
                r = Retrying(stop=tenacity.stop_after_delay(delay))
                self.assertFalse(r.stop(make_retry_state(2, 0.999)))
                self.assertTrue(r.stop(make_retry_state(2, 1)))
                self.assertTrue(r.stop(make_retry_state(2, 1.001)))

    def test_stop_before_delay(self):
        """``stop_before_delay`` also takes the upcoming sleep into account."""
        for delay in (1, datetime.timedelta(seconds=1)):
            with self.subTest(delay=delay):
                r = Retrying(stop=tenacity.stop_before_delay(delay))
                self.assertFalse(
                    r.stop(make_retry_state(2, 0.999, upcoming_sleep=0.0001))
                )
                self.assertTrue(r.stop(make_retry_state(2, 1, upcoming_sleep=0.001)))
                self.assertTrue(r.stop(make_retry_state(2, 1, upcoming_sleep=1)))

                # It should act the same as stop_after_delay if upcoming sleep is 0
                self.assertFalse(r.stop(make_retry_state(2, 0.999, upcoming_sleep=0)))
                self.assertTrue(r.stop(make_retry_state(2, 1, upcoming_sleep=0)))
                self.assertTrue(r.stop(make_retry_state(2, 1.001, upcoming_sleep=0)))

    def test_legacy_explicit_stop_type(self):
        """Constructing ``Retrying`` with a string ``stop`` must not raise."""
        Retrying(stop="stop_after_attempt")

    def test_stop_func_with_retry_state(self):
        """A plain function receiving the retry state can serve as ``stop``."""

        def stop_func(retry_state):
            return retry_state.attempt_number == retry_state.seconds_since_start

        r = Retrying(stop=stop_func)
        self.assertFalse(r.stop(make_retry_state(1, 3)))
        self.assertFalse(r.stop(make_retry_state(100, 99)))
        self.assertTrue(r.stop(make_retry_state(101, 101)))
| |
| |
class TestWaitConditions(unittest.TestCase):
    """Exercise the wait strategies against hand-built retry states."""

    def _assert_wait_table(self, retrying, expected_by_attempt):
        """Assert ``retrying.wait`` returns the listed wait for each attempt.

        ``expected_by_attempt`` holds ``(attempt_number, expected_wait)``
        pairs; every retry state is built with zero elapsed time. Each pair
        runs in a labelled ``subTest`` so failures name the attempt.
        """
        for attempt, expected in expected_by_attempt:
            with self.subTest(attempt=attempt):
                self.assertEqual(
                    retrying.wait(make_retry_state(attempt, 0)), expected
                )

    def test_no_sleep(self):
        """A default ``Retrying`` waits zero between attempts."""
        r = Retrying()
        self.assertEqual(0, r.wait(make_retry_state(18, 9879)))

    def test_fixed_sleep(self):
        """``wait_fixed`` accepts a number or a timedelta."""
        for wait in (1, datetime.timedelta(seconds=1)):
            with self.subTest(wait=wait):
                r = Retrying(wait=tenacity.wait_fixed(wait))
                self.assertEqual(1, r.wait(make_retry_state(12, 6546)))

    def test_incrementing_sleep(self):
        """``wait_incrementing`` grows by ``increment`` on every attempt."""
        for start, increment in (
            (500, 100),
            (datetime.timedelta(seconds=500), datetime.timedelta(seconds=100)),
        ):
            with self.subTest(start=start, increment=increment):
                r = Retrying(
                    wait=tenacity.wait_incrementing(start=start, increment=increment)
                )
                self.assertEqual(500, r.wait(make_retry_state(1, 6546)))
                self.assertEqual(600, r.wait(make_retry_state(2, 6546)))
                self.assertEqual(700, r.wait(make_retry_state(3, 6546)))

    def test_random_sleep(self):
        """``wait_random`` stays in ``[min, max)`` and actually varies."""
        for min_, max_ in (
            (1, 20),
            (datetime.timedelta(seconds=1), datetime.timedelta(seconds=20)),
        ):
            with self.subTest(min=min_, max=max_):
                r = Retrying(wait=tenacity.wait_random(min=min_, max=max_))
                times = {r.wait(make_retry_state(1, 6546)) for _ in range(1000)}

                # this is kind of non-deterministic...
                self.assertGreater(len(times), 1)
                for t in times:
                    self._assert_range(t, 1, 20)

    def test_random_sleep_withoutmin_(self):
        """``wait_random`` with only ``max`` stays within ``[0, max]``."""
        r = Retrying(wait=tenacity.wait_random(max=2))
        times = {r.wait(make_retry_state(1, 6546)) for _ in range(4)}

        # this is kind of non-deterministic...
        self.assertGreater(len(times), 1)
        for t in times:
            self._assert_inclusive_range(t, 0, 2)

    def test_exponential(self):
        """Default ``wait_exponential`` doubles on each attempt."""
        r = Retrying(wait=tenacity.wait_exponential())
        self._assert_wait_table(
            r,
            [(1, 1), (2, 2), (3, 4), (4, 8), (5, 16), (6, 32), (7, 64), (8, 128)],
        )

    def test_exponential_with_max_wait(self):
        """``max`` caps the exponential wait."""
        r = Retrying(wait=tenacity.wait_exponential(max=40))
        self._assert_wait_table(
            r,
            [
                (1, 1),
                (2, 2),
                (3, 4),
                (4, 8),
                (5, 16),
                (6, 32),
                (7, 40),
                (8, 40),
                (50, 40),
            ],
        )

    def test_exponential_with_min_wait(self):
        """``min`` is a floor under the exponential wait."""
        r = Retrying(wait=tenacity.wait_exponential(min=20))
        self._assert_wait_table(
            r,
            [
                (1, 20),
                (2, 20),
                (3, 20),
                (4, 20),
                (5, 20),
                (6, 32),
                (7, 64),
                (8, 128),
                (20, 524288),
            ],
        )

    def test_exponential_with_max_wait_and_multiplier(self):
        """``max`` still caps the wait when a multiplier is supplied."""
        r = Retrying(wait=tenacity.wait_exponential(max=50, multiplier=1))
        self._assert_wait_table(
            r,
            [
                (1, 1),
                (2, 2),
                (3, 4),
                (4, 8),
                (5, 16),
                (6, 32),
                (7, 50),
                (8, 50),
                (50, 50),
            ],
        )

    def test_exponential_with_min_wait_and_multiplier(self):
        """``min`` still floors the wait when a multiplier is supplied."""
        r = Retrying(wait=tenacity.wait_exponential(min=20, multiplier=2))
        self._assert_wait_table(
            r,
            [
                (1, 20),
                (2, 20),
                (3, 20),
                (4, 20),
                (5, 32),
                (6, 64),
                (7, 128),
                (8, 256),
                (20, 1048576),
            ],
        )

    def test_exponential_with_min_wait_andmax__wait(self):
        """``min`` and ``max`` together clamp the wait; both accept timedelta."""
        for min_, max_ in (
            (10, 100),
            (datetime.timedelta(seconds=10), datetime.timedelta(seconds=100)),
        ):
            with self.subTest(min=min_, max=max_):
                r = Retrying(wait=tenacity.wait_exponential(min=min_, max=max_))
                self._assert_wait_table(
                    r,
                    [
                        (1, 10),
                        (2, 10),
                        (3, 10),
                        (4, 10),
                        (5, 16),
                        (6, 32),
                        (7, 64),
                        (8, 100),
                        (9, 100),
                        (20, 100),
                    ],
                )

    def test_legacy_explicit_wait_type(self):
        """Constructing ``Retrying`` with a string ``wait`` must not raise."""
        Retrying(wait="exponential_sleep")

    def test_wait_func(self):
        """A plain function receiving the retry state can serve as ``wait``."""

        def wait_func(retry_state):
            return retry_state.attempt_number * retry_state.seconds_since_start

        r = Retrying(wait=wait_func)
        self.assertEqual(r.wait(make_retry_state(1, 5)), 5)
        self.assertEqual(r.wait(make_retry_state(2, 11)), 22)
        self.assertEqual(r.wait(make_retry_state(10, 100)), 1000)

    def test_wait_combine(self):
        """``wait_combine`` adds the waits of its strategies."""
        r = Retrying(
            wait=tenacity.wait_combine(
                tenacity.wait_random(0, 3), tenacity.wait_fixed(5)
            )
        )
        # Test it a few times since it's random
        for _ in range(1000):
            self._assert_range(r.wait(make_retry_state(1, 5)), 5, 8)

    def test_wait_double_sum(self):
        """Two strategies joined with ``+`` add their waits."""
        r = Retrying(wait=tenacity.wait_random(0, 3) + tenacity.wait_fixed(5))
        # Test it a few times since it's random
        for _ in range(1000):
            self._assert_range(r.wait(make_retry_state(1, 5)), 5, 8)

    def test_wait_triple_sum(self):
        """Three strategies joined with ``+`` add their waits."""
        r = Retrying(
            wait=tenacity.wait_fixed(1)
            + tenacity.wait_random(0, 3)
            + tenacity.wait_fixed(5)
        )
        # Test it a few times since it's random
        for _ in range(1000):
            self._assert_range(r.wait(make_retry_state(1, 5)), 6, 9)

    def test_wait_arbitrary_sum(self):
        """The builtin ``sum`` can combine a list of wait strategies."""
        r = Retrying(
            wait=sum(
                [
                    tenacity.wait_fixed(1),
                    tenacity.wait_random(0, 3),
                    tenacity.wait_fixed(5),
                    tenacity.wait_none(),
                ]
            )
        )
        # Test it a few times since it's random
        for _ in range(1000):
            self._assert_range(r.wait(make_retry_state(1, 5)), 6, 9)

    def _assert_range(self, wait, min_, max_):
        """Assert ``min_ <= wait < max_``."""
        self.assertLess(wait, max_)
        self.assertGreaterEqual(wait, min_)

    def _assert_inclusive_range(self, wait, low, high):
        """Assert ``low <= wait <= high``."""
        self.assertLessEqual(wait, high)
        self.assertGreaterEqual(wait, low)

    def _assert_inclusive_epsilon(self, wait, target, epsilon):
        """Assert ``wait`` lies within ``epsilon`` of ``target`` (inclusive)."""
        self.assertLessEqual(wait, target + epsilon)
        self.assertGreaterEqual(wait, target - epsilon)

    def test_wait_chain(self):
        """``wait_chain`` walks its strategies, then repeats the last one."""
        r = Retrying(
            wait=tenacity.wait_chain(
                *[tenacity.wait_fixed(seconds) for seconds in (1, 1, 4, 4, 8)]
            )
        )

        for attempt in range(1, 11):
            w = r.wait(make_retry_state(attempt, 1))
            if attempt <= 2:
                self._assert_range(w, 1, 2)
            elif attempt <= 4:
                self._assert_range(w, 4, 5)
            else:
                self._assert_range(w, 8, 9)

    def test_wait_chain_multiple_invocations(self):
        """A ``wait_chain`` starts from its first strategy on every call."""
        sleep_intervals = []
        r = Retrying(
            sleep=sleep_intervals.append,
            wait=tenacity.wait_chain(
                *[tenacity.wait_fixed(seconds) for seconds in (1, 2, 3)]
            ),
            stop=tenacity.stop_after_attempt(5),
            retry=tenacity.retry_if_result(lambda x: x == 1),
        )

        @r.wraps
        def always_return_1():
            return 1

        # Run twice: the second pass checks the chain restarted cleanly.
        for _ in range(2):
            self.assertRaises(tenacity.RetryError, always_return_1)
            self.assertEqual(sleep_intervals, [1.0, 2.0, 3.0, 3.0])
            sleep_intervals[:] = []

    def test_wait_random_exponential(self):
        """``wait_random_exponential`` stays below the doubling upper bound."""
        fn = tenacity.wait_random_exponential(0.5, 60.0)
        upper_bounds = (0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0)

        for _ in range(1000):
            for attempt, upper in enumerate(upper_bounds, start=1):
                self._assert_inclusive_range(
                    fn(make_retry_state(attempt, 0)), 0, upper
                )

        fn = tenacity.wait_random_exponential(10, 5)
        for _ in range(1000):
            self._assert_inclusive_range(fn(make_retry_state(1, 0)), 0.00, 5.00)

        # Default arguments exist
        fn = tenacity.wait_random_exponential()
        fn(make_retry_state(0, 0))

    def test_wait_random_exponential_statistically(self):
        """Sample means of ``wait_random_exponential`` match expectations."""
        fn = tenacity.wait_random_exponential(0.5, 60.0)

        attempt = [
            [fn(make_retry_state(i, 0)) for _ in range(4000)] for i in range(10)
        ]

        def mean(lst):
            return float(sum(lst)) / float(len(lst))

        # (expected mean, tolerance) for attempts 1..9
        expectations = (
            (0.25, 0.02),
            (0.50, 0.04),
            (1, 0.08),
            (2, 0.16),
            (4, 0.32),
            (8, 0.64),
            (16, 1.28),
            (30, 2.56),
            (30, 2.56),
        )
        # skipping attempt 0
        for samples, (target, epsilon) in zip(attempt[1:], expectations):
            self._assert_inclusive_epsilon(mean(samples), target, epsilon)

    def test_wait_exponential_jitter(self):
        """``wait_exponential_jitter`` adds at most 1 of jitter below ``max``."""
        fn = tenacity.wait_exponential_jitter(max=60)

        for _ in range(1000):
            for attempt, low in enumerate((1, 2, 4, 8, 16, 32), start=1):
                self._assert_inclusive_range(
                    fn(make_retry_state(attempt, 0)), low, low + 1
                )
            for attempt in (7, 8, 9):
                self.assertEqual(fn(make_retry_state(attempt, 0)), 60)

        fn = tenacity.wait_exponential_jitter(10, 5)
        for _ in range(1000):
            self.assertEqual(fn(make_retry_state(1, 0)), 5)

        # Default arguments exist
        fn = tenacity.wait_exponential_jitter()
        fn(make_retry_state(0, 0))

    def test_wait_retry_state_attributes(self):
        """The retry state handed to ``wait`` carries the call's details."""

        class ExtractCallState(Exception):
            pass

        # retry_state is mutable, so return it as an exception to extract the
        # exact values it has when wait is called and bypass any other logic.
        def waitfunc(retry_state):
            raise ExtractCallState(retry_state)

        retrying = Retrying(
            wait=waitfunc,
            retry=(
                tenacity.retry_if_exception_type()
                | tenacity.retry_if_result(lambda result: result == 123)
            ),
        )

        def returnval():
            return 123

        # assertRaises fails the test outright if wait is never reached,
        # instead of leaving retry_state unbound.
        with self.assertRaises(ExtractCallState) as caught:
            retrying(returnval)
        retry_state = caught.exception.args[0]
        self.assertIs(retry_state.fn, returnval)
        self.assertEqual(retry_state.args, ())
        self.assertEqual(retry_state.kwargs, {})
        self.assertEqual(retry_state.outcome.result(), 123)
        self.assertEqual(retry_state.attempt_number, 1)
        self.assertGreaterEqual(retry_state.outcome_timestamp, retry_state.start_time)

        def dying():
            raise Exception("Broken")

        # A fresh context manager guarantees no state from the first half
        # can be inspected by mistake.
        with self.assertRaises(ExtractCallState) as caught:
            retrying(dying)
        retry_state = caught.exception.args[0]
        self.assertIs(retry_state.fn, dying)
        self.assertEqual(retry_state.args, ())
        self.assertEqual(retry_state.kwargs, {})
        self.assertEqual(str(retry_state.outcome.exception()), "Broken")
        self.assertEqual(retry_state.attempt_number, 1)
        self.assertGreaterEqual(retry_state.outcome_timestamp, retry_state.start_time)
| |
| |
class TestRetryConditions(unittest.TestCase):
    """Exercise the retry predicates and the ``TryAgain`` control flow."""

    @staticmethod
    def _verdict(strategy, value, has_exception):
        """Return what ``strategy`` decides for a first attempt's outcome.

        The outcome is a ``tenacity.Future`` constructed from ``value`` and
        ``has_exception``, attached to a retry state for attempt 1 with a
        delay of 1.0.
        """
        outcome = tenacity.Future.construct(1, value, has_exception)
        return strategy(make_retry_state(1, 1.0, last_result=outcome))

    def test_retry_if_result(self):
        """``retry_if_result`` retries when the predicate accepts the result."""
        strategy = tenacity.retry_if_result(lambda x: x == 1)

        self.assertTrue(self._verdict(strategy, 1, False))
        self.assertFalse(self._verdict(strategy, 2, False))

    def test_retry_if_not_result(self):
        """``retry_if_not_result`` retries when the predicate rejects it."""
        strategy = tenacity.retry_if_not_result(lambda x: x == 1)

        self.assertTrue(self._verdict(strategy, 2, False))
        self.assertFalse(self._verdict(strategy, 1, False))

    def test_retry_any(self):
        """``retry_any`` retries if at least one strategy would."""
        strategy = tenacity.retry_any(
            tenacity.retry_if_result(lambda x: x == 1),
            tenacity.retry_if_result(lambda x: x == 2),
        )

        self.assertTrue(self._verdict(strategy, 1, False))
        self.assertTrue(self._verdict(strategy, 2, False))
        self.assertFalse(self._verdict(strategy, 3, False))
        self.assertFalse(self._verdict(strategy, 1, True))

    def test_retry_all(self):
        """``retry_all`` retries only if every strategy would."""
        strategy = tenacity.retry_all(
            tenacity.retry_if_result(lambda x: x == 1),
            tenacity.retry_if_result(lambda x: isinstance(x, int)),
        )

        self.assertTrue(self._verdict(strategy, 1, False))
        self.assertFalse(self._verdict(strategy, 2, False))
        self.assertFalse(self._verdict(strategy, 3, False))
        self.assertFalse(self._verdict(strategy, 1, True))

    def test_retry_and(self):
        """The ``&`` operator combines two retry strategies."""
        equals_one = tenacity.retry_if_result(lambda x: x == 1)
        is_int = tenacity.retry_if_result(lambda x: isinstance(x, int))
        strategy = equals_one & is_int

        self.assertTrue(self._verdict(strategy, 1, False))
        self.assertFalse(self._verdict(strategy, 2, False))
        self.assertFalse(self._verdict(strategy, 3, False))
        self.assertFalse(self._verdict(strategy, 1, True))

    def test_retry_or(self):
        """The ``|`` operator combines two retry strategies."""
        equals_foo = tenacity.retry_if_result(lambda x: x == "foo")
        is_int = tenacity.retry_if_result(lambda x: isinstance(x, int))
        strategy = equals_foo | is_int

        self.assertTrue(self._verdict(strategy, "foo", False))
        self.assertFalse(self._verdict(strategy, "foobar", False))
        self.assertFalse(self._verdict(strategy, 2.2, False))
        self.assertFalse(self._verdict(strategy, 42, True))

    def _raise_try_again(self):
        """Raise ``TryAgain`` on the first two calls; return None afterwards."""
        self._attempts += 1
        if self._attempts >= 3:
            return
        raise tenacity.TryAgain

    def test_retry_try_again(self):
        """``TryAgain`` forces a retry even under ``retry_never``."""
        self._attempts = 0
        retrying = Retrying(
            stop=tenacity.stop_after_attempt(5), retry=tenacity.retry_never
        )
        retrying(self._raise_try_again)
        self.assertEqual(3, self._attempts)

    def test_retry_try_again_forever(self):
        """Endless ``TryAgain`` ends in ``RetryError`` at the attempt limit."""

        def _r():
            raise tenacity.TryAgain

        r = Retrying(stop=tenacity.stop_after_attempt(5), retry=tenacity.retry_never)
        with self.assertRaises(tenacity.RetryError):
            r(_r)
        self.assertEqual(5, r.statistics["attempt_number"])

    def test_retry_try_again_forever_reraise(self):
        """With ``reraise=True`` the ``TryAgain`` itself propagates."""

        def _r():
            raise tenacity.TryAgain

        r = Retrying(
            stop=tenacity.stop_after_attempt(5),
            retry=tenacity.retry_never,
            reraise=True,
        )
        with self.assertRaises(tenacity.TryAgain):
            r(_r)
        self.assertEqual(5, r.statistics["attempt_number"])

    def test_retry_if_exception_message_negative_no_inputs(self):
        """Supplying neither ``message`` nor ``match`` is a TypeError."""
        self.assertRaises(TypeError, tenacity.retry_if_exception_message)

    def test_retry_if_exception_message_negative_too_many_inputs(self):
        """Supplying both ``message`` and ``match`` is a TypeError."""
        self.assertRaises(
            TypeError,
            tenacity.retry_if_exception_message,
            message="negative",
            match="negative",
        )
| |
| |
class NoneReturnUntilAfterCount:
    """Stateful stub whose ``go`` yields None a fixed number of times."""

    def __init__(self, count):
        # ``counter`` is the number of None results handed out so far.
        self.count = count
        self.counter = 0

    def go(self):
        """Return None for the first ``count`` calls, then True.

        Only the None-returning calls advance ``counter``.
        """
        if self.counter >= self.count:
            return True
        self.counter += 1
        return None
| |
| |
class NoIOErrorAfterCount:
    """Stateful stub whose ``go`` raises OSError a fixed number of times."""

    def __init__(self, count):
        # ``counter`` is the number of errors raised so far.
        self.count = count
        self.counter = 0

    def go(self):
        """Raise an OSError for the first ``count`` calls, then return True.

        Only the raising calls advance ``counter``.
        """
        if self.counter >= self.count:
            return True
        self.counter += 1
        raise OSError("Hi there, I'm an IOError")
| |
| |
class NoNameErrorAfterCount:
    """Stateful stub whose ``go`` raises NameError a fixed number of times."""

    def __init__(self, count):
        # ``counter`` is the number of errors raised so far.
        self.count = count
        self.counter = 0

    def go(self):
        """Raise a NameError for the first ``count`` calls, then return True.

        Only the raising calls advance ``counter``.
        """
        if self.counter >= self.count:
            return True
        self.counter += 1
        raise NameError("Hi there, I'm a NameError")
| |
| |
class NoNameErrorCauseAfterCount:
    """Stateful stub raising OSError caused by NameError a fixed number of times."""

    def __init__(self, count):
        # ``counter`` is the number of chained errors raised so far.
        self.count = count
        self.counter = 0

    def go2(self):
        """Raise the NameError that ``go`` uses as the cause."""
        raise NameError("Hi there, I'm a NameError")

    def go(self):
        """Raise an OSError chained from a NameError until ``count`` is reached.

        Once ``counter`` has reached ``count`` the call returns True instead.
        """
        if self.counter >= self.count:
            return True

        self.counter += 1
        try:
            self.go2()
        except NameError as cause:
            raise OSError() from cause

        return True
| |
| |
class NoIOErrorCauseAfterCount:
    """Stateful stub raising NameError caused by OSError a fixed number of times."""

    def __init__(self, count):
        # ``counter`` is the number of chained errors raised so far.
        self.count = count
        self.counter = 0

    def go2(self):
        """Raise the OSError that ``go`` uses as the cause."""
        raise OSError("Hi there, I'm an IOError")

    def go(self):
        """Raise a NameError chained from an OSError until ``count`` is reached.

        Once ``counter`` has reached ``count`` the call returns True instead.
        """
        if self.counter >= self.count:
            return True

        self.counter += 1
        try:
            self.go2()
        except OSError as cause:
            raise NameError() from cause

        return True
| |
| |
class NameErrorUntilCount:
    """Stateful stub whose ``go`` succeeds a fixed number of times, then fails."""

    # Message carried by the NameError raised once the successes run out.
    derived_message = "Hi there, I'm a NameError"

    def __init__(self, count):
        # ``counter`` is the number of successful calls so far.
        self.count = count
        self.counter = 0

    def go(self):
        """Return True for the first ``count`` calls, then raise NameError.

        Only the successful calls advance ``counter``.
        """
        if self.counter >= self.count:
            raise NameError(self.derived_message)
        self.counter += 1
        return True
| |
| |
class IOErrorUntilCount:
    """Stateful stub whose ``go`` succeeds a fixed number of times, then fails."""

    def __init__(self, count):
        # ``counter`` is the number of successful calls so far.
        self.count = count
        self.counter = 0

    def go(self):
        """Return True for the first ``count`` calls, then raise OSError.

        Only the successful calls advance ``counter``.
        """
        if self.counter >= self.count:
            raise OSError("Hi there, I'm an IOError")
        self.counter += 1
        return True
| |
| |
class CustomError(Exception):
    """Custom exception used to test retrying on user-defined error types.

    The constructor argument is kept on ``value`` and is what ``str()`` of
    the exception shows.
    """

    def __init__(self, value):
        # Initialise the base class explicitly so ``args`` is set through
        # the normal Exception machinery as well.
        super().__init__(value)
        self.value = value

    def __str__(self):
        # __str__ must return a str; coerce so non-string values do not
        # turn str(exc) into a TypeError.
        return str(self.value)
| |
| |
class NoCustomErrorAfterCount:
    """Stateful stub whose ``go`` raises CustomError a fixed number of times."""

    # Message carried by every CustomError this stub raises.
    derived_message = "This is a Custom exception class"

    def __init__(self, count):
        # ``counter`` is the number of errors raised so far.
        self.count = count
        self.counter = 0

    def go(self):
        """Raise a CustomError for the first ``count`` calls, then return True.

        Only the raising calls advance ``counter``.
        """
        if self.counter >= self.count:
            return True
        self.counter += 1
        raise CustomError(self.derived_message)
| |
| |
class CapturingHandler(logging.Handler):
    """Logging handler that stores every record it receives, for inspection."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``logging.Handler`` and start empty."""
        logging.Handler.__init__(self, *args, **kwargs)
        # Records in the order they were emitted.
        self.records = []

    def emit(self, record):
        """Keep ``record`` instead of writing it anywhere."""
        captured = self.records
        captured.append(record)
| |
| |
def current_time_ms():
    """Return ``time.time()`` converted to whole milliseconds, as an int."""
    millis = time.time() * 1000
    return int(round(millis))
| |
| |
@retry(
    retry=tenacity.retry_if_result(lambda value: value is None),
    wait=tenacity.wait_fixed(0.05),
)
def _retryable_test_with_wait(thing):
    """Return ``thing.go()``, retried while it is None with a fixed 0.05 wait."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_result(lambda value: value is None),
    stop=tenacity.stop_after_attempt(3),
)
def _retryable_test_with_stop(thing):
    """Return ``thing.go()``, retried while it is None, for up to 3 attempts."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_exception_cause_type(NameError),
)
def _retryable_test_with_exception_cause_type(thing):
    """Return ``thing.go()`` under ``retry_if_exception_cause_type(NameError)``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    # IOError is an alias of OSError; the rest of the file spells it OSError.
    retry=tenacity.retry_if_exception_type(OSError),
)
def _retryable_test_with_exception_type_io(thing):
    """Return ``thing.go()`` under ``retry_if_exception_type(OSError)``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    # IOError is an alias of OSError; the rest of the file spells it OSError.
    retry=tenacity.retry_if_not_exception_type(OSError),
)
def _retryable_test_if_not_exception_type_io(thing):
    """Return ``thing.go()`` under ``retry_if_not_exception_type(OSError)``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    # IOError is an alias of OSError; the rest of the file spells it OSError.
    retry=tenacity.retry_if_exception_type(OSError),
    stop=tenacity.stop_after_attempt(3),
)
def _retryable_test_with_exception_type_io_attempt_limit(thing):
    """Return ``thing.go()``, retrying on OSError for up to 3 attempts."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_unless_exception_type(NameError),
)
def _retryable_test_with_unless_exception_type_name(thing):
    """Return ``thing.go()`` under ``retry_unless_exception_type(NameError)``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_unless_exception_type(NameError),
    stop=tenacity.stop_after_attempt(3),
)
def _retryable_test_with_unless_exception_type_name_attempt_limit(thing):
    """Return ``thing.go()`` under ``retry_unless_exception_type(NameError)``.

    Attempts are capped by ``stop_after_attempt(3)``.
    """
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_unless_exception_type(),
)
def _retryable_test_with_unless_exception_type_no_input(thing):
    """Return ``thing.go()`` under argument-less ``retry_unless_exception_type``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_exception_message(
        message=NoCustomErrorAfterCount.derived_message
    ),
    stop=tenacity.stop_after_attempt(5),
)
def _retryable_test_if_exception_message_message(thing):
    """Return ``thing.go()`` under ``retry_if_exception_message(message=...)``.

    The message is ``NoCustomErrorAfterCount.derived_message``; attempts are
    capped by ``stop_after_attempt(5)``.
    """
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_not_exception_message(
        message=NoCustomErrorAfterCount.derived_message,
    ),
)
def _retryable_test_if_not_exception_message_message(thing):
    """Return ``thing.go()`` under ``retry_if_not_exception_message(message=...)``.

    The message is ``NoCustomErrorAfterCount.derived_message``.
    """
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_exception_message(
        # Pattern: first three characters of the message, then anything.
        match=NoCustomErrorAfterCount.derived_message[:3] + ".*",
    ),
)
def _retryable_test_if_exception_message_match(thing):
    """Return ``thing.go()`` under ``retry_if_exception_message(match=...)``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_not_exception_message(
        # Pattern: first three characters of the message, then anything.
        match=NoCustomErrorAfterCount.derived_message[:3] + ".*",
    ),
)
def _retryable_test_if_not_exception_message_match(thing):
    """Return ``thing.go()`` under ``retry_if_not_exception_message(match=...)``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_not_exception_message(
        message=NameErrorUntilCount.derived_message,
    ),
)
def _retryable_test_not_exception_message_delay(thing):
    """Return ``thing.go()`` under ``retry_if_not_exception_message(message=...)``.

    The message is ``NameErrorUntilCount.derived_message``.
    """
    outcome = thing.go()
    return outcome
| |
| |
@retry
def _retryable_default(thing):
    """Invoke ``thing.go()``; decorated with bare ``@retry`` (no call parentheses)."""
    outcome = thing.go()
    return outcome
| |
| |
@retry()
def _retryable_default_f(thing):
    """Invoke ``thing.go()``; decorated with ``@retry()`` called without arguments."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(retry=tenacity.retry_if_exception_type(CustomError))
def _retryable_test_with_exception_type_custom(thing):
    """Invoke ``thing.go()``; decorated with ``retry_if_exception_type(CustomError)``."""
    outcome = thing.go()
    return outcome
| |
| |
@retry(
    retry=tenacity.retry_if_exception_type(CustomError),
    stop=tenacity.stop_after_attempt(3),
)
def _retryable_test_with_exception_type_custom_attempt_limit(thing):
    """Invoke ``thing.go()``.

    Decorated with ``retry_if_exception_type(CustomError)`` and
    ``stop_after_attempt(3)``.
    """
    outcome = thing.go()
    return outcome
| |
| |
class TestDecoratorWrapper(unittest.TestCase):
    """Exercise the module-level ``@retry``-decorated helper functions."""

    def test_with_wait(self):
        """The waiting helper takes at least 250 ms and returns a truthy value."""
        start = current_time_ms()
        result = _retryable_test_with_wait(NoneReturnUntilAfterCount(5))
        t = current_time_ms() - start
        self.assertGreaterEqual(t, 250)
        self.assertTrue(result)

    def test_with_stop_on_return_value(self):
        """A RetryError is raised whose last attempt is the third, non-failed, ``None`` result."""
        with self.assertRaises(RetryError) as ctx:
            _retryable_test_with_stop(NoneReturnUntilAfterCount(5))
        # Named ``retry_err`` (not ``re``) so the imported ``re`` module is
        # not shadowed inside this method.
        retry_err = ctx.exception
        self.assertFalse(retry_err.last_attempt.failed)
        self.assertEqual(3, retry_err.last_attempt.attempt_number)
        self.assertIsNone(retry_err.last_attempt.result())
        print(retry_err)

    def test_with_stop_on_exception(self):
        """The IOError raised by the wrapped callable propagates to the caller."""
        with self.assertRaises(IOError) as ctx:
            _retryable_test_with_stop(NoIOErrorAfterCount(5))
        print(ctx.exception)

    def test_retry_if_exception_of_type(self):
        """Matching exception types are retried; a NameError propagates."""
        self.assertTrue(_retryable_test_with_exception_type_io(NoIOErrorAfterCount(5)))

        with self.assertRaises(NameError) as ctx:
            _retryable_test_with_exception_type_io(NoNameErrorAfterCount(5))
        print(ctx.exception)

        self.assertTrue(
            _retryable_test_with_exception_type_custom(NoCustomErrorAfterCount(5))
        )

        with self.assertRaises(NameError) as ctx:
            _retryable_test_with_exception_type_custom(NoNameErrorAfterCount(5))
        print(ctx.exception)

    def test_retry_except_exception_of_type(self):
        """With ``retry_if_not_exception_type(IOError)`` an IOError propagates."""
        self.assertTrue(
            _retryable_test_if_not_exception_type_io(NoNameErrorAfterCount(5))
        )

        with self.assertRaises(IOError) as ctx:
            _retryable_test_if_not_exception_type_io(NoIOErrorAfterCount(5))
        print(ctx.exception)

    def test_retry_until_exception_of_type_attempt_number(self):
        """The NameError surfaces on the sixth recorded attempt."""
        with self.assertRaises(NameError) as ctx:
            self.assertTrue(
                _retryable_test_with_unless_exception_type_name(NameErrorUntilCount(5))
            )
        s = _retryable_test_with_unless_exception_type_name.retry.statistics
        self.assertEqual(s["attempt_number"], 6)
        print(ctx.exception)

    def test_retry_until_exception_of_type_no_type(self):
        """Same as above, using ``retry_unless_exception_type()`` with no argument."""
        with self.assertRaises(NameError) as ctx:
            # no input should catch all subclasses of Exception
            self.assertTrue(
                _retryable_test_with_unless_exception_type_no_input(
                    NameErrorUntilCount(5)
                )
            )
        s = _retryable_test_with_unless_exception_type_no_input.retry.statistics
        self.assertEqual(s["attempt_number"], 6)
        print(ctx.exception)

    def test_retry_until_exception_of_type_wrong_exception(self):
        """With a three-attempt limit the call ends in RetryError."""
        with self.assertRaises(RetryError) as ctx:
            # two iterations with IOError, one that returns True
            _retryable_test_with_unless_exception_type_name_attempt_limit(
                IOErrorUntilCount(2)
            )
        print(ctx.exception)

    def test_retry_if_exception_message(self):
        """A CustomError with the configured message must not escape."""
        try:
            self.assertTrue(
                _retryable_test_if_exception_message_message(NoCustomErrorAfterCount(3))
            )
        except CustomError:
            print(_retryable_test_if_exception_message_message.retry.statistics)
            self.fail("CustomError should've been retried from errormessage")

    def test_retry_if_not_exception_message(self):
        """If CustomError escapes, it must have done so on the first attempt."""
        try:
            self.assertTrue(
                _retryable_test_if_not_exception_message_message(
                    NoCustomErrorAfterCount(2)
                )
            )
        except CustomError:
            s = _retryable_test_if_not_exception_message_message.retry.statistics
            self.assertEqual(s["attempt_number"], 1)

    def test_retry_if_not_exception_message_delay(self):
        """If NameError escapes, it must have done so on the fourth attempt."""
        try:
            self.assertTrue(
                _retryable_test_not_exception_message_delay(NameErrorUntilCount(3))
            )
        except NameError:
            s = _retryable_test_not_exception_message_delay.retry.statistics
            print(s["attempt_number"])
            self.assertEqual(s["attempt_number"], 4)

    def test_retry_if_exception_message_match(self):
        """A CustomError matching the configured pattern must not escape."""
        try:
            self.assertTrue(
                _retryable_test_if_exception_message_match(NoCustomErrorAfterCount(3))
            )
        except CustomError:
            self.fail("CustomError should've been retried from errormessage")

    def test_retry_if_not_exception_message_match(self):
        """If CustomError escapes the ``match`` variant, it did so on attempt one."""
        # Exercise the ``match=`` helper; the ``message=`` helper is already
        # covered by test_retry_if_not_exception_message.
        try:
            self.assertTrue(
                _retryable_test_if_not_exception_message_match(
                    NoCustomErrorAfterCount(2)
                )
            )
        except CustomError:
            s = _retryable_test_if_not_exception_message_match.retry.statistics
            self.assertEqual(s["attempt_number"], 1)

    def test_retry_if_exception_cause_type(self):
        """A NameError propagates when the cause helper is given an IOError cause."""
        self.assertTrue(
            _retryable_test_with_exception_cause_type(NoNameErrorCauseAfterCount(5))
        )

        with self.assertRaises(NameError):
            _retryable_test_with_exception_cause_type(NoIOErrorCauseAfterCount(5))

    def test_retry_preserves_argument_defaults(self):
        """``Retrying.wraps`` keeps ``__defaults__`` and ``__kwdefaults__``."""

        def function_with_defaults(a=1):
            return a

        def function_with_kwdefaults(*, a=1):
            return a

        retrying = Retrying(
            wait=tenacity.wait_fixed(0.01), stop=tenacity.stop_after_attempt(3)
        )
        wrapped_defaults_function = retrying.wraps(function_with_defaults)
        wrapped_kwdefaults_function = retrying.wraps(function_with_kwdefaults)

        self.assertEqual(
            function_with_defaults.__defaults__, wrapped_defaults_function.__defaults__
        )
        self.assertEqual(
            function_with_kwdefaults.__kwdefaults__,
            wrapped_kwdefaults_function.__kwdefaults__,
        )

    def test_defaults(self):
        """Both ``@retry`` and ``@retry()`` forms eventually return a truthy value."""
        self.assertTrue(_retryable_default(NoNameErrorAfterCount(5)))
        self.assertTrue(_retryable_default_f(NoNameErrorAfterCount(5)))
        self.assertTrue(_retryable_default(NoCustomErrorAfterCount(5)))
        self.assertTrue(_retryable_default_f(NoCustomErrorAfterCount(5)))

    def test_retry_function_object(self):
        """Test that functools.wraps doesn't cause problems with callable objects.

        It raises an error upon trying to wrap it in Py2, because __name__
        attribute is missing. It's fixed in Py3 but was never backported.
        """

        class Hello:
            def __call__(self):
                return "Hello"

        retrying = Retrying(
            wait=tenacity.wait_fixed(0.01), stop=tenacity.stop_after_attempt(3)
        )
        h = retrying.wraps(Hello())
        self.assertEqual(h(), "Hello")
| |
| |
class TestRetryWith:
    """Checks for ``retry_with`` overrides applied to already-decorated callables."""

    def test_redefine_wait(self):
        """Overriding ``wait`` with ``wait_fixed(0.1)`` makes the call last >= 500 ms."""
        began = current_time_ms()
        slower = _retryable_test_with_wait.retry_with(wait=tenacity.wait_fixed(0.1))
        outcome = slower(NoneReturnUntilAfterCount(5))
        elapsed = current_time_ms() - began
        assert elapsed >= 500
        assert outcome is True

    def test_redefine_stop(self):
        """Overriding ``stop`` with a five-attempt limit lets the call return True."""
        patient = _retryable_test_with_stop.retry_with(
            stop=tenacity.stop_after_attempt(5)
        )
        outcome = patient(NoneReturnUntilAfterCount(4))
        assert outcome is True

    def test_retry_error_cls_should_be_preserved(self):
        """``retry_error_cls`` set on the decorator survives ``retry_with``."""

        @retry(stop=tenacity.stop_after_attempt(10), retry_error_cls=ValueError)
        def _retryable():
            raise Exception("raised for test purposes")

        shorter = _retryable.retry_with(stop=tenacity.stop_after_attempt(2))
        with pytest.raises(Exception) as exc_ctx:
            shorter()

        assert exc_ctx.type is ValueError, "Should remap to specific exception type"

    def test_retry_error_callback_should_be_preserved(self):
        """``retry_error_callback`` set on the decorator survives ``retry_with``."""

        def return_text(retry_state):
            template = "Calling {} keeps raising errors after {} attempts"
            fn_name = retry_state.fn.__name__
            return template.format(fn_name, retry_state.attempt_number)

        @retry(stop=tenacity.stop_after_attempt(10), retry_error_callback=return_text)
        def _retryable():
            raise Exception("raised for test purposes")

        shorter = _retryable.retry_with(stop=tenacity.stop_after_attempt(5))
        assert shorter() == "Calling _retryable keeps raising errors after 5 attempts"
| |
| |
class TestBeforeAfterAttempts(unittest.TestCase):
    """Tests for the ``before``, ``after`` and ``before_sleep`` hooks."""

    # Written by the hooks in test_before_attempts / test_after_attempts.
    _attempt_number = 0

    def test_before_attempts(self):
        """The ``before`` hook sees attempt number 1 for a single successful call."""
        TestBeforeAfterAttempts._attempt_number = 0

        def _before(retry_state):
            TestBeforeAfterAttempts._attempt_number = retry_state.attempt_number

        @retry(
            wait=tenacity.wait_fixed(1),
            stop=tenacity.stop_after_attempt(1),
            before=_before,
        )
        def _test_before():
            pass

        _test_before()

        self.assertEqual(TestBeforeAfterAttempts._attempt_number, 1)

    def test_after_attempts(self):
        """The ``after`` hook last records attempt number 2."""
        TestBeforeAfterAttempts._attempt_number = 0

        def _after(retry_state):
            TestBeforeAfterAttempts._attempt_number = retry_state.attempt_number

        @retry(
            wait=tenacity.wait_fixed(0.1),
            stop=tenacity.stop_after_attempt(3),
            after=_after,
        )
        def _test_after():
            # Keep failing until the hook has recorded the second attempt.
            if TestBeforeAfterAttempts._attempt_number < 2:
                raise Exception("testing after_attempts handler")

        _test_after()

        self.assertEqual(TestBeforeAfterAttempts._attempt_number, 2)

    def test_before_sleep(self):
        """The ``before_sleep`` hook sees a positive sleep and last records attempt 2."""

        def _before_sleep(retry_state):
            self.assertGreater(retry_state.next_action.sleep, 0)
            _before_sleep.attempt_number = retry_state.attempt_number

        # Initialise the counter explicitly. Without this the first call of
        # _test_before_sleep raises AttributeError, which only "works"
        # because the retry policy in use happens to retry it.
        _before_sleep.attempt_number = 0

        @retry(
            wait=tenacity.wait_fixed(0.01),
            stop=tenacity.stop_after_attempt(3),
            before_sleep=_before_sleep,
        )
        def _test_before_sleep():
            if _before_sleep.attempt_number < 2:
                raise Exception("testing before_sleep_attempts handler")

        _test_before_sleep()
        self.assertEqual(_before_sleep.attempt_number, 2)

    def _before_sleep_log_raises(self, get_call_fn):
        """Check ``before_sleep_log`` output when the wrapped call raises.

        ``get_call_fn`` receives the ``Retrying`` instance and returns the
        callable that is then invoked with ``thing.go``.
        """
        thing = NoIOErrorAfterCount(2)
        logger = logging.getLogger(self.id())
        logger.propagate = False
        logger.setLevel(logging.INFO)
        handler = CapturingHandler()
        logger.addHandler(handler)
        try:
            _before_sleep = tenacity.before_sleep_log(logger, logging.INFO)
            retrying = Retrying(
                wait=tenacity.wait_fixed(0.01),
                stop=tenacity.stop_after_attempt(3),
                before_sleep=_before_sleep,
            )
            get_call_fn(retrying)(thing.go)
        finally:
            # Always detach the handler so later tests see a clean logger.
            logger.removeHandler(handler)

        etalon_re = (
            r"^Retrying .* in 0\.01 seconds as it raised "
            r"(IO|OS)Error: Hi there, I'm an IOError\.$"
        )
        self.assertEqual(len(handler.records), 2)
        fmt = logging.Formatter().format
        self.assertRegex(fmt(handler.records[0]), etalon_re)
        self.assertRegex(fmt(handler.records[1]), etalon_re)

    def test_before_sleep_log_raises(self):
        """Run the raising-log check by calling the ``Retrying`` object directly."""
        self._before_sleep_log_raises(lambda x: x)

    def test_before_sleep_log_raises_with_exc_info(self):
        """With ``exc_info=True`` each record also carries a traceback header."""
        thing = NoIOErrorAfterCount(2)
        logger = logging.getLogger(self.id())
        logger.propagate = False
        logger.setLevel(logging.INFO)
        handler = CapturingHandler()
        logger.addHandler(handler)
        try:
            _before_sleep = tenacity.before_sleep_log(
                logger, logging.INFO, exc_info=True
            )
            retrying = Retrying(
                wait=tenacity.wait_fixed(0.01),
                stop=tenacity.stop_after_attempt(3),
                before_sleep=_before_sleep,
            )
            retrying(thing.go)
        finally:
            logger.removeHandler(handler)

        etalon_re = re.compile(
            r"^Retrying .* in 0\.01 seconds as it raised "
            r"(IO|OS)Error: Hi there, I'm an IOError\.{0}"
            r"Traceback \(most recent call last\):{0}"
            r".*$".format("\n"),
            flags=re.MULTILINE,
        )
        self.assertEqual(len(handler.records), 2)
        fmt = logging.Formatter().format
        self.assertRegex(fmt(handler.records[0]), etalon_re)
        self.assertRegex(fmt(handler.records[1]), etalon_re)

    def test_before_sleep_log_returns(self, exc_info=False):
        """Check ``before_sleep_log`` output when the wrapped call returns ``None``.

        ``exc_info`` is forwarded to ``before_sleep_log``; the ``_with_exc_info``
        test below re-runs this method with ``exc_info=True``.
        """
        thing = NoneReturnUntilAfterCount(2)
        logger = logging.getLogger(self.id())
        logger.propagate = False
        logger.setLevel(logging.INFO)
        handler = CapturingHandler()
        logger.addHandler(handler)
        try:
            _before_sleep = tenacity.before_sleep_log(
                logger, logging.INFO, exc_info=exc_info
            )
            _retry = tenacity.retry_if_result(lambda result: result is None)
            retrying = Retrying(
                wait=tenacity.wait_fixed(0.01),
                stop=tenacity.stop_after_attempt(3),
                retry=_retry,
                before_sleep=_before_sleep,
            )
            retrying(thing.go)
        finally:
            logger.removeHandler(handler)

        etalon_re = r"^Retrying .* in 0\.01 seconds as it returned None\.$"
        self.assertEqual(len(handler.records), 2)
        fmt = logging.Formatter().format
        self.assertRegex(fmt(handler.records[0]), etalon_re)
        self.assertRegex(fmt(handler.records[1]), etalon_re)

    def test_before_sleep_log_returns_with_exc_info(self):
        """Same expectations as the returning case, with ``exc_info=True``."""
        self.test_before_sleep_log_returns(exc_info=True)
| |
| |
class TestReraiseExceptions(unittest.TestCase):
    """Tests for ``reraise=True`` and ``RetryError.reraise()``."""

    def test_reraise_by_default(self):
        """With ``reraise=True`` the original KeyError surfaces after two calls."""
        invocations = []

        @retry(
            wait=tenacity.wait_fixed(0.1),
            stop=tenacity.stop_after_attempt(2),
            reraise=True,
        )
        def _reraised_by_default():
            invocations.append("x")
            raise KeyError("Bad key")

        with self.assertRaises(KeyError):
            _reraised_by_default()
        self.assertEqual(2, len(invocations))

    def test_reraise_from_retry_error(self):
        """``RetryError.reraise()`` raises the underlying KeyError."""
        invocations = []

        @retry(wait=tenacity.wait_fixed(0.1), stop=tenacity.stop_after_attempt(2))
        def _raise_key_error():
            invocations.append("x")
            raise KeyError("Bad key")

        def _reraised_key_error():
            try:
                _raise_key_error()
            except tenacity.RetryError as retry_err:
                retry_err.reraise()

        with self.assertRaises(KeyError):
            _reraised_key_error()
        self.assertEqual(2, len(invocations))

    def test_reraise_timeout_from_retry_error(self):
        """``reraise()`` raises RetryError when the last attempt did not fail."""
        invocations = []

        @retry(
            wait=tenacity.wait_fixed(0.1),
            stop=tenacity.stop_after_attempt(2),
            retry=lambda retry_state: True,
        )
        def _mock_fn():
            invocations.append("x")

        def _reraised_mock_fn():
            try:
                _mock_fn()
            except tenacity.RetryError as retry_err:
                retry_err.reraise()

        with self.assertRaises(tenacity.RetryError):
            _reraised_mock_fn()
        self.assertEqual(2, len(invocations))

    def test_reraise_no_exception(self):
        """``reraise=True`` still yields RetryError when no attempt raised."""
        invocations = []

        @retry(
            wait=tenacity.wait_fixed(0.1),
            stop=tenacity.stop_after_attempt(2),
            retry=lambda retry_state: True,
            reraise=True,
        )
        def _mock_fn():
            invocations.append("x")

        with self.assertRaises(tenacity.RetryError):
            _mock_fn()
        self.assertEqual(2, len(invocations))
| |
| |
class TestStatistics(unittest.TestCase):
    """Tests for the ``statistics`` mapping exposed via ``<wrapped>.retry``."""

    def test_stats(self):
        """Statistics start empty and record one attempt after a successful call."""

        @retry()
        def _foobar():
            return 42

        self.assertEqual({}, _foobar.retry.statistics)
        _foobar()
        self.assertEqual(1, _foobar.retry.statistics["attempt_number"])

    def test_stats_failing(self):
        """Statistics record both attempts when every call raises."""

        @retry(stop=tenacity.stop_after_attempt(2))
        def _foobar():
            raise ValueError(42)

        self.assertEqual({}, _foobar.retry.statistics)
        # Assert the failure explicitly instead of swallowing any Exception,
        # so a call that unexpectedly succeeds is reported.
        with self.assertRaises(RetryError):
            _foobar()
        self.assertEqual(2, _foobar.retry.statistics["attempt_number"])
| |
| |
class TestRetryErrorCallback(unittest.TestCase):
    """Tests for the ``retry_error_callback`` option."""

    def setUp(self):
        self._callback_called = False
        self._attempt_number = 0

    def _callback(self, fut):
        """Mark the callback as called and hand ``fut`` straight back."""
        self._callback_called = True
        return fut

    def test_retry_error_callback(self):
        """The callback runs once and its return value becomes the call result."""
        num_attempts = 3
        # One entry is appended per callback invocation.
        seen_states = []

        def retry_error_callback(retry_state):
            seen_states.append(retry_state)
            return retry_state.outcome

        @retry(
            stop=tenacity.stop_after_attempt(num_attempts),
            retry_error_callback=retry_error_callback,
        )
        def _foobar():
            self._attempt_number += 1
            raise Exception("This exception should not be raised")

        result = _foobar()

        self.assertEqual(len(seen_states), 1)
        self.assertEqual(num_attempts, self._attempt_number)
        self.assertIsInstance(result, tenacity.Future)
| |
| |
class TestContextManager(unittest.TestCase):
    """Tests for iterating a ``Retrying`` object and using attempts as context managers."""

    def test_context_manager_retry_one(self):
        """A block that raises once is executed exactly twice."""
        raise_ = True
        attempts = 0

        for attempt in Retrying():
            with attempt:
                attempts += 1
                if raise_:
                    raise_ = False
                    raise Exception("Retry it!")

        self.assertEqual(attempts, 2)

    def test_context_manager_on_error(self):
        """An exception not selected by the retry policy propagates."""

        class CustomError(Exception):
            pass

        # Named ``retrying`` so the module-level ``retry`` decorator is not shadowed.
        retrying = Retrying(retry=tenacity.retry_if_exception_type(IOError))

        def run():
            for attempt in retrying:
                with attempt:
                    raise CustomError("Don't retry!")

        self.assertRaises(CustomError, run)

    def test_context_manager_retry_error(self):
        """With a two-attempt limit, a block that always raises ends in RetryError."""
        retrying = Retrying(stop=tenacity.stop_after_attempt(2))

        def run():
            for attempt in retrying:
                with attempt:
                    raise Exception("Retry it!")

        self.assertRaises(RetryError, run)

    def test_context_manager_reraise(self):
        """With ``reraise=True`` the block's own exception is raised instead."""

        class CustomError(Exception):
            pass

        retrying = Retrying(reraise=True, stop=tenacity.stop_after_attempt(2))

        def run():
            for attempt in retrying:
                with attempt:
                    raise CustomError("Don't retry!")

        self.assertRaises(CustomError, run)
| |
| |
class TestInvokeAsCallable:
    """Test direct invocation of Retrying as a callable."""

    @staticmethod
    def invoke(retry, f):
        """
        Run ``f`` through the given Retrying object.

        Kept as a separate hook so test sub-classes can swap in a different
        call mechanism.
        """
        return retry(f)

    def test_retry_one(self):
        """A callable that fails once is called twice and its value returned."""
        calls = []

        def f():
            calls.append(len(calls) + 1)
            if len(calls) > 1:
                return 42
            raise Exception("Retry it!")

        retrying = Retrying()
        assert self.invoke(retrying, f) == 42
        assert calls == [1, 2]

    def test_on_error(self):
        """An exception not selected by the retry policy propagates after one call."""

        class CustomError(Exception):
            pass

        calls = []

        def f():
            calls.append(len(calls) + 1)
            if len(calls) > 1:
                return 42
            raise CustomError("Don't retry!")

        retrying = Retrying(retry=tenacity.retry_if_exception_type(IOError))
        with pytest.raises(CustomError):
            self.invoke(retrying, f)
        assert calls == [1]

    def test_retry_error(self):
        """With a two-attempt limit, an always-failing callable ends in RetryError."""
        calls = []

        def f():
            calls.append(len(calls) + 1)
            raise Exception("Retry it!")

        retrying = Retrying(stop=tenacity.stop_after_attempt(2))
        with pytest.raises(RetryError):
            self.invoke(retrying, f)
        assert calls == [1, 2]

    def test_reraise(self):
        """With ``reraise=True`` the callable's own exception is raised after two calls."""

        class CustomError(Exception):
            pass

        calls = []

        def f():
            calls.append(len(calls) + 1)
            raise CustomError("Retry it!")

        retrying = Retrying(reraise=True, stop=tenacity.stop_after_attempt(2))
        with pytest.raises(CustomError):
            self.invoke(retrying, f)
        assert calls == [1, 2]
| |
| |
class TestRetryException(unittest.TestCase):
    """Tests for the ``RetryError`` exception class itself."""

    def test_retry_error_is_pickleable(self):
        """A RetryError keeps its ``last_attempt`` across a pickle round trip."""
        import pickle

        original = RetryError(last_attempt=123)
        round_tripped = pickle.loads(pickle.dumps(original))
        self.assertEqual(original.last_attempt, round_tripped.last_attempt)
| |
| |
class TestRetryTyping(unittest.TestCase):
    """Runtime type checks on functions wrapped by ``retry``."""

    @pytest.mark.skipif(
        sys.version_info < (3, 0), reason="typeguard not supported for python 2"
    )
    def test_retry_type_annotations(self):
        """The decorator should maintain types of decorated functions."""
        # Just in case this is run with unit-test, return early for py2
        if sys.version_info < (3, 0):
            return

        # Function-level import because we can't install this for python 2.
        from typeguard import check_type

        def num_to_str(number):
            # type: (int) -> str
            return str(number)

        # equivalent to a raw @retry decoration
        with_raw = retry(num_to_str)
        with_raw_result = with_raw(1)

        # equivalent to a @retry(...) decoration
        with_constructor = retry()(num_to_str)
        # Call the constructor-style wrapper itself, so its result (and not
        # a second result from ``with_raw``) is what gets type-checked below.
        with_constructor_result = with_constructor(1)

        # These raise TypeError exceptions if they fail
        check_type(with_raw, typing.Callable[[int], str])
        check_type(with_raw_result, str)
        check_type(with_constructor, typing.Callable[[int], str])
        check_type(with_constructor_result, str)
| |
| |
@contextmanager
def reports_deprecation_warning():
    """Context manager asserting that its body emits a ``DeprecationWarning``.

    Warning filters are switched to ``"always"`` for the duration of the
    block and restored afterwards, also when the body raises.
    """
    # Hide this helper's frame from pytest tracebacks.
    __tracebackhide__ = True
    # catch_warnings() saves and restores the filter state itself, which
    # replaces copying and reassigning ``warnings.filters`` by hand.
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        with pytest.warns(DeprecationWarning):
            yield
| |
| |
class TestMockingSleep:
    """Check that sleeping goes through a patchable ``tenacity.nap.time.sleep``."""

    # Keyword arguments for the decorated method below.
    RETRY_ARGS = {
        "wait": tenacity.wait_fixed(0.1),
        "stop": tenacity.stop_after_attempt(5),
    }

    def _fail(self):
        raise NotImplementedError()

    @retry(**RETRY_ARGS)
    def _decorated_fail(self):
        self._fail()

    @pytest.fixture()
    def mock_sleep(self, monkeypatch):
        """Replace ``tenacity.nap.time.sleep`` with a call-counting stand-in."""

        class MockSleep:
            def __init__(self):
                self.call_count = 0

            def __call__(self, seconds):
                self.call_count = self.call_count + 1

        counter = MockSleep()
        monkeypatch.setattr(tenacity.nap.time, "sleep", counter)
        yield counter

    def test_decorated(self, mock_sleep):
        """Five failing attempts produce four sleeps."""
        with pytest.raises(RetryError):
            self._decorated_fail()
        assert mock_sleep.call_count == 4

    def test_decorated_retry_with(self, mock_sleep):
        """After ``retry_with`` lowers the limit to two attempts there is one sleep."""
        fail_faster = self._decorated_fail.retry_with(
            stop=tenacity.stop_after_attempt(2),
        )
        with pytest.raises(RetryError):
            fail_faster()
        assert mock_sleep.call_count == 1
| |
| |
# Run the unittest runner when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()