| # Copyright 2016 Étienne Bersac |
| # Copyright 2016 Julien Danjou |
| # Copyright 2016 Joshua Harlow |
| # Copyright 2013-2014 Ray Holder |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import functools |
| import sys |
| import typing as t |
| from asyncio import sleep |
| |
| from tenacity import AttemptManager |
| from tenacity import BaseRetrying |
| from tenacity import DoAttempt |
| from tenacity import DoSleep |
| from tenacity import RetryCallState |
| |
# Type variable used as the return annotation of ``AsyncRetrying.__call__``.
| WrappedFnReturnT = t.TypeVar("WrappedFnReturnT") |
# Type variable for the wrapped callable; bound to callables returning an awaitable.
| WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Awaitable[t.Any]]) |
| |
| |
class AsyncRetrying(BaseRetrying):
    """Retrying controller for awaitable callables.

    Repeatedly asks ``self.iter`` what to do next and acts on the answer:
    a ``DoAttempt`` awaits the wrapped callable, a ``DoSleep`` awaits the
    configured ``sleep`` callable, and anything else ends the loop.

    The controller can be used in two ways: awaited directly via
    ``__call__`` or driven with ``async for`` via ``__aiter__`` /
    ``__anext__``. Synchronous iteration is rejected with ``TypeError``.
    """

    sleep: t.Callable[[float], t.Awaitable[t.Any]]

    def __init__(self, sleep: t.Callable[[float], t.Awaitable[t.Any]] = sleep, **kwargs: t.Any) -> None:
        """Initialise the controller.

        Args:
            sleep: Awaitable-returning callable used to wait between
                attempts; defaults to the ``sleep`` imported from ``asyncio``.
            **kwargs: Forwarded unchanged to the parent initialiser.
        """
        super().__init__(**kwargs)
        self.sleep = sleep

    async def __call__(  # type: ignore[override]
        self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
    ) -> WrappedFnReturnT:
        """Await ``fn(*args, **kwargs)`` under the retry policy.

        Returns:
            Whatever ``self.iter`` hands back once it yields something that
            is neither a ``DoAttempt`` nor a ``DoSleep``.
        """
        self.begin()
        state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)

        while True:
            action = self.iter(retry_state=state)

            if isinstance(action, DoAttempt):
                # Run one attempt and record how it ended on the call state.
                try:
                    outcome = await fn(*args, **kwargs)
                except BaseException:  # noqa: B902
                    state.set_exception(sys.exc_info())  # type: ignore[arg-type]
                else:
                    state.set_result(outcome)
                continue

            if not isinstance(action, DoSleep):
                # Neither "attempt" nor "sleep": this is the value to return.
                return action  # type: ignore[no-any-return]

            state.prepare_for_next_attempt()
            await self.sleep(action)

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Reject synchronous iteration; only ``async for`` is supported."""
        raise TypeError("AsyncRetrying object is not iterable")

    def __aiter__(self) -> "AsyncRetrying":
        """Start an ``async for`` session and return ``self`` as the iterator.

        A fresh ``RetryCallState`` with no wrapped function is stored on the
        instance for ``__anext__`` to use.
        """
        self.begin()
        self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        return self

    async def __anext__(self) -> AttemptManager:
        """Return the manager for the next attempt, sleeping first if told to.

        Raises:
            StopAsyncIteration: When ``self.iter`` returns ``None`` or any
                value that is neither a ``DoAttempt`` nor a ``DoSleep``.
        """
        while True:
            action = self.iter(retry_state=self._retry_state)

            if isinstance(action, DoAttempt):
                return AttemptManager(retry_state=self._retry_state)

            if not isinstance(action, DoSleep):
                # Covers both ``None`` and any other terminal value.
                raise StopAsyncIteration

            self._retry_state.prepare_for_next_attempt()
            await self.sleep(action)

    def wraps(self, fn: WrappedFn) -> WrappedFn:
        """Wrap ``fn`` with retrying behaviour, keeping it a coroutine function.

        The parent ``wraps`` result is wrapped again in an ``async def`` so
        that the returned object is recognized as a coroutine function.
        """
        retrying_fn = super().wraps(fn)

        @functools.wraps(retrying_fn)
        async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
            return await retrying_fn(*args, **kwargs)

        # Carry over the helper attributes exposed by the parent's wrapper.
        for attr_name in ("retry", "retry_with"):
            setattr(async_wrapped, attr_name, getattr(retrying_fn, attr_name))

        return async_wrapped  # type: ignore[return-value]