Asyncio retry utility for Python 3.7+

aioretry

Install

$ pip install aioretry

Usage

import asyncio

from aioretry import (
    retry,
    # RetryPolicyStrategy is Tuple[bool, Union[int, float]]
    RetryPolicyStrategy,
    RetryInfo
)

# This example uses Python type hints
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
    """
    - It always retries until the call succeeds
    - After the first failure, it retries immediately
    - After further failures, aioretry waits
      100ms before the 2nd retry,
      200ms before the 3rd retry,
      0ms (immediately) before the 4th retry,
      100ms before the 5th retry,
      and so on
    """
    return False, (info.fails - 1) % 3 * 0.1


@retry(retry_policy)
async def connect_to_server():
    # connect to server
    ...

asyncio.run(connect_to_server())
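
To see the delay cycle described in the docstring, the policy can be called by hand. This is a minimal sketch that does not import aioretry: `FakeRetryInfo` is a hypothetical stand-in exposing the three attributes documented for `RetryInfo`, used here only for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Tuple


@dataclass
class FakeRetryInfo:
    """Hypothetical stand-in for aioretry.RetryInfo (illustration only)."""
    fails: int
    exception: Exception
    since: datetime


def retry_policy(info) -> Tuple[bool, float]:
    # Same expression as the policy above: never abandon, cycle the delay
    return False, (info.fails - 1) % 3 * 0.1


now = datetime.now()

# Delay (in seconds) chosen after the 1st, 2nd, ... 6th failure
schedule = [
    round(retry_policy(FakeRetryInfo(fails, RuntimeError("boom"), now))[1], 1)
    for fails in range(1, 7)
]
print(schedule)
```

The delays repeat every three failures, which matches the 0ms / 100ms / 200ms pattern in the docstring.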

Use as class instance method decorator

retry can also decorate an instance method.

class Client:
    @retry(retry_policy)
    async def connect(self):
        await self._connect()

asyncio.run(Client().connect())

Use instance method as retry policy

When retry decorates an instance method, retry_policy can also be the name of a method on the same class.

class ClientWithConfigurableRetryPolicy(Client):
    def __init__(self, max_retries: int = 3):
        self._max_retries = max_retries

    def _retry_policy(self, info: RetryInfo) -> RetryPolicyStrategy:
        return info.fails > self._max_retries, info.fails * 0.1

    # aioretry will use `self._retry_policy` as the retry policy.
    # When `retry_policy` is given as a str,
    # the decorator must be applied to an instance method
    @retry('_retry_policy')
    async def connect(self):
        await self._connect()

asyncio.run(ClientWithConfigurableRetryPolicy(10).connect())
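
It can help to check where such a policy draws the line. The sketch below copies only the policy method into a hypothetical `PolicyHolder` class and feeds it `SimpleNamespace` objects in place of `RetryInfo`, so it runs without aioretry.

```python
from types import SimpleNamespace


class PolicyHolder:
    """Hypothetical class that carries only the policy method from above."""

    def __init__(self, max_retries: int = 3):
        self._max_retries = max_retries

    def _retry_policy(self, info):
        # Abandon once the failure count exceeds the configured maximum
        return info.fails > self._max_retries, info.fails * 0.1


holder = PolicyHolder(max_retries=3)

# The `abandon` flag after the 1st, 2nd, ... 5th failure
decisions = [
    holder._retry_policy(SimpleNamespace(fails=n))[0]
    for n in range(1, 6)
]
print(decisions)
```

With `max_retries=3`, the first three failures are retried and the fourth is abandoned, so the wrapped method would be called at most four times.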

Register a before_retry callback

We can also register a before_retry callback, which is executed after each failure of the target function unless the corresponding retry is abandoned.

class ClientTrackableFailures(ClientWithConfigurableRetryPolicy):
    # `before_retry` could either be a sync function or an async function
    async def _before_retry(self, info: RetryInfo) -> None:
        await self._send_failure_log(info.exception, info.fails)

    @retry(
        retry_policy='_retry_policy',

        # Similar to `retry_policy`,
        # `before_retry` could either be a Callable or a str
        before_retry='_before_retry'
    )
    async def connect(self):
        await self._connect()

Only retry for certain types of exceptions

def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
    if isinstance(info.exception, (KeyError, ValueError)):
        # If it raises a KeyError or a ValueError, it will not retry.
        return True, 0

    # Otherwise, retry immediately
    return False, 0

@retry(retry_policy)
async def foo():
    # do something that might raise KeyError, ValueError or RuntimeError
    ...
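
Because the policy uses `isinstance`, subclasses of the listed exception types are abandoned as well. A small sketch, using `SimpleNamespace` in place of `RetryInfo` and a hypothetical `BadPayload` exception:

```python
from types import SimpleNamespace


def retry_policy(info):
    if isinstance(info.exception, (KeyError, ValueError)):
        # KeyError, ValueError and their subclasses are not retried
        return True, 0

    # Anything else is retried immediately
    return False, 0


class BadPayload(ValueError):
    """Hypothetical application error, used only for this illustration."""


def decide(exc):
    # Build a RetryInfo-like object carrying the exception under test
    return retry_policy(SimpleNamespace(fails=1, exception=exc))


print(decide(KeyError("id")))
print(decide(BadPayload("bad json")))
print(decide(RuntimeError("timeout")))
```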

APIs

retry(retry_policy, before_retry)(fn)

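  • fn Callable[..., Awaitable] the function to be wrapped. It should be an async function, or a normal function that returns an awaitable.
  • retry_policy Union[str, RetryPolicy] the retry policy: either a policy function or, when decorating an instance method, the name of a method on the same class.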
  • before_retry? Optional[Union[str, Callable[[RetryInfo], Optional[Awaitable]]]] If specified, before_retry is called after each failure of fn and before the corresponding retry. If the retry is abandoned, before_retry will not be executed.

Returns a wrapped function which accepts the same arguments as fn and returns an Awaitable.
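
The behaviour described above can be pictured as a small loop. The following is a simplified sketch, not aioretry's actual implementation: `call_with_retry` is a hypothetical helper, a `SimpleNamespace` stands in for `RetryInfo`, and running the callback before the sleep is an assumption.

```python
import asyncio
import inspect
from datetime import datetime
from types import SimpleNamespace


async def call_with_retry(fn, retry_policy, before_retry=None):
    """Hypothetical stand-in for the wrapped function's retry loop."""
    info = None
    while True:
        try:
            return await fn()
        except Exception as exc:
            if info is None:
                # First failure: start counting and remember when it happened
                info = SimpleNamespace(
                    fails=0, exception=exc, since=datetime.now()
                )
            info.fails += 1
            info.exception = exc

            abandon, delay = retry_policy(info)
            if abandon:
                # Give up and re-raise the exception from fn
                raise

            if before_retry is not None:
                result = before_retry(info)
                # The callback may be sync or async
                if inspect.isawaitable(result):
                    await result

            await asyncio.sleep(delay)


attempts = []
logged = []


async def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("not yet")
    return "connected"


def policy(info):
    return info.fails >= 5, 0


def log_failure(info):
    logged.append(info.fails)


result = asyncio.run(call_with_retry(flaky, policy, log_failure))
print(result, len(attempts), logged)
```

Here `flaky` fails twice and succeeds on the third call, so the callback runs once per retried failure.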

RetryPolicy

RetryPolicy = Callable[[RetryInfo], Tuple[bool, Union[float, int]]]

A retry policy decides what to do next after fn fails.

abandon, delay = retry_policy(info)
  • info RetryInfo
    • info.fails int is the number of times fn has failed so far. After the first failure, fails is 1.
    • info.exception Exception is the exception that fn raised.
    • info.since datetime is the time of the first failure.
  • If abandon is True, aioretry gives up retrying and raises the exception directly; otherwise aioretry sleeps for delay seconds (asyncio.sleep(delay)) before the next retry.
def retry_policy(info: RetryInfo):
    if isinstance(info.exception, KeyError):
        # Just raise exceptions of type KeyError
        return True, 0

    return False, info.fails * 0.1
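
`info.since` makes it possible to bound retries by elapsed time rather than by count. This is a sketch under two assumptions: that `info.since` is a naive `datetime` comparable with `datetime.now()`, and a hypothetical 30-second budget named `MAX_ELAPSED`. A `SimpleNamespace` stands in for `RetryInfo`.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace

# Hypothetical time budget for all retries of one call
MAX_ELAPSED = timedelta(seconds=30)


def retry_policy(info):
    # Assumption: info.since is a naive datetime (same clock as datetime.now())
    elapsed = datetime.now() - info.since
    if elapsed > MAX_ELAPSED:
        # Out of time: abandon and let the exception propagate
        return True, 0

    # Otherwise back off linearly, capped at 2 seconds
    return False, min(info.fails * 0.1, 2.0)


error = RuntimeError("boom")
fresh = SimpleNamespace(fails=2, exception=error, since=datetime.now())
stale = SimpleNamespace(
    fails=2, exception=error, since=datetime.now() - timedelta(seconds=60)
)

print(retry_policy(fresh))
print(retry_policy(stale))
```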

Python typings

from aioretry import (
    # The type of retry_policy function
    RetryPolicy,
    # The type of the return value of retry_policy function
    RetryPolicyStrategy,
    # The type of before_retry function
    BeforeRetry,
    RetryInfo
)

Upgrade guide

Since 5.0.0, RetryInfo is the only parameter of both retry_policy and before_retry.

2.x -> 5.x

2.x

def retry_policy(fails: int):
    """A policy that never retries"""

    return True, 0.1 * fails

5.x

def retry_policy(info: RetryInfo):
    return True, 0.1 * info.fails

3.x -> 5.x

3.x

def before_retry(e: Exception, fails: int):
    ...

5.x

# Read the exception and the failure count from `info`
def before_retry(info: RetryInfo):
    info.exception
    info.fails
    ...

4.x -> 5.x

Since 5.0.0, retry_policy and before_retry each take a single parameter of type RetryInfo.
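
If rewriting older callbacks is inconvenient, thin adapters are one option. The helpers below, `adapt_policy_2x` and `adapt_before_retry_3x`, are hypothetical names and not part of aioretry. They only rely on the `fails` and `exception` attributes of `RetryInfo`, so the sketch uses a `SimpleNamespace` in its place.

```python
from types import SimpleNamespace


def adapt_policy_2x(legacy_policy):
    """Wrap a 2.x-style policy(fails) so it accepts a RetryInfo-like object."""
    def policy(info):
        return legacy_policy(info.fails)
    return policy


def adapt_before_retry_3x(legacy_callback):
    """Wrap a 3.x-style before_retry(exception, fails) in the same way."""
    def before_retry(info):
        # Return the result so that an async callback can still be awaited
        return legacy_callback(info.exception, info.fails)
    return before_retry


def legacy_policy(fails: int):
    return fails > 2, 0.5 * fails


seen = []


def legacy_before_retry(e: Exception, fails: int):
    seen.append((type(e).__name__, fails))


info = SimpleNamespace(fails=3, exception=RuntimeError("boom"))

decision = adapt_policy_2x(legacy_policy)(info)
adapt_before_retry_3x(legacy_before_retry)(info)
print(decision, seen)
```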

License

MIT
