| # Copyright 2016–2021 Julien Danjou |
| # Copyright 2016 Joshua Harlow |
| # Copyright 2013 Ray Holder |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import contextlib |
| import datetime |
| import logging |
| import pickle |
| import re |
| import time |
| import typing |
| import unittest |
| from fractions import Fraction |
| from unittest import mock |
| |
| import pytest |
| from typeguard import check_type |
| |
| import tenacity |
| from tenacity import RetryCallState, RetryError, Retrying, retry |
| from tenacity.retry import retry_all, retry_any |
| |
| _unset = object() |
| |
| |
| def _make_unset_exception(func_name: str, **kwargs: typing.Any) -> TypeError: |
| missing = [] |
| for k, v in kwargs.items(): |
| if v is _unset: |
| missing.append(k) |
| missing_str = ", ".join(repr(s) for s in missing) |
| return TypeError(func_name + " func missing parameters: " + missing_str) |
| |
| |
| def _set_delay_since_start(retry_state: RetryCallState, delay: typing.Any) -> None: |
| # Ensure outcome_timestamp - start_time is *exactly* equal to the delay to |
| # avoid complexity in test code. |
| retry_state.start_time = Fraction(retry_state.start_time) # type: ignore[assignment] |
| retry_state.outcome_timestamp = retry_state.start_time + Fraction(delay) |
| assert retry_state.seconds_since_start == delay |
| |
| |
| def make_retry_state( |
| previous_attempt_number: typing.Any, |
| delay_since_first_attempt: typing.Any, |
| last_result: typing.Any = None, |
| upcoming_sleep: typing.Any = 0, |
| ) -> RetryCallState: |
| """Construct RetryCallState for given attempt number & delay. |
| |
| Only used in testing and thus is extra careful about timestamp arithmetics. |
| """ |
| required_parameter_unset = ( |
| previous_attempt_number is _unset or delay_since_first_attempt is _unset |
| ) |
| if required_parameter_unset: |
| raise _make_unset_exception( |
| "wait/stop", |
| previous_attempt_number=previous_attempt_number, |
| delay_since_first_attempt=delay_since_first_attempt, |
| ) |
| |
| retry_state = RetryCallState(None, None, (), {}) # type: ignore[arg-type] |
| retry_state.attempt_number = previous_attempt_number |
| if last_result is not None: |
| retry_state.outcome = last_result |
| else: |
| retry_state.set_result(None) |
| |
| retry_state.upcoming_sleep = upcoming_sleep |
| |
| _set_delay_since_start(retry_state, delay_since_first_attempt) |
| return retry_state |
| |
| |
| class TestBase(unittest.TestCase): |
| def test_retrying_repr(self) -> None: |
| class ConcreteRetrying(tenacity.BaseRetrying): |
| def __call__( |
| self, fn: typing.Any, *args: typing.Any, **kwargs: typing.Any |
| ) -> typing.Any: |
| pass |
| |
| repr(ConcreteRetrying()) |
| |
| def test_callstate_repr(self) -> None: |
| rs = RetryCallState(None, None, (), {}) # type: ignore[arg-type] |
| rs.idle_for = 1.1111111 |
| assert repr(rs).endswith("attempt #1; slept for 1.11; last result: none yet>") |
| rs = make_retry_state(2, 5) |
| assert repr(rs).endswith( |
| "attempt #2; slept for 0.0; last result: returned None>" |
| ) |
| rs = make_retry_state( |
| 0, 0, last_result=tenacity.Future.construct(1, ValueError("aaa"), True) |
| ) |
| assert repr(rs).endswith( |
| "attempt #0; slept for 0.0; last result: failed (ValueError aaa)>" |
| ) |
| |
| |
| class TestRetryingName(unittest.TestCase): |
| def test_str_default(self) -> None: |
| """Without a name, str() returns '<unknown>'.""" |
| assert str(Retrying()) == "<unknown>" |
| |
| def test_str_with_name(self) -> None: |
| """With a name, str() returns the given name.""" |
| assert str(Retrying(name="my_block")) == "my_block" |
| |
| def test_str_preserved_by_copy(self) -> None: |
| """copy() preserves the name.""" |
| r = Retrying(name="my_block") |
| assert str(r.copy()) == "my_block" |
| |
| def test_str_overridden_by_copy(self) -> None: |
| """copy() allows overriding the name.""" |
| r = Retrying(name="original") |
| assert str(r.copy(name="overridden")) == "overridden" |
| |
| def test_get_fn_name_decorator(self) -> None: |
| """get_fn_name() returns the function's qualified name when used as decorator.""" |
| captured: list[RetryCallState] = [] |
| |
| @tenacity.retry( |
| stop=tenacity.stop_after_attempt(1), |
| after=lambda rs: captured.append(rs), |
| ) |
| def my_func() -> None: |
| raise ValueError |
| |
| with contextlib.suppress(Exception): |
| my_func() |
| assert captured |
| assert "my_func" in captured[0].get_fn_name() |
| |
| def test_get_fn_name_context_manager_no_name(self) -> None: |
| """get_fn_name() returns '<unknown>' in context manager mode without a name.""" |
| r = Retrying(stop=tenacity.stop_after_attempt(1)) |
| rs = RetryCallState(r, None, (), {}) |
| assert rs.get_fn_name() == "<unknown>" |
| |
| def test_get_fn_name_context_manager_with_name(self) -> None: |
| """get_fn_name() returns the given name in context manager mode.""" |
| r = Retrying(name="ws_listener", stop=tenacity.stop_after_attempt(1)) |
| rs = RetryCallState(r, None, (), {}) |
| assert rs.get_fn_name() == "ws_listener" |
| |
| def test_logging_uses_name(self) -> None: |
| """before_log uses the name parameter in context manager mode.""" |
| import unittest.mock |
| |
| log = unittest.mock.MagicMock() |
| logger = unittest.mock.MagicMock(log=log) |
| |
| with contextlib.suppress(Exception): |
| for attempt in Retrying( |
| name="my_block", |
| before=tenacity.before_log(logger, logging.INFO), |
| stop=tenacity.stop_after_attempt(1), |
| ): |
| with attempt: |
| raise ValueError |
| |
| args = log.call_args[0] |
| assert "my_block" in args[1] |
| |
| |
| class TestStopConditions(unittest.TestCase): |
| def test_never_stop(self) -> None: |
| r = Retrying() |
| self.assertFalse(r.stop(make_retry_state(3, 6546))) |
| |
| def test_stop_any(self) -> None: |
| stop = tenacity.stop_any( |
| tenacity.stop_after_delay(1), tenacity.stop_after_attempt(4) |
| ) |
| |
| def s(*args: typing.Any) -> bool: |
| return stop(make_retry_state(*args)) |
| |
| self.assertFalse(s(1, 0.1)) |
| self.assertFalse(s(2, 0.2)) |
| self.assertFalse(s(2, 0.8)) |
| self.assertTrue(s(4, 0.8)) |
| self.assertTrue(s(3, 1.8)) |
| self.assertTrue(s(4, 1.8)) |
| |
| def test_stop_all(self) -> None: |
| stop = tenacity.stop_all( |
| tenacity.stop_after_delay(1), tenacity.stop_after_attempt(4) |
| ) |
| |
| def s(*args: typing.Any) -> bool: |
| return stop(make_retry_state(*args)) |
| |
| self.assertFalse(s(1, 0.1)) |
| self.assertFalse(s(2, 0.2)) |
| self.assertFalse(s(2, 0.8)) |
| self.assertFalse(s(4, 0.8)) |
| self.assertFalse(s(3, 1.8)) |
| self.assertTrue(s(4, 1.8)) |
| |
| def test_stop_or(self) -> None: |
| stop = tenacity.stop_after_delay(1) | tenacity.stop_after_attempt(4) |
| |
| def s(*args: typing.Any) -> bool: |
| return stop(make_retry_state(*args)) |
| |
| self.assertFalse(s(1, 0.1)) |
| self.assertFalse(s(2, 0.2)) |
| self.assertFalse(s(2, 0.8)) |
| self.assertTrue(s(4, 0.8)) |
| self.assertTrue(s(3, 1.8)) |
| self.assertTrue(s(4, 1.8)) |
| |
| def test_stop_and(self) -> None: |
| stop = tenacity.stop_after_delay(1) & tenacity.stop_after_attempt(4) |
| |
| def s(*args: typing.Any) -> bool: |
| return stop(make_retry_state(*args)) |
| |
| self.assertFalse(s(1, 0.1)) |
| self.assertFalse(s(2, 0.2)) |
| self.assertFalse(s(2, 0.8)) |
| self.assertFalse(s(4, 0.8)) |
| self.assertFalse(s(3, 1.8)) |
| self.assertTrue(s(4, 1.8)) |
| |
| def test_stop_after_attempt(self) -> None: |
| r = Retrying(stop=tenacity.stop_after_attempt(3)) |
| self.assertFalse(r.stop(make_retry_state(2, 6546))) |
| self.assertTrue(r.stop(make_retry_state(3, 6546))) |
| self.assertTrue(r.stop(make_retry_state(4, 6546))) |
| |
| def test_stop_after_delay(self) -> None: |
| for delay in (1, datetime.timedelta(seconds=1)): |
| with self.subTest(): |
| r = Retrying(stop=tenacity.stop_after_delay(delay)) |
| self.assertFalse(r.stop(make_retry_state(2, 0.999))) |
| self.assertTrue(r.stop(make_retry_state(2, 1))) |
| self.assertTrue(r.stop(make_retry_state(2, 1.001))) |
| |
| def test_stop_before_delay(self) -> None: |
| for delay in (1, datetime.timedelta(seconds=1)): |
| with self.subTest(): |
| r = Retrying(stop=tenacity.stop_before_delay(delay)) |
| self.assertFalse( |
| r.stop(make_retry_state(2, 0.999, upcoming_sleep=0.0001)) |
| ) |
| self.assertTrue(r.stop(make_retry_state(2, 1, upcoming_sleep=0.001))) |
| self.assertTrue(r.stop(make_retry_state(2, 1, upcoming_sleep=1))) |
| |
| # It should act the same as stop_after_delay if upcoming sleep is 0 |
| self.assertFalse(r.stop(make_retry_state(2, 0.999, upcoming_sleep=0))) |
| self.assertTrue(r.stop(make_retry_state(2, 1, upcoming_sleep=0))) |
| self.assertTrue(r.stop(make_retry_state(2, 1.001, upcoming_sleep=0))) |
| |
| def test_legacy_explicit_stop_type(self) -> None: |
| Retrying(stop="stop_after_attempt") # type: ignore[arg-type] |
| |
| def test_stop_func_with_retry_state(self) -> None: |
| def stop_func(retry_state: RetryCallState) -> bool: |
| rs = retry_state |
| return rs.attempt_number == rs.seconds_since_start |
| |
| r = Retrying(stop=stop_func) |
| self.assertFalse(r.stop(make_retry_state(1, 3))) |
| self.assertFalse(r.stop(make_retry_state(100, 99))) |
| self.assertTrue(r.stop(make_retry_state(101, 101))) |
| |
| |
| class TestWaitConditions(unittest.TestCase): |
| def test_no_sleep(self) -> None: |
| r = Retrying() |
| self.assertEqual(0, r.wait(make_retry_state(18, 9879))) |
| |
| def test_fixed_sleep(self) -> None: |
| for wait in (1, datetime.timedelta(seconds=1)): |
| with self.subTest(): |
| r = Retrying(wait=tenacity.wait_fixed(wait)) |
| self.assertEqual(1, r.wait(make_retry_state(12, 6546))) |
| |
| def test_incrementing_sleep(self) -> None: |
| for start, increment in ( |
| (500, 100), |
| (datetime.timedelta(seconds=500), datetime.timedelta(seconds=100)), |
| ): |
| with self.subTest(): |
| r = Retrying( |
| wait=tenacity.wait_incrementing(start=start, increment=increment) |
| ) |
| self.assertEqual(500, r.wait(make_retry_state(1, 6546))) |
| self.assertEqual(600, r.wait(make_retry_state(2, 6546))) |
| self.assertEqual(700, r.wait(make_retry_state(3, 6546))) |
| |
| def test_random_sleep(self) -> None: |
| for min_, max_ in ( |
| (1, 20), |
| (datetime.timedelta(seconds=1), datetime.timedelta(seconds=20)), |
| ): |
| with self.subTest(): |
| r = Retrying(wait=tenacity.wait_random(min=min_, max=max_)) |
| times = set() |
| for _ in range(1000): |
| times.add(r.wait(make_retry_state(1, 6546))) |
| |
| # this is kind of non-deterministic... |
| self.assertTrue(len(times) > 1) |
| for t in times: |
| self.assertTrue(t >= 1) |
| self.assertTrue(t < 20) |
| |
| def test_random_sleep_withoutmin_(self) -> None: |
| r = Retrying(wait=tenacity.wait_random(max=2)) |
| times = set() |
| times.add(r.wait(make_retry_state(1, 6546))) |
| times.add(r.wait(make_retry_state(1, 6546))) |
| times.add(r.wait(make_retry_state(1, 6546))) |
| times.add(r.wait(make_retry_state(1, 6546))) |
| |
| # this is kind of non-deterministic... |
| self.assertTrue(len(times) > 1) |
| for t in times: |
| self.assertTrue(t >= 0) |
| self.assertTrue(t <= 2) |
| |
| def test_exponential(self) -> None: |
| r = Retrying(wait=tenacity.wait_exponential()) |
| self.assertEqual(r.wait(make_retry_state(1, 0)), 1) |
| self.assertEqual(r.wait(make_retry_state(2, 0)), 2) |
| self.assertEqual(r.wait(make_retry_state(3, 0)), 4) |
| self.assertEqual(r.wait(make_retry_state(4, 0)), 8) |
| self.assertEqual(r.wait(make_retry_state(5, 0)), 16) |
| self.assertEqual(r.wait(make_retry_state(6, 0)), 32) |
| self.assertEqual(r.wait(make_retry_state(7, 0)), 64) |
| self.assertEqual(r.wait(make_retry_state(8, 0)), 128) |
| |
| def test_exponential_with_max_wait(self) -> None: |
| r = Retrying(wait=tenacity.wait_exponential(max=40)) |
| self.assertEqual(r.wait(make_retry_state(1, 0)), 1) |
| self.assertEqual(r.wait(make_retry_state(2, 0)), 2) |
| self.assertEqual(r.wait(make_retry_state(3, 0)), 4) |
| self.assertEqual(r.wait(make_retry_state(4, 0)), 8) |
| self.assertEqual(r.wait(make_retry_state(5, 0)), 16) |
| self.assertEqual(r.wait(make_retry_state(6, 0)), 32) |
| self.assertEqual(r.wait(make_retry_state(7, 0)), 40) |
| self.assertEqual(r.wait(make_retry_state(8, 0)), 40) |
| self.assertEqual(r.wait(make_retry_state(50, 0)), 40) |
| |
| def test_exponential_with_min_wait(self) -> None: |
| r = Retrying(wait=tenacity.wait_exponential(min=20)) |
| self.assertEqual(r.wait(make_retry_state(1, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(2, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(3, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(4, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(5, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(6, 0)), 32) |
| self.assertEqual(r.wait(make_retry_state(7, 0)), 64) |
| self.assertEqual(r.wait(make_retry_state(8, 0)), 128) |
| self.assertEqual(r.wait(make_retry_state(20, 0)), 524288) |
| |
| def test_exponential_with_max_wait_and_multiplier(self) -> None: |
| r = Retrying(wait=tenacity.wait_exponential(max=50, multiplier=1)) |
| self.assertEqual(r.wait(make_retry_state(1, 0)), 1) |
| self.assertEqual(r.wait(make_retry_state(2, 0)), 2) |
| self.assertEqual(r.wait(make_retry_state(3, 0)), 4) |
| self.assertEqual(r.wait(make_retry_state(4, 0)), 8) |
| self.assertEqual(r.wait(make_retry_state(5, 0)), 16) |
| self.assertEqual(r.wait(make_retry_state(6, 0)), 32) |
| self.assertEqual(r.wait(make_retry_state(7, 0)), 50) |
| self.assertEqual(r.wait(make_retry_state(8, 0)), 50) |
| self.assertEqual(r.wait(make_retry_state(50, 0)), 50) |
| |
| def test_exponential_with_min_wait_and_multiplier(self) -> None: |
| r = Retrying(wait=tenacity.wait_exponential(min=20, multiplier=2)) |
| self.assertEqual(r.wait(make_retry_state(1, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(2, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(3, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(4, 0)), 20) |
| self.assertEqual(r.wait(make_retry_state(5, 0)), 32) |
| self.assertEqual(r.wait(make_retry_state(6, 0)), 64) |
| self.assertEqual(r.wait(make_retry_state(7, 0)), 128) |
| self.assertEqual(r.wait(make_retry_state(8, 0)), 256) |
| self.assertEqual(r.wait(make_retry_state(20, 0)), 1048576) |
| |
| def test_exponential_with_min_wait_andmax__wait(self) -> None: |
| for min_, max_ in ( |
| (10, 100), |
| (datetime.timedelta(seconds=10), datetime.timedelta(seconds=100)), |
| ): |
| with self.subTest(): |
| r = Retrying(wait=tenacity.wait_exponential(min=min_, max=max_)) |
| self.assertEqual(r.wait(make_retry_state(1, 0)), 10) |
| self.assertEqual(r.wait(make_retry_state(2, 0)), 10) |
| self.assertEqual(r.wait(make_retry_state(3, 0)), 10) |
| self.assertEqual(r.wait(make_retry_state(4, 0)), 10) |
| self.assertEqual(r.wait(make_retry_state(5, 0)), 16) |
| self.assertEqual(r.wait(make_retry_state(6, 0)), 32) |
| self.assertEqual(r.wait(make_retry_state(7, 0)), 64) |
| self.assertEqual(r.wait(make_retry_state(8, 0)), 100) |
| self.assertEqual(r.wait(make_retry_state(9, 0)), 100) |
| self.assertEqual(r.wait(make_retry_state(20, 0)), 100) |
| |
| def test_legacy_explicit_wait_type(self) -> None: |
| Retrying(wait="exponential_sleep") # type: ignore[arg-type] |
| |
| def test_wait_func(self) -> None: |
| def wait_func(retry_state: RetryCallState) -> typing.Any: |
| return retry_state.attempt_number * retry_state.seconds_since_start # type: ignore[operator] |
| |
| r = Retrying(wait=wait_func) |
| self.assertEqual(r.wait(make_retry_state(1, 5)), 5) |
| self.assertEqual(r.wait(make_retry_state(2, 11)), 22) |
| self.assertEqual(r.wait(make_retry_state(10, 100)), 1000) |
| |
| def test_wait_combine(self) -> None: |
| r = Retrying( |
| wait=tenacity.wait_combine( |
| tenacity.wait_random(0, 3), tenacity.wait_fixed(5) |
| ) |
| ) |
| # Test it a few time since it's random |
| for _i in range(1000): |
| w = r.wait(make_retry_state(1, 5)) |
| self.assertLess(w, 8) |
| self.assertGreaterEqual(w, 5) |
| |
| def test_wait_exception(self) -> None: |
| def predicate(exc: BaseException) -> float: |
| if isinstance(exc, ValueError): |
| return 3.5 |
| return 10.0 |
| |
| r = Retrying(wait=tenacity.wait_exception(predicate)) |
| |
| fut1 = tenacity.Future.construct(1, ValueError(), True) |
| self.assertEqual(r.wait(make_retry_state(1, 0, last_result=fut1)), 3.5) |
| |
| fut2 = tenacity.Future.construct(1, KeyError(), True) |
| self.assertEqual(r.wait(make_retry_state(1, 0, last_result=fut2)), 10.0) |
| |
| fut3 = tenacity.Future.construct(1, None, False) |
| with self.assertRaises(RuntimeError): |
| r.wait(make_retry_state(1, 0, last_result=fut3)) |
| |
| def test_wait_double_sum(self) -> None: |
| r = Retrying(wait=tenacity.wait_random(0, 3) + tenacity.wait_fixed(5)) |
| # Test it a few time since it's random |
| for _i in range(1000): |
| w = r.wait(make_retry_state(1, 5)) |
| self.assertLess(w, 8) |
| self.assertGreaterEqual(w, 5) |
| |
| def test_wait_triple_sum(self) -> None: |
| r = Retrying( |
| wait=tenacity.wait_fixed(1) |
| + tenacity.wait_random(0, 3) |
| + tenacity.wait_fixed(5) |
| ) |
| # Test it a few time since it's random |
| for _i in range(1000): |
| w = r.wait(make_retry_state(1, 5)) |
| self.assertLess(w, 9) |
| self.assertGreaterEqual(w, 6) |
| |
| def test_wait_arbitrary_sum(self) -> None: |
| r = Retrying( |
| wait=sum( # type: ignore[arg-type] |
| [ |
| tenacity.wait_fixed(1), # type: ignore[list-item] |
| tenacity.wait_random(0, 3), # type: ignore[list-item] |
| tenacity.wait_fixed(5), # type: ignore[list-item] |
| tenacity.wait_none(), # type: ignore[list-item] |
| ] |
| ) |
| ) |
| # Test it a few time since it's random |
| for _ in range(1000): |
| w = r.wait(make_retry_state(1, 5)) |
| self.assertLess(w, 9) |
| self.assertGreaterEqual(w, 6) |
| |
| def _assert_range(self, wait: float, min_: float, max_: float) -> None: |
| self.assertLess(wait, max_) |
| self.assertGreaterEqual(wait, min_) |
| |
| def _assert_inclusive_range(self, wait: float, low: float, high: float) -> None: |
| self.assertLessEqual(wait, high) |
| self.assertGreaterEqual(wait, low) |
| |
| def _assert_inclusive_epsilon( |
| self, wait: float, target: float, epsilon: float |
| ) -> None: |
| self.assertLessEqual(wait, target + epsilon) |
| self.assertGreaterEqual(wait, target - epsilon) |
| |
| def test_wait_chain(self) -> None: |
| r = Retrying( |
| wait=tenacity.wait_chain( |
| *[tenacity.wait_fixed(1) for i in range(2)] |
| + [tenacity.wait_fixed(4) for i in range(2)] |
| + [tenacity.wait_fixed(8) for i in range(1)] |
| ) |
| ) |
| |
| for i in range(10): |
| w = r.wait(make_retry_state(i + 1, 1)) |
| if i < 2: |
| self._assert_range(w, 1, 2) |
| elif i < 4: |
| self._assert_range(w, 4, 5) |
| else: |
| self._assert_range(w, 8, 9) |
| |
| def test_wait_chain_multiple_invocations(self) -> None: |
| sleep_intervals: list[float] = [] |
| r = Retrying( |
| sleep=sleep_intervals.append, |
| wait=tenacity.wait_chain(*[tenacity.wait_fixed(i + 1) for i in range(3)]), |
| stop=tenacity.stop_after_attempt(5), |
| retry=tenacity.retry_if_result(lambda x: x == 1), |
| ) |
| |
| @r.wraps |
| def always_return_1() -> int: |
| return 1 |
| |
| self.assertRaises(tenacity.RetryError, always_return_1) |
| self.assertEqual(sleep_intervals, [1.0, 2.0, 3.0, 3.0]) |
| sleep_intervals[:] = [] |
| |
| # Clear and restart retrying. |
| self.assertRaises(tenacity.RetryError, always_return_1) |
| self.assertEqual(sleep_intervals, [1.0, 2.0, 3.0, 3.0]) |
| sleep_intervals[:] = [] |
| |
| def test_wait_random_exponential(self) -> None: |
| fn = tenacity.wait_random_exponential(0.5, 60.0) |
| |
| for _ in range(1000): |
| self._assert_inclusive_range(fn(make_retry_state(1, 0)), 0, 0.5) |
| self._assert_inclusive_range(fn(make_retry_state(2, 0)), 0, 1.0) |
| self._assert_inclusive_range(fn(make_retry_state(3, 0)), 0, 2.0) |
| self._assert_inclusive_range(fn(make_retry_state(4, 0)), 0, 4.0) |
| self._assert_inclusive_range(fn(make_retry_state(5, 0)), 0, 8.0) |
| self._assert_inclusive_range(fn(make_retry_state(6, 0)), 0, 16.0) |
| self._assert_inclusive_range(fn(make_retry_state(7, 0)), 0, 32.0) |
| self._assert_inclusive_range(fn(make_retry_state(8, 0)), 0, 60.0) |
| self._assert_inclusive_range(fn(make_retry_state(9, 0)), 0, 60.0) |
| |
| # max wait |
| max_wait = 5 |
| fn = tenacity.wait_random_exponential(10, max_wait) |
| for _ in range(1000): |
| self._assert_inclusive_range(fn(make_retry_state(1, 0)), 0.00, max_wait) |
| |
| # min wait |
| min_wait = 5 |
| fn = tenacity.wait_random_exponential(min=min_wait) |
| for _ in range(1000): |
| self._assert_inclusive_range(fn(make_retry_state(1, 0)), min_wait, 5) |
| |
| # Default arguments exist |
| fn = tenacity.wait_random_exponential() |
| fn(make_retry_state(0, 0)) |
| |
| def test_wait_random_exponential_statistically(self) -> None: |
| fn = tenacity.wait_random_exponential(0.5, 60.0) |
| |
| attempt = [[fn(make_retry_state(i, 0)) for _ in range(4000)] for i in range(10)] |
| |
| def mean(lst: list[float]) -> float: |
| return float(sum(lst)) / float(len(lst)) |
| |
| # skipping attempt 0 |
| self._assert_inclusive_epsilon(mean(attempt[1]), 0.25, 0.02) |
| self._assert_inclusive_epsilon(mean(attempt[2]), 0.50, 0.04) |
| self._assert_inclusive_epsilon(mean(attempt[3]), 1, 0.08) |
| self._assert_inclusive_epsilon(mean(attempt[4]), 2, 0.16) |
| self._assert_inclusive_epsilon(mean(attempt[5]), 4, 0.32) |
| self._assert_inclusive_epsilon(mean(attempt[6]), 8, 0.64) |
| self._assert_inclusive_epsilon(mean(attempt[7]), 16, 1.28) |
| self._assert_inclusive_epsilon(mean(attempt[8]), 30, 2.56) |
| self._assert_inclusive_epsilon(mean(attempt[9]), 30, 2.56) |
| |
| def test_wait_exponential_jitter(self) -> None: |
| fn = tenacity.wait_exponential_jitter(max=60) |
| |
| for _ in range(1000): |
| self._assert_inclusive_range(fn(make_retry_state(1, 0)), 1, 2) |
| self._assert_inclusive_range(fn(make_retry_state(2, 0)), 2, 3) |
| self._assert_inclusive_range(fn(make_retry_state(3, 0)), 4, 5) |
| self._assert_inclusive_range(fn(make_retry_state(4, 0)), 8, 9) |
| self._assert_inclusive_range(fn(make_retry_state(5, 0)), 16, 17) |
| self._assert_inclusive_range(fn(make_retry_state(6, 0)), 32, 33) |
| self.assertEqual(fn(make_retry_state(7, 0)), 60) |
| self.assertEqual(fn(make_retry_state(8, 0)), 60) |
| self.assertEqual(fn(make_retry_state(9, 0)), 60) |
| |
| with self.assertWarns(DeprecationWarning): |
| fn = tenacity.wait_exponential_jitter(10, 5) |
| for _ in range(1000): |
| self.assertEqual(fn(make_retry_state(1, 0)), 5) |
| |
| # Default arguments exist |
| fn = tenacity.wait_exponential_jitter() |
| fn(make_retry_state(0, 0)) |
| |
| def test_wait_exponential_jitter_min(self) -> None: |
| fn = tenacity.wait_exponential_jitter(initial=1, max=60, jitter=1, min=5) |
| for _ in range(1000): |
| # Even for attempt 1 (base wait=1 + jitter 0..1 = 1..2), min=5 applies |
| self._assert_inclusive_range(fn(make_retry_state(1, 0)), 5, 5) |
| self._assert_inclusive_range(fn(make_retry_state(2, 0)), 5, 5) |
| self._assert_inclusive_range(fn(make_retry_state(3, 0)), 5, 5) |
| # For attempt 4, base wait=8 + jitter 0..1 = 8..9, above min |
| self._assert_inclusive_range(fn(make_retry_state(4, 0)), 8, 9) |
| |
| def test_wait_exponential_jitter_timedelta(self) -> None: |
| from datetime import timedelta |
| |
| fn = tenacity.wait_exponential_jitter( |
| max=timedelta(seconds=60), |
| jitter=timedelta(seconds=1), |
| min=timedelta(seconds=5), |
| ) |
| for _ in range(1000): |
| self._assert_inclusive_range(fn(make_retry_state(1, 0)), 5, 5) |
| self._assert_inclusive_range(fn(make_retry_state(5, 0)), 16, 17) |
| self.assertEqual(fn(make_retry_state(7, 0)), 60) |
| |
| def test_wait_exponential_jitter_multiplier(self) -> None: |
| fn = tenacity.wait_exponential_jitter(multiplier=10, max=60, jitter=0) |
| self.assertEqual(fn(make_retry_state(1, 0)), 10) |
| self.assertEqual(fn(make_retry_state(2, 0)), 20) |
| self.assertEqual(fn(make_retry_state(3, 0)), 40) |
| self.assertEqual(fn(make_retry_state(4, 0)), 60) |
| |
| def test_wait_exponential_jitter_initial_deprecated(self) -> None: |
| with self.assertWarns(DeprecationWarning): |
| fn = tenacity.wait_exponential_jitter(initial=10, max=60, jitter=0) |
| self.assertEqual(fn(make_retry_state(1, 0)), 10) |
| self.assertEqual(fn(make_retry_state(2, 0)), 20) |
| |
| def test_wait_exponential_jitter_initial_and_multiplier_raises(self) -> None: |
| with self.assertRaises(ValueError): |
| tenacity.wait_exponential_jitter(initial=5, multiplier=10) |
| |
| def test_wait_retry_state_attributes(self) -> None: |
| class ExtractCallState(Exception): |
| pass |
| |
| # retry_state is mutable, so return it as an exception to extract the |
| # exact values it has when wait is called and bypass any other logic. |
| def waitfunc(retry_state: RetryCallState) -> float: |
| raise ExtractCallState(retry_state) |
| |
| retrying = Retrying( |
| wait=waitfunc, |
| retry=( |
| tenacity.retry_if_exception_type() |
| | tenacity.retry_if_result(lambda result: result == 123) |
| ), |
| ) |
| |
| def returnval() -> int: |
| return 123 |
| |
| try: |
| retrying(returnval) |
| except ExtractCallState as err: |
| retry_state = err.args[0] |
| self.assertIs(retry_state.fn, returnval) |
| self.assertEqual(retry_state.args, ()) |
| self.assertEqual(retry_state.kwargs, {}) |
| self.assertEqual(retry_state.outcome.result(), 123) |
| self.assertEqual(retry_state.attempt_number, 1) |
| self.assertGreaterEqual(retry_state.outcome_timestamp, retry_state.start_time) |
| |
| def dying() -> None: |
| raise Exception("Broken") |
| |
| try: |
| retrying(dying) |
| except ExtractCallState as err: |
| retry_state = err.args[0] |
| self.assertIs(retry_state.fn, dying) |
| self.assertEqual(retry_state.args, ()) |
| self.assertEqual(retry_state.kwargs, {}) |
| self.assertEqual(str(retry_state.outcome.exception()), "Broken") |
| self.assertEqual(retry_state.attempt_number, 1) |
| self.assertGreaterEqual(retry_state.outcome_timestamp, retry_state.start_time) |
| |
| |
| class TestRetryConditions(unittest.TestCase): |
| def test_retry_if_result(self) -> None: |
| retry = tenacity.retry_if_result(lambda x: x == 1) |
| |
| def r(fut: tenacity.Future) -> bool: |
| retry_state = make_retry_state(1, 1.0, last_result=fut) |
| return retry(retry_state) |
| |
| self.assertTrue(r(tenacity.Future.construct(1, 1, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 2, False))) |
| |
| def test_retry_if_not_result(self) -> None: |
| retry = tenacity.retry_if_not_result(lambda x: x == 1) |
| |
| def r(fut: tenacity.Future) -> bool: |
| retry_state = make_retry_state(1, 1.0, last_result=fut) |
| return retry(retry_state) |
| |
| self.assertTrue(r(tenacity.Future.construct(1, 2, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 1, False))) |
| |
| def test_retry_any(self) -> None: |
| retry = tenacity.retry_any( |
| tenacity.retry_if_result(lambda x: x == 1), |
| tenacity.retry_if_result(lambda x: x == 2), |
| ) |
| |
| def r(fut: tenacity.Future) -> bool: |
| retry_state = make_retry_state(1, 1.0, last_result=fut) |
| return retry(retry_state) |
| |
| self.assertTrue(r(tenacity.Future.construct(1, 1, False))) |
| self.assertTrue(r(tenacity.Future.construct(1, 2, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 3, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 1, True))) |
| |
| def test_retry_all(self) -> None: |
| retry = tenacity.retry_all( |
| tenacity.retry_if_result(lambda x: x == 1), |
| tenacity.retry_if_result(lambda x: isinstance(x, int)), |
| ) |
| |
| def r(fut: tenacity.Future) -> bool: |
| retry_state = make_retry_state(1, 1.0, last_result=fut) |
| return retry(retry_state) |
| |
| self.assertTrue(r(tenacity.Future.construct(1, 1, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 2, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 3, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 1, True))) |
| |
| def test_retry_and(self) -> None: |
| retry = tenacity.retry_if_result(lambda x: x == 1) & tenacity.retry_if_result( |
| lambda x: isinstance(x, int) |
| ) |
| |
| def r(fut: tenacity.Future) -> bool: |
| retry_state = make_retry_state(1, 1.0, last_result=fut) |
| return retry(retry_state) |
| |
| self.assertTrue(r(tenacity.Future.construct(1, 1, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 2, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 3, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 1, True))) |
| |
| def test_retry_or(self) -> None: |
| retry = tenacity.retry_if_result( |
| lambda x: x == "foo" |
| ) | tenacity.retry_if_result(lambda x: isinstance(x, int)) |
| |
| def r(fut: tenacity.Future) -> bool: |
| retry_state = make_retry_state(1, 1.0, last_result=fut) |
| return retry(retry_state) |
| |
| self.assertTrue(r(tenacity.Future.construct(1, "foo", False))) |
| self.assertFalse(r(tenacity.Future.construct(1, "foobar", False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 2.2, False))) |
| self.assertFalse(r(tenacity.Future.construct(1, 42, True))) |
| |
| def test_retry_or_with_plain_function(self) -> None: |
| """Plain callables can be composed with retry_base via |.""" |
| |
| def my_retry(retry_state: tenacity.RetryCallState) -> bool: |
| return retry_state.outcome is not None and not retry_state.outcome.failed |
| |
| # retry_base | plain_callable (exercises __or__ fallback) |
| retry = tenacity.retry_if_exception_type(Exception) | my_retry |
| retry_state = make_retry_state( |
| 1, 1.0, last_result=tenacity.Future.construct(1, "ok", False) |
| ) |
| self.assertTrue(retry(retry_state)) |
| |
| # plain_callable | retry_base (exercises __ror__ via reflection) |
| retry2 = my_retry | tenacity.retry_if_exception_type(Exception) |
| self.assertTrue(retry2(retry_state)) |
| |
| def test_retry_and_with_plain_function(self) -> None: |
| """Plain callables can be composed with retry_base via &.""" |
| |
| def my_retry(retry_state: tenacity.RetryCallState) -> bool: |
| return True |
| |
| # retry_base & plain_callable (exercises __and__ fallback) |
| retry = tenacity.retry_if_result(lambda x: x == 1) & my_retry |
| retry_state = make_retry_state( |
| 1, 1.0, last_result=tenacity.Future.construct(1, 1, False) |
| ) |
| self.assertTrue(retry(retry_state)) |
| |
| # plain_callable & retry_base (exercises __rand__ via reflection) |
| retry2 = my_retry & tenacity.retry_if_result(lambda x: x == 1) |
| self.assertTrue(retry2(retry_state)) |
| |
| def test_retry_or_coalesces(self) -> None: |
| """Multiple | operations flatten into a single retry_any.""" |
| a = tenacity.retry_if_exception_type(IOError) |
| b = tenacity.retry_if_exception_type(OSError) |
| c = tenacity.retry_if_exception_type(ValueError) |
| |
| combined = a | b | c |
| self.assertIsInstance(combined, retry_any) |
| self.assertEqual(len(combined.retries), 3) |
| |
| def test_retry_and_coalesces(self) -> None: |
| """Multiple & operations flatten into a single retry_all.""" |
| a = tenacity.retry_if_result(lambda x: x == 1) |
| b = tenacity.retry_if_result(lambda x: x > 0) |
| c = tenacity.retry_if_result(lambda x: x < 10) |
| |
| combined = a & b & c |
| self.assertIsInstance(combined, retry_all) |
| self.assertEqual(len(combined.retries), 3) |
| |
| def _raise_try_again(self) -> None: |
| self._attempts += 1 |
| if self._attempts < 3: |
| raise tenacity.TryAgain |
| |
| def test_retry_try_again(self) -> None: |
| self._attempts = 0 |
| Retrying(stop=tenacity.stop_after_attempt(5), retry=tenacity.retry_never)( |
| self._raise_try_again |
| ) |
| self.assertEqual(3, self._attempts) |
| |
| def test_retry_try_again_forever(self) -> None: |
| def _r() -> None: |
| raise tenacity.TryAgain |
| |
| r = Retrying(stop=tenacity.stop_after_attempt(5), retry=tenacity.retry_never) |
| self.assertRaises(tenacity.RetryError, r, _r) |
| self.assertEqual(5, r.statistics["attempt_number"]) |
| |
| def test_retry_try_again_forever_reraise(self) -> None: |
| def _r() -> None: |
| raise tenacity.TryAgain |
| |
| r = Retrying( |
| stop=tenacity.stop_after_attempt(5), |
| retry=tenacity.retry_never, |
| reraise=True, |
| ) |
| self.assertRaises(tenacity.TryAgain, r, _r) |
| self.assertEqual(5, r.statistics["attempt_number"]) |
| |
| def test_retry_if_exception_message_negative_no_inputs(self) -> None: |
| with self.assertRaises(TypeError): |
| tenacity.retry_if_exception_message() |
| |
| def test_retry_if_exception_message_negative_too_many_inputs(self) -> None: |
| with self.assertRaises(TypeError): |
| tenacity.retry_if_exception_message(message="negative", match="negative") |
| |
| |
| class NoneReturnUntilAfterCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go(self) -> typing.Any: |
| """Return None until after count threshold has been crossed. |
| |
| Then return True. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| return None |
| return True |
| |
| |
| class NoIOErrorAfterCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go(self) -> typing.Any: |
| """Raise an IOError until after count threshold has been crossed. |
| |
| Then return True. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| raise OSError("Hi there, I'm an IOError") |
| return True |
| |
| |
| class NoNameErrorAfterCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go(self) -> typing.Any: |
| """Raise a NameError until after count threshold has been crossed. |
| |
| Then return True. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| raise NameError("Hi there, I'm a NameError") |
| return True |
| |
| |
| class NoNameErrorCauseAfterCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go2(self) -> typing.Any: |
| raise NameError("Hi there, I'm a NameError") |
| |
| def go(self) -> typing.Any: |
| """Raise an IOError with a NameError as cause until after count threshold has been crossed. |
| |
| Then return True. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| try: |
| self.go2() |
| except NameError as e: |
| raise OSError from e |
| |
| return True |
| |
| |
| class NoIOErrorCauseAfterCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go2(self) -> typing.Any: |
| raise OSError("Hi there, I'm an IOError") |
| |
| def go(self) -> typing.Any: |
| """Raise a NameError with an IOError as cause until after count threshold has been crossed. |
| |
| Then return True. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| try: |
| self.go2() |
| except OSError as e: |
| raise NameError from e |
| |
| return True |
| |
| |
| class NameErrorUntilCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| derived_message = "Hi there, I'm a NameError" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go(self) -> typing.Any: |
| """Return True until after count threshold has been crossed. |
| |
| Then raise a NameError. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| return True |
| raise NameError(self.derived_message) |
| |
| |
| class IOErrorUntilCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go(self) -> typing.Any: |
| """Return True until after count threshold has been crossed. |
| |
| Then raise an IOError. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| return True |
| raise OSError("Hi there, I'm an IOError") |
| |
| |
| class CustomError(Exception): |
| """This is a custom exception class. |
| |
| Note that For Python 2.x, we don't strictly need to extend BaseException, |
| however, Python 3.x will complain. While this test suite won't run |
| correctly under Python 3.x without extending from the Python exception |
| hierarchy, the actual module code is backwards compatible Python 2.x and |
| will allow for cases where exception classes don't extend from the |
| hierarchy. |
| """ |
| |
| def __init__(self, value: str) -> None: |
| self.value = value |
| |
| def __str__(self) -> str: |
| return self.value |
| |
| |
| class NoCustomErrorAfterCount: |
| """Holds counter state for invoking a method several times in a row.""" |
| |
| derived_message = "This is a Custom exception class" |
| |
| def __init__(self, count: int) -> None: |
| self.counter = 0 |
| self.count = count |
| |
| def go(self) -> typing.Any: |
| """Raise a CustomError until after count threshold has been crossed. |
| |
| Then return True. |
| """ |
| if self.counter < self.count: |
| self.counter += 1 |
| raise CustomError(self.derived_message) |
| return True |
| |
| |
| class CapturingHandler(logging.Handler): |
| """Captures log records for inspection.""" |
| |
| def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None: |
| super().__init__(*args, **kwargs) |
| self.records: list[logging.LogRecord] = [] |
| |
| def emit(self, record: logging.LogRecord) -> None: |
| self.records.append(record) |
| |
| |
| def current_time_ms() -> int: |
| return round(time.time() * 1000) |
| |
| |
| @retry( |
| wait=tenacity.wait_fixed(0.05), |
| retry=tenacity.retry_if_result(lambda result: result is None), |
| ) |
| def _retryable_test_with_wait(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(3), |
| retry=tenacity.retry_if_result(lambda result: result is None), |
| ) |
| def _retryable_test_with_stop(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry(retry=tenacity.retry_if_exception_cause_type(NameError)) |
| def _retryable_test_with_exception_cause_type(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry(retry=tenacity.retry_if_exception_type(IOError)) |
| def _retryable_test_with_exception_type_io(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry(retry=tenacity.retry_if_not_exception_type(IOError)) |
| def _retryable_test_if_not_exception_type_io(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(3), retry=tenacity.retry_if_exception_type(IOError) |
| ) |
| def _retryable_test_with_exception_type_io_attempt_limit( |
| thing: typing.Any, |
| ) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry(retry=tenacity.retry_unless_exception_type(NameError)) |
| def _retryable_test_with_unless_exception_type_name(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(3), |
| retry=tenacity.retry_unless_exception_type(NameError), |
| ) |
| def _retryable_test_with_unless_exception_type_name_attempt_limit( |
| thing: typing.Any, |
| ) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry(retry=tenacity.retry_unless_exception_type()) |
| def _retryable_test_with_unless_exception_type_no_input( |
| thing: typing.Any, |
| ) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(5), |
| retry=tenacity.retry_if_exception_message( |
| message=NoCustomErrorAfterCount.derived_message |
| ), |
| ) |
| def _retryable_test_if_exception_message_message(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| retry=tenacity.retry_if_not_exception_message( |
| message=NoCustomErrorAfterCount.derived_message |
| ) |
| ) |
| def _retryable_test_if_not_exception_message_message(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| retry=tenacity.retry_if_exception_message( |
| match=NoCustomErrorAfterCount.derived_message[:3] + ".*" |
| ) |
| ) |
| def _retryable_test_if_exception_message_match(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| retry=tenacity.retry_if_not_exception_message( |
| match=NoCustomErrorAfterCount.derived_message[:3] + ".*" |
| ) |
| ) |
| def _retryable_test_if_not_exception_message_match(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| retry=tenacity.retry_if_not_exception_message( |
| message=NameErrorUntilCount.derived_message |
| ) |
| ) |
| def _retryable_test_not_exception_message_delay(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry |
| def _retryable_default(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry() |
| def _retryable_default_f(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry(retry=tenacity.retry_if_exception_type(CustomError)) |
| def _retryable_test_with_exception_type_custom(thing: typing.Any) -> typing.Any: |
| return thing.go() |
| |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(3), |
| retry=tenacity.retry_if_exception_type(CustomError), |
| ) |
| def _retryable_test_with_exception_type_custom_attempt_limit( |
| thing: typing.Any, |
| ) -> typing.Any: |
| return thing.go() |
| |
| |
| class TestDecoratorWrapper(unittest.TestCase): |
| def test_with_wait(self) -> None: |
| start = current_time_ms() |
| result = _retryable_test_with_wait(NoneReturnUntilAfterCount(5)) |
| t = current_time_ms() - start |
| self.assertGreaterEqual(t, 250) |
| self.assertTrue(result) |
| |
| def test_with_stop_on_return_value(self) -> None: |
| try: |
| _retryable_test_with_stop(NoneReturnUntilAfterCount(5)) |
| self.fail("Expected RetryError after 3 attempts") |
| except RetryError as re: |
| self.assertFalse(re.last_attempt.failed) |
| self.assertEqual(3, re.last_attempt.attempt_number) |
| self.assertTrue(re.last_attempt.result() is None) |
| print(re) |
| |
| def test_with_stop_on_exception(self) -> None: |
| try: |
| _retryable_test_with_stop(NoIOErrorAfterCount(5)) |
| self.fail("Expected IOError") |
| except OSError as re: |
| self.assertTrue(isinstance(re, IOError)) |
| print(re) |
| |
| def test_retry_if_exception_of_type(self) -> None: |
| self.assertTrue(_retryable_test_with_exception_type_io(NoIOErrorAfterCount(5))) |
| |
| try: |
| _retryable_test_with_exception_type_io(NoNameErrorAfterCount(5)) |
| self.fail("Expected NameError") |
| except NameError as n: |
| self.assertTrue(isinstance(n, NameError)) |
| print(n) |
| |
| self.assertTrue( |
| _retryable_test_with_exception_type_custom(NoCustomErrorAfterCount(5)) |
| ) |
| |
| try: |
| _retryable_test_with_exception_type_custom(NoNameErrorAfterCount(5)) |
| self.fail("Expected NameError") |
| except NameError as n: |
| self.assertTrue(isinstance(n, NameError)) |
| print(n) |
| |
| def test_retry_except_exception_of_type(self) -> None: |
| self.assertTrue( |
| _retryable_test_if_not_exception_type_io(NoNameErrorAfterCount(5)) |
| ) |
| |
| try: |
| _retryable_test_if_not_exception_type_io(NoIOErrorAfterCount(5)) |
| self.fail("Expected IOError") |
| except OSError as err: |
| self.assertTrue(isinstance(err, IOError)) |
| print(err) |
| |
| def test_retry_until_exception_of_type_attempt_number(self) -> None: |
| try: |
| self.assertTrue( |
| _retryable_test_with_unless_exception_type_name(NameErrorUntilCount(5)) |
| ) |
| except NameError as e: |
| s = _retryable_test_with_unless_exception_type_name.statistics |
| self.assertTrue(s["attempt_number"] == 6) |
| print(e) |
| else: |
| self.fail("Expected NameError") |
| |
| def test_retry_until_exception_of_type_no_type(self) -> None: |
| try: |
| # no input should catch all subclasses of Exception |
| self.assertTrue( |
| _retryable_test_with_unless_exception_type_no_input( |
| NameErrorUntilCount(5) |
| ) |
| ) |
| except NameError as e: |
| s = _retryable_test_with_unless_exception_type_no_input.statistics |
| self.assertTrue(s["attempt_number"] == 6) |
| print(e) |
| else: |
| self.fail("Expected NameError") |
| |
| def test_retry_until_exception_of_type_wrong_exception(self) -> None: |
| try: |
| # two iterations with IOError, one that returns True |
| _retryable_test_with_unless_exception_type_name_attempt_limit( |
| IOErrorUntilCount(2) |
| ) |
| self.fail("Expected RetryError") |
| except RetryError as e: |
| self.assertTrue(isinstance(e, RetryError)) |
| print(e) |
| |
| def test_retry_if_exception_message(self) -> None: |
| try: |
| self.assertTrue( |
| _retryable_test_if_exception_message_message(NoCustomErrorAfterCount(3)) |
| ) |
| except CustomError: |
| print(_retryable_test_if_exception_message_message.statistics) |
| self.fail("CustomError should've been retried from errormessage") |
| |
| def test_retry_if_not_exception_message(self) -> None: |
| try: |
| self.assertTrue( |
| _retryable_test_if_not_exception_message_message( |
| NoCustomErrorAfterCount(2) |
| ) |
| ) |
| except CustomError: |
| s = _retryable_test_if_not_exception_message_message.statistics |
| self.assertTrue(s["attempt_number"] == 1) |
| |
| def test_retry_if_not_exception_message_delay(self) -> None: |
| try: |
| self.assertTrue( |
| _retryable_test_not_exception_message_delay(NameErrorUntilCount(3)) |
| ) |
| except NameError: |
| s = _retryable_test_not_exception_message_delay.statistics |
| print(s["attempt_number"]) |
| self.assertTrue(s["attempt_number"] == 4) |
| |
| def test_retry_if_exception_message_match(self) -> None: |
| try: |
| self.assertTrue( |
| _retryable_test_if_exception_message_match(NoCustomErrorAfterCount(3)) |
| ) |
| except CustomError: |
| self.fail("CustomError should've been retried from errormessage") |
| |
| def test_retry_if_not_exception_message_match(self) -> None: |
| try: |
| self.assertTrue( |
| _retryable_test_if_not_exception_message_message( |
| NoCustomErrorAfterCount(2) |
| ) |
| ) |
| except CustomError: |
| s = _retryable_test_if_not_exception_message_message.statistics |
| self.assertTrue(s["attempt_number"] == 1) |
| |
| def test_retry_if_exception_cause_type(self) -> None: |
| self.assertTrue( |
| _retryable_test_with_exception_cause_type(NoNameErrorCauseAfterCount(5)) |
| ) |
| |
| try: |
| _retryable_test_with_exception_cause_type(NoIOErrorCauseAfterCount(5)) |
| self.fail("Expected exception without NameError as cause") |
| except NameError: |
| pass |
| |
| def test_retry_preserves_argument_defaults(self) -> None: |
| def function_with_defaults(a: int = 1) -> int: |
| return a |
| |
| def function_with_kwdefaults(*, a: int = 1) -> int: |
| return a |
| |
| retrying = Retrying( |
| wait=tenacity.wait_fixed(0.01), stop=tenacity.stop_after_attempt(3) |
| ) |
| wrapped_defaults_function = retrying.wraps(function_with_defaults) |
| wrapped_kwdefaults_function = retrying.wraps(function_with_kwdefaults) |
| |
| self.assertEqual( |
| function_with_defaults.__defaults__, |
| wrapped_defaults_function.__defaults__, # type: ignore[attr-defined] |
| ) |
| self.assertEqual( |
| function_with_kwdefaults.__kwdefaults__, |
| wrapped_kwdefaults_function.__kwdefaults__, # type: ignore[attr-defined] |
| ) |
| |
| def test_defaults(self) -> None: |
| self.assertTrue(_retryable_default(NoNameErrorAfterCount(5))) |
| self.assertTrue(_retryable_default_f(NoNameErrorAfterCount(5))) |
| self.assertTrue(_retryable_default(NoCustomErrorAfterCount(5))) |
| self.assertTrue(_retryable_default_f(NoCustomErrorAfterCount(5))) |
| |
| def test_retry_function_object(self) -> None: |
| """Test that functools.wraps doesn't cause problems with callable objects. |
| |
| It raises an error upon trying to wrap it in Py2, because __name__ |
| attribute is missing. It's fixed in Py3 but was never backported. |
| """ |
| |
| class Hello: |
| def __call__(self) -> str: |
| return "Hello" |
| |
| retrying = Retrying( |
| wait=tenacity.wait_fixed(0.01), stop=tenacity.stop_after_attempt(3) |
| ) |
| h = retrying.wraps(Hello()) |
| self.assertEqual(h(), "Hello") |
| |
| def test_retry_function_attributes(self) -> None: |
| """Test that the wrapped function attributes are exposed as intended. |
| |
| - statistics contains the value for the latest function run |
| - retry object can be modified to change its behaviour (useful to patch in tests) |
| - retry object statistics are synced with function statistics |
| """ |
| |
| self.assertTrue(_retryable_test_with_stop(NoneReturnUntilAfterCount(2))) |
| |
| expected_stats = { |
| "attempt_number": 3, |
| "delay_since_first_attempt": mock.ANY, |
| "idle_for": mock.ANY, |
| "start_time": mock.ANY, |
| } |
| self.assertEqual(_retryable_test_with_stop.statistics, expected_stats) |
| self.assertEqual(_retryable_test_with_stop.retry.statistics, expected_stats) |
| |
| with mock.patch.object( |
| _retryable_test_with_stop.retry, |
| "stop", |
| tenacity.stop_after_attempt(1), |
| ): |
| try: |
| self.assertTrue(_retryable_test_with_stop(NoneReturnUntilAfterCount(2))) |
| except RetryError as exc: |
| expected_stats = { |
| "attempt_number": 1, |
| "delay_since_first_attempt": mock.ANY, |
| "idle_for": mock.ANY, |
| "start_time": mock.ANY, |
| } |
| self.assertEqual(_retryable_test_with_stop.statistics, expected_stats) |
| self.assertEqual(exc.last_attempt.attempt_number, 1) |
| self.assertEqual( |
| _retryable_test_with_stop.retry.statistics, expected_stats |
| ) |
| else: |
| self.fail("RetryError should have been raised after 1 attempt") |
| |
| |
| class TestStatisticsKeys: |
| def test_delay_since_first_attempt_available_on_first_attempt(self) -> None: |
| """delay_since_first_attempt should be in statistics from the start.""" |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(3), |
| retry=tenacity.retry_if_result(lambda x: x is None), |
| ) |
| def succeeds_first_try() -> bool: |
| assert "delay_since_first_attempt" in succeeds_first_try.statistics |
| assert succeeds_first_try.statistics["delay_since_first_attempt"] == 0 |
| return True |
| |
| succeeds_first_try() |
| assert succeeds_first_try.statistics["delay_since_first_attempt"] == 0 |
| |
| |
| class TestEnabled: |
| def test_enabled_false_skips_retry(self) -> None: |
| """When enabled=False, the function is called directly without retrying.""" |
| call_count = 0 |
| |
| @retry(enabled=False, stop=tenacity.stop_after_attempt(3)) |
| def always_fails() -> None: |
| nonlocal call_count |
| call_count += 1 |
| raise ValueError("fail") |
| |
| with pytest.raises(ValueError, match="fail"): |
| always_fails() |
| assert call_count == 1 |
| |
| def test_enabled_false_preserves_attributes(self) -> None: |
| """When enabled=False, .retry, .retry_with, .statistics are still available.""" |
| |
| @retry(enabled=False, stop=tenacity.stop_after_attempt(3)) |
| def my_func() -> str: |
| return "ok" |
| |
| assert hasattr(my_func, "retry") |
| assert hasattr(my_func, "retry_with") |
| assert hasattr(my_func, "statistics") |
| assert my_func() == "ok" |
| |
| def test_enabled_false_via_retry_with(self) -> None: |
| """retry_with(enabled=False) disables retrying.""" |
| call_count = 0 |
| |
| @retry(stop=tenacity.stop_after_attempt(3)) |
| def always_fails() -> None: |
| nonlocal call_count |
| call_count += 1 |
| raise ValueError("fail") |
| |
| disabled = always_fails.retry_with(enabled=False) |
| with pytest.raises(ValueError, match="fail"): |
| disabled() |
| assert call_count == 1 |
| |
| def test_enabled_true_retries_normally(self) -> None: |
| """When enabled=True (default), retrying works as usual.""" |
| call_count = 0 |
| |
| @retry(enabled=True, stop=tenacity.stop_after_attempt(3), reraise=True) |
| def fails_twice() -> bool: |
| nonlocal call_count |
| call_count += 1 |
| if call_count < 3: |
| raise ValueError("fail") |
| return True |
| |
| assert fails_twice() is True |
| assert call_count == 3 |
| |
| |
| class TestRetryWith: |
| def test_redefine_wait(self) -> None: |
| start = current_time_ms() |
| result = _retryable_test_with_wait.retry_with(wait=tenacity.wait_fixed(0.1))( |
| NoneReturnUntilAfterCount(5) |
| ) |
| t = current_time_ms() - start |
| assert t >= 500 |
| assert result is True |
| |
| def test_redefine_stop(self) -> None: |
| result = _retryable_test_with_stop.retry_with( |
| stop=tenacity.stop_after_attempt(5) |
| )(NoneReturnUntilAfterCount(4)) |
| assert result is True |
| |
| def test_retry_error_cls_should_be_preserved(self) -> None: |
| @retry(stop=tenacity.stop_after_attempt(10), retry_error_cls=ValueError) # type: ignore[arg-type] |
| def _retryable() -> None: |
| raise Exception("raised for test purposes") |
| |
| with pytest.raises(Exception) as exc_ctx: |
| _retryable.retry_with(stop=tenacity.stop_after_attempt(2))() |
| |
| assert exc_ctx.type is ValueError, "Should remap to specific exception type" |
| |
| def test_retry_error_callback_should_be_preserved(self) -> None: |
| def return_text(retry_state: RetryCallState) -> str: |
| return f"Calling {retry_state.fn.__name__} keeps raising errors after {retry_state.attempt_number} attempts" # type: ignore[union-attr] |
| |
| @retry(stop=tenacity.stop_after_attempt(10), retry_error_callback=return_text) |
| def _retryable() -> None: |
| raise Exception("raised for test purposes") |
| |
| result = _retryable.retry_with(stop=tenacity.stop_after_attempt(5))() |
| assert result == "Calling _retryable keeps raising errors after 5 attempts" |
| |
| |
| class TestBeforeAfterAttempts(unittest.TestCase): |
| _attempt_number = 0 |
| |
| def test_before_attempts(self) -> None: |
| TestBeforeAfterAttempts._attempt_number = 0 |
| |
| def _before(retry_state: RetryCallState) -> None: |
| TestBeforeAfterAttempts._attempt_number = retry_state.attempt_number |
| |
| @retry( |
| wait=tenacity.wait_fixed(1), |
| stop=tenacity.stop_after_attempt(1), |
| before=_before, |
| ) |
| def _test_before() -> None: |
| pass |
| |
| _test_before() |
| |
| self.assertTrue(TestBeforeAfterAttempts._attempt_number == 1) |
| |
| def test_after_attempts(self) -> None: |
| TestBeforeAfterAttempts._attempt_number = 0 |
| |
| def _after(retry_state: RetryCallState) -> None: |
| TestBeforeAfterAttempts._attempt_number = retry_state.attempt_number |
| |
| @retry( |
| wait=tenacity.wait_fixed(0.1), |
| stop=tenacity.stop_after_attempt(3), |
| after=_after, |
| ) |
| def _test_after() -> None: |
| if TestBeforeAfterAttempts._attempt_number < 2: |
| raise Exception("testing after_attempts handler") |
| |
| _test_after() |
| |
| self.assertTrue(TestBeforeAfterAttempts._attempt_number == 2) |
| |
| def test_before_sleep(self) -> None: |
| def _before_sleep(retry_state: RetryCallState) -> None: |
| self.assertGreater(retry_state.next_action.sleep, 0) # type: ignore[union-attr] |
| _before_sleep.attempt_number = retry_state.attempt_number # type: ignore[attr-defined] |
| |
| @retry( |
| wait=tenacity.wait_fixed(0.01), |
| stop=tenacity.stop_after_attempt(3), |
| before_sleep=_before_sleep, |
| ) |
| def _test_before_sleep() -> None: |
| if _before_sleep.attempt_number < 2: # type: ignore[attr-defined] |
| raise Exception("testing before_sleep_attempts handler") |
| |
| _test_before_sleep() |
| self.assertEqual(_before_sleep.attempt_number, 2) # type: ignore[attr-defined] |
| |
| def _before_sleep_log_raises( |
| self, get_call_fn: typing.Callable[..., typing.Any] |
| ) -> None: |
| thing = NoIOErrorAfterCount(2) |
| logger = logging.getLogger(self.id()) |
| logger.propagate = False |
| logger.setLevel(logging.INFO) |
| handler = CapturingHandler() |
| logger.addHandler(handler) |
| try: |
| _before_sleep = tenacity.before_sleep_log(logger, logging.INFO) |
| retrying = Retrying( |
| wait=tenacity.wait_fixed(0.01), |
| stop=tenacity.stop_after_attempt(3), |
| before_sleep=_before_sleep, |
| ) |
| get_call_fn(retrying)(thing.go) |
| finally: |
| logger.removeHandler(handler) |
| |
| etalon_re = ( |
| r"^Retrying .* in 0\.01 seconds as it raised " |
| r"(IO|OS)Error: Hi there, I'm an IOError\.$" |
| ) |
| self.assertEqual(len(handler.records), 2) |
| fmt = logging.Formatter().format |
| self.assertRegex(fmt(handler.records[0]), etalon_re) |
| self.assertRegex(fmt(handler.records[1]), etalon_re) |
| |
| def test_before_sleep_log_raises(self) -> None: |
| self._before_sleep_log_raises(lambda x: x) |
| |
| def test_before_sleep_log_raises_with_exc_info(self) -> None: |
| thing = NoIOErrorAfterCount(2) |
| logger = logging.getLogger(self.id()) |
| logger.propagate = False |
| logger.setLevel(logging.INFO) |
| handler = CapturingHandler() |
| logger.addHandler(handler) |
| try: |
| _before_sleep = tenacity.before_sleep_log( |
| logger, logging.INFO, exc_info=True |
| ) |
| retrying = Retrying( |
| wait=tenacity.wait_fixed(0.01), |
| stop=tenacity.stop_after_attempt(3), |
| before_sleep=_before_sleep, |
| ) |
| retrying(thing.go) |
| finally: |
| logger.removeHandler(handler) |
| |
| etalon_re = re.compile( |
| r"^Retrying .* in 0\.01 seconds as it raised " |
| r"(IO|OS)Error: Hi there, I'm an IOError\.{0}" |
| r"Traceback \(most recent call last\):{0}" |
| r".*$".format("\n"), |
| flags=re.MULTILINE, |
| ) |
| self.assertEqual(len(handler.records), 2) |
| fmt = logging.Formatter().format |
| self.assertRegex(fmt(handler.records[0]), etalon_re) |
| self.assertRegex(fmt(handler.records[1]), etalon_re) |
| |
| def test_before_sleep_log_returns(self, exc_info: bool = False) -> None: |
| thing = NoneReturnUntilAfterCount(2) |
| logger = logging.getLogger(self.id()) |
| logger.propagate = False |
| logger.setLevel(logging.INFO) |
| handler = CapturingHandler() |
| logger.addHandler(handler) |
| try: |
| _before_sleep = tenacity.before_sleep_log( |
| logger, logging.INFO, exc_info=exc_info |
| ) |
| _retry = tenacity.retry_if_result(lambda result: result is None) |
| retrying = Retrying( |
| wait=tenacity.wait_fixed(0.01), |
| stop=tenacity.stop_after_attempt(3), |
| retry=_retry, |
| before_sleep=_before_sleep, |
| ) |
| retrying(thing.go) |
| finally: |
| logger.removeHandler(handler) |
| |
| etalon_re = r"^Retrying .* in 0\.01 seconds as it returned None\.$" |
| self.assertEqual(len(handler.records), 2) |
| fmt = logging.Formatter().format |
| self.assertRegex(fmt(handler.records[0]), etalon_re) |
| self.assertRegex(fmt(handler.records[1]), etalon_re) |
| |
| def test_before_sleep_log_returns_with_exc_info(self) -> None: |
| self.test_before_sleep_log_returns(exc_info=True) |
| |
| |
| class TestReraiseExceptions(unittest.TestCase): |
| def test_reraise_by_default(self) -> None: |
| calls = [] |
| |
| @retry( |
| wait=tenacity.wait_fixed(0.1), |
| stop=tenacity.stop_after_attempt(2), |
| reraise=True, |
| ) |
| def _reraised_by_default() -> None: |
| calls.append("x") |
| raise KeyError("Bad key") |
| |
| self.assertRaises(KeyError, _reraised_by_default) |
| self.assertEqual(2, len(calls)) |
| |
| def test_reraise_from_retry_error(self) -> None: |
| calls = [] |
| |
| @retry(wait=tenacity.wait_fixed(0.1), stop=tenacity.stop_after_attempt(2)) |
| def _raise_key_error() -> None: |
| calls.append("x") |
| raise KeyError("Bad key") |
| |
| def _reraised_key_error() -> None: |
| try: |
| _raise_key_error() |
| except tenacity.RetryError as retry_err: |
| retry_err.reraise() |
| |
| self.assertRaises(KeyError, _reraised_key_error) |
| self.assertEqual(2, len(calls)) |
| |
| def test_reraise_timeout_from_retry_error(self) -> None: |
| calls = [] |
| |
| @retry( |
| wait=tenacity.wait_fixed(0.1), |
| stop=tenacity.stop_after_attempt(2), |
| retry=lambda retry_state: True, |
| ) |
| def _mock_fn() -> None: |
| calls.append("x") |
| |
| def _reraised_mock_fn() -> None: |
| try: |
| _mock_fn() |
| except tenacity.RetryError as retry_err: |
| retry_err.reraise() |
| |
| self.assertRaises(tenacity.RetryError, _reraised_mock_fn) |
| self.assertEqual(2, len(calls)) |
| |
| def test_reraise_no_exception(self) -> None: |
| calls = [] |
| |
| @retry( |
| wait=tenacity.wait_fixed(0.1), |
| stop=tenacity.stop_after_attempt(2), |
| retry=lambda retry_state: True, |
| reraise=True, |
| ) |
| def _mock_fn() -> None: |
| calls.append("x") |
| |
| self.assertRaises(tenacity.RetryError, _mock_fn) |
| self.assertEqual(2, len(calls)) |
| |
| |
| class TestStatistics(unittest.TestCase): |
| def test_stats(self) -> None: |
| @retry() |
| def _foobar() -> int: |
| return 42 |
| |
| self.assertEqual({}, _foobar.statistics) |
| _foobar() |
| self.assertEqual(1, _foobar.statistics["attempt_number"]) |
| |
| def test_stats_failing(self) -> None: |
| @retry(stop=tenacity.stop_after_attempt(2)) |
| def _foobar() -> None: |
| raise ValueError(42) |
| |
| self.assertEqual({}, _foobar.statistics) |
| with contextlib.suppress(Exception): |
| _foobar() |
| self.assertEqual(2, _foobar.statistics["attempt_number"]) |
| |
| def test_retry_object_statistics_synced(self) -> None: |
| """Test that func.retry.statistics is synced with func.statistics.""" |
| |
| @retry(stop=tenacity.stop_after_attempt(3)) |
| def _foobar() -> int: |
| return 42 |
| |
| _foobar() |
| self.assertEqual( |
| _foobar.retry.statistics["attempt_number"], |
| _foobar.statistics["attempt_number"], |
| ) |
| |
| def test_retry_object_statistics_during_execution(self) -> None: |
| """Test that func.retry.statistics is accessible during execution.""" |
| attempts: list[int] = [] |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(3), |
| retry=tenacity.retry_if_exception_type(ValueError), |
| reraise=True, |
| ) |
| def _foobar() -> int: |
| attempts.append(_foobar.retry.statistics["attempt_number"]) |
| if len(attempts) < 3: |
| raise ValueError("retry") |
| return 42 |
| |
| _foobar() |
| self.assertEqual(attempts, [1, 2, 3]) |
| |
| |
| class TestRetryErrorCallback(unittest.TestCase): |
| def setUp(self) -> None: |
| self._attempt_number = 0 |
| self._callback_called = False |
| |
| def _callback(self, fut: tenacity.Future) -> tenacity.Future: |
| self._callback_called = True |
| return fut |
| |
| def test_retry_error_callback(self) -> None: |
| num_attempts = 3 |
| |
| def retry_error_callback(retry_state: RetryCallState) -> typing.Any: |
| retry_error_callback.called_times += 1 # type: ignore[attr-defined] |
| return retry_state.outcome |
| |
| retry_error_callback.called_times = 0 # type: ignore[attr-defined] |
| |
| @retry( |
| stop=tenacity.stop_after_attempt(num_attempts), |
| retry_error_callback=retry_error_callback, |
| ) |
| def _foobar() -> None: |
| self._attempt_number += 1 |
| raise Exception("This exception should not be raised") |
| |
| result = _foobar() |
| |
| self.assertEqual(retry_error_callback.called_times, 1) # type: ignore[attr-defined] |
| self.assertEqual(num_attempts, self._attempt_number) |
| self.assertIsInstance(result, tenacity.Future) |
| |
| |
| class TestContextManager(unittest.TestCase): |
| def test_context_manager_retry_one(self) -> None: |
| from tenacity import Retrying |
| |
| raise_ = True |
| |
| for attempt in Retrying(): |
| with attempt: |
| if raise_: |
| raise_ = False |
| raise Exception("Retry it!") |
| |
| def test_context_manager_on_error(self) -> None: |
| from tenacity import Retrying |
| |
| class CustomError(Exception): |
| pass |
| |
| retry = Retrying(retry=tenacity.retry_if_exception_type(IOError)) |
| |
| def test() -> None: |
| for attempt in retry: |
| with attempt: |
| raise CustomError("Don't retry!") |
| |
| self.assertRaises(CustomError, test) |
| |
| def test_context_manager_retry_error(self) -> None: |
| from tenacity import Retrying |
| |
| retry = Retrying(stop=tenacity.stop_after_attempt(2)) |
| |
| def test() -> None: |
| for attempt in retry: |
| with attempt: |
| raise Exception("Retry it!") |
| |
| self.assertRaises(RetryError, test) |
| |
| def test_context_manager_reraise(self) -> None: |
| from tenacity import Retrying |
| |
| class CustomError(Exception): |
| pass |
| |
| retry = Retrying(reraise=True, stop=tenacity.stop_after_attempt(2)) |
| |
| def test() -> None: |
| for attempt in retry: |
| with attempt: |
| raise CustomError("Don't retry!") |
| |
| self.assertRaises(CustomError, test) |
| |
| |
| class TestInvokeAsCallable: |
| """Test direct invocation of Retrying as a callable.""" |
| |
| @staticmethod |
| def invoke(retry: Retrying, f: typing.Callable[..., typing.Any]) -> typing.Any: |
| """ |
| Invoke Retrying logic. |
| |
| Wrapper allows testing different call mechanisms in test sub-classes. |
| """ |
| return retry(f) |
| |
| def test_retry_one(self) -> None: |
| def f() -> typing.Any: |
| f.calls.append(len(f.calls) + 1) # type: ignore[attr-defined] |
| if len(f.calls) <= 1: # type: ignore[attr-defined] |
| raise Exception("Retry it!") |
| return 42 |
| |
| f.calls = [] # type: ignore[attr-defined] |
| |
| retry = Retrying() |
| assert self.invoke(retry, f) == 42 |
| assert f.calls == [1, 2] # type: ignore[attr-defined] |
| |
| def test_on_error(self) -> None: |
| class CustomError(Exception): |
| pass |
| |
| def f() -> typing.Any: |
| f.calls.append(len(f.calls) + 1) # type: ignore[attr-defined] |
| if len(f.calls) <= 1: # type: ignore[attr-defined] |
| raise CustomError("Don't retry!") |
| return 42 |
| |
| f.calls = [] # type: ignore[attr-defined] |
| |
| retry = Retrying(retry=tenacity.retry_if_exception_type(IOError)) |
| with pytest.raises(CustomError): |
| self.invoke(retry, f) |
| assert f.calls == [1] # type: ignore[attr-defined] |
| |
| def test_retry_error(self) -> None: |
| def f() -> typing.Any: |
| f.calls.append(len(f.calls) + 1) # type: ignore[attr-defined] |
| raise Exception("Retry it!") |
| |
| f.calls = [] # type: ignore[attr-defined] |
| |
| retry = Retrying(stop=tenacity.stop_after_attempt(2)) |
| with pytest.raises(RetryError): |
| self.invoke(retry, f) |
| assert f.calls == [1, 2] # type: ignore[attr-defined] |
| |
| def test_reraise(self) -> None: |
| class CustomError(Exception): |
| pass |
| |
| def f() -> typing.Any: |
| f.calls.append(len(f.calls) + 1) # type: ignore[attr-defined] |
| raise CustomError("Retry it!") |
| |
| f.calls = [] # type: ignore[attr-defined] |
| |
| retry = Retrying(reraise=True, stop=tenacity.stop_after_attempt(2)) |
| with pytest.raises(CustomError): |
| self.invoke(retry, f) |
| assert f.calls == [1, 2] # type: ignore[attr-defined] |
| |
| |
| class TestRetryException(unittest.TestCase): |
| def test_retry_error_is_pickleable(self) -> None: |
| import pickle |
| |
| expected = RetryError(last_attempt=123) # type: ignore[arg-type] |
| pickled = pickle.dumps(expected) |
| actual = pickle.loads(pickled) |
| self.assertEqual(expected.last_attempt, actual.last_attempt) |
| |
| |
| class TestRetryTyping(unittest.TestCase): |
| def test_retry_type_annotations(self) -> None: |
| """The decorator should maintain types of decorated functions.""" |
| |
| def num_to_str(number): |
| # type: (int) -> str |
| return str(number) |
| |
| # equivalent to a raw @retry decoration |
| with_raw = retry(num_to_str) |
| with_raw_result = with_raw(1) |
| |
| # equivalent to a @retry(...) decoration |
| with_constructor = retry()(num_to_str) |
| with_constructor_result = with_raw(1) |
| |
| # These raise TypeError exceptions if they fail |
| check_type(with_raw, typing.Callable[[int], str]) |
| check_type(with_raw_result, str) |
| check_type(with_constructor, typing.Callable[[int], str]) |
| check_type(with_constructor_result, str) |
| |
| |
| class TestMockingSleep: |
| RETRY_ARGS = { |
| "wait": tenacity.wait_fixed(0.1), |
| "stop": tenacity.stop_after_attempt(5), |
| } |
| |
| def _fail(self) -> None: |
| raise NotImplementedError |
| |
| @retry(**RETRY_ARGS) # type: ignore[call-overload, untyped-decorator] |
| def _decorated_fail(self) -> None: |
| self._fail() |
| |
| @pytest.fixture() |
| def mock_sleep( |
| self, monkeypatch: typing.Any |
| ) -> typing.Generator[typing.Any, None, None]: |
| class MockSleep: |
| call_count = 0 |
| |
| def __call__(self, seconds: float) -> None: |
| self.call_count += 1 |
| |
| sleep = MockSleep() |
| monkeypatch.setattr(tenacity.nap.time, "sleep", sleep) # type: ignore[attr-defined] |
| yield sleep |
| |
| def test_decorated(self, mock_sleep: typing.Any) -> None: |
| with pytest.raises(RetryError): |
| self._decorated_fail() |
| assert mock_sleep.call_count == 4 |
| |
| def test_decorated_retry_with(self, mock_sleep: typing.Any) -> None: |
| fail_faster = self._decorated_fail.retry_with( |
| stop=tenacity.stop_after_attempt(2), |
| ) |
| with pytest.raises(RetryError): |
| fail_faster() |
| assert mock_sleep.call_count == 1 |
| |
| |
| class TestPickle(unittest.TestCase): |
| def test_retrying_picklable(self) -> None: |
| """Retrying objects can be pickled for multiprocessing support.""" |
| retrying = Retrying(stop=tenacity.stop_after_attempt(3)) |
| pickled = pickle.dumps(retrying) |
| restored = pickle.loads(pickled) |
| assert isinstance(restored, Retrying) |
| assert isinstance(restored.stop, tenacity.stop_after_attempt) |
| |
| def test_retrying_picklable_after_run(self) -> None: |
| """Retrying objects can be pickled even after being used.""" |
| retrying = Retrying(stop=tenacity.stop_after_attempt(3)) |
| # Access statistics to populate _local |
| _ = retrying.statistics |
| pickled = pickle.dumps(retrying) |
| restored = pickle.loads(pickled) |
| assert isinstance(restored, Retrying) |
| # Statistics should be reset on the restored object |
| assert restored.statistics == {} |
| |
| def test_retry_strategies_picklable(self) -> None: |
| """All built-in retry strategies can be pickled.""" |
| strategies = [ |
| tenacity.retry_if_exception_type(ValueError), |
| tenacity.retry_if_not_exception_type(ValueError), |
| tenacity.retry_if_exception_message(message="fail"), |
| tenacity.retry_if_exception_message(match="fail.*"), |
| tenacity.retry_if_not_exception_message(message="fail"), |
| ] |
| for strategy in strategies: |
| restored = pickle.loads(pickle.dumps(strategy)) |
| assert type(restored) is type(strategy) |
| |
| def test_retrying_pickle_round_trip_works(self) -> None: |
| """A pickled-then-restored Retrying object retries correctly.""" |
| retrying = Retrying( |
| stop=tenacity.stop_after_attempt(3), |
| retry=tenacity.retry_if_exception_type(ValueError), |
| reraise=True, |
| ) |
| restored = pickle.loads(pickle.dumps(retrying)) |
| |
| calls = 0 |
| |
| def succeed_on_third() -> str: |
| nonlocal calls |
| calls += 1 |
| if calls < 3: |
| raise ValueError("not yet") |
| return "ok" |
| |
| result = restored(succeed_on_third) |
| assert result == "ok" |
| assert calls == 3 |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |