Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
retry adds per-attempt timeout, retry/backoff, and optional semaphore-based concurrency control around async functions (including bus handlers).
Signature
def retry(
retry_after: float = 0,
max_attempts: int = 1,
timeout: float | None = None,
retry_on_errors: list[type[Exception] | re.Pattern[str]] | tuple[type[Exception] | re.Pattern[str], ...] | None = None,
retry_backoff_factor: float = 1.0,
semaphore_limit: int | None = None,
semaphore_name: str | Callable[..., str] | None = None,
semaphore_lax: bool = True,
semaphore_scope: Literal['multiprocess', 'global', 'class', 'instance'] = 'global',
semaphore_timeout: float | None = None,
) -> Callable[[Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]]]
retry({
max_attempts?: number, // default: 1
retry_after?: number, // default: 0 (seconds)
retry_backoff_factor?: number, // default: 1.0
retry_on_errors?: Array<(new (...args) => Error) | RegExp | string>, // default: retry any error
timeout?: number | null, // default: no per-attempt timeout
semaphore_limit?: number | null, // default: no semaphore limit
semaphore_name?: string | ((...args: any[]) => string) | null, // default: function name
semaphore_lax?: boolean, // default: true
semaphore_scope?: 'multiprocess' | 'global' | 'class' | 'instance', // default: 'global'
semaphore_timeout?: number | null, // default: derived when timeout + limit are set
})
Options
| Option | Description |
|---|
max_attempts | Total attempts including the first call (1 disables retries). |
retry_after | Base delay between retries, in seconds. |
retry_backoff_factor | Delay multiplier applied after each failed attempt. |
retry_on_errors | Optional matcher list to restrict which errors are retried. |
timeout | Per-attempt timeout in seconds (None/undefined means no per-attempt timeout). |
semaphore_limit | Max concurrent executions sharing the same semaphore. |
semaphore_name | Semaphore key (string or function deriving a key from call args). |
semaphore_scope | Semaphore sharing scope (multiprocess, global, class, instance). |
semaphore_timeout | Max wait time for semaphore acquisition before timeout/lax fallback. |
semaphore_lax | If true, continue execution without semaphore limit when acquisition times out. |
Example: Inline wrapper
from abxbus import EventBus, BaseEvent
from abxbus.retry import retry
class FetchEvent(BaseEvent[dict]):
url: str
bus = EventBus('AppBus')
async def fetch_with_retry(event: FetchEvent) -> dict:
return await fetch_json(event.url)
bus.on(
FetchEvent,
retry(max_attempts=3, retry_after=1, timeout=5)(fetch_with_retry),
)
import { BaseEvent, EventBus, retry } from 'abxbus'
import { z } from 'zod'
const FetchEvent = BaseEvent.extend('FetchEvent', {
url: z.string(),
event_result_type: z.record(z.string(), z.unknown()),
})
const bus = new EventBus('AppBus')
bus.on(
FetchEvent,
retry({ max_attempts: 3, retry_after: 1, timeout: 5 })(async (event) => {
return await fetchJson(event.url)
})
)
Example: Decorated class method
from abxbus.retry import retry
class ApiService:
@retry(max_attempts=4, retry_after=1, timeout=10, semaphore_limit=2, semaphore_scope='class')
async def get_user(self, user_id: str) -> dict:
return await call_remote_api(user_id)
import { retry } from 'abxbus'
class ApiService {
@retry({ max_attempts: 4, retry_after: 1, timeout: 10, semaphore_limit: 2, semaphore_scope: 'class' })
async getUser(userId: string): Promise<Record<string, unknown>> {
return await callRemoteApi(userId)
}
}
Behavior
- Semaphore acquisition happens once per call, then all retry attempts run within that acquired slot.
- Backoff delay per retry is:
retry_after * retry_backoff_factor^(attempt - 1).
- Retries stop immediately when the thrown error does not match
retry_on_errors.
- Bus/event timeouts act as outer execution budgets;
retry.timeout is per-attempt.
Runtime differences
- Python and TypeScript both support
multiprocess, global, class, and instance.
- TypeScript uses async-context re-entrancy tracking in Node/Bun to avoid same-semaphore nested deadlocks.
retry_on_errors matching differs slightly:
- Python: exception classes or compiled regex patterns (matched against
"ErrorClass: message").
- TypeScript: error constructors, error-name strings, or regex patterns.