Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
EventBus is the central runtime for handler registration, event emit, history lookup, and lifecycle control.
EventBus(...)
EventBus(
name: str | None = None,
event_concurrency: Literal['global-serial', 'bus-serial', 'parallel'] | str | None = None,
event_handler_concurrency: Literal['serial', 'parallel'] | str = 'serial',
event_handler_completion: Literal['all', 'first'] | str = 'all',
max_history_size: int | None = 100,
max_history_drop: bool = False,
event_timeout: float | None = 60.0,
event_slow_timeout: float | None = 300.0,
event_handler_slow_timeout: float | None = 30.0,
event_handler_detect_file_paths: bool = True,
middlewares: Sequence[EventBusMiddleware] | None = None,
)
new EventBus(name?: string, options?: {
id?: string
max_history_size?: number | null
max_history_drop?: boolean
event_concurrency?: 'global-serial' | 'bus-serial' | 'parallel' | null
event_timeout?: number | null
event_slow_timeout?: number | null
event_handler_concurrency?: 'serial' | 'parallel' | null
event_handler_completion?: 'all' | 'first'
event_handler_slow_timeout?: number | null
event_handler_detect_file_paths?: boolean
middlewares?: Array<EventBusMiddleware | new () => EventBusMiddleware>
})
Shared configuration semantics
| Option | Description |
|---|
name | Human-readable bus name used in logs/labels. |
event_concurrency | Event scheduling policy across queue processing (global-serial, bus-serial, parallel). |
event_handler_concurrency | How handlers for one event execute (serial vs parallel). |
event_handler_completion | Completion mode (all waits for all handlers, first resolves on first successful result). |
event_timeout | Default outer timeout budget for event/handler execution. |
event_slow_timeout | Slow-event warning threshold. |
event_handler_slow_timeout | Slow-handler warning threshold. |
event_handler_detect_file_paths | Whether to capture source path metadata for handlers. |
max_history_size | Maximum retained history (null = unbounded, 0 = keep only in-flight). |
max_history_drop | If true, drop oldest history entries when full; if false, reject new emits at limit. |
middlewares | Ordered middleware instances (or middleware classes/constructors) that receive lifecycle hooks. |
Defaults are resolved at processing time on each bus, not copied onto the event at emit.
When event fields are unset (None/null), the current processing bus applies its own defaults.
Runtime state
Both implementations expose equivalent runtime state:
- Bus identity:
id, name, label
- Registered handlers and indexes
- Event history and pending queue
- In-flight tracking
- Locking/concurrency runtime objects
on(...)
Registers a handler for an event key (EventClass, event type string, or '*').
bus.on(UserEvent, handler)
bus.on('UserEvent', handler)
bus.on('*', wildcard_handler)
bus.on(UserEvent, handler)
bus.on('UserEvent', handler)
bus.on('*', wildcardHandler)
off(...)
Unregisters handlers by event key, handler function/reference, or handler id.
bus.off(UserEvent, handler)
bus.off(UserEvent) # remove all handlers for UserEvent
bus.off('*') # remove all wildcard handlers
bus.off(UserEvent, handler)
bus.off(UserEvent)
bus.off('*')
emit(...)
emit(...) enqueues synchronously and returns the pending event immediately.
event = bus.emit(MyEvent(data='x'))
result = await event.event_result()
const event = bus.emit(MyEvent({ data: 'x' }))
const result = await event.first()
find(...)
find(...) supports history lookup, optional future waiting, predicate filtering, and parent/child scoping.
event = await bus.find(ResponseEvent) # history lookup by default
future = await bus.find(ResponseEvent, past=False, future=5)
child = await bus.find(ChildEvent, child_of=parent_event, future=5)
const event = await bus.find(ResponseEvent)
const future = await bus.find(ResponseEvent, { past: false, future: 5 })
const child = await bus.find(ChildEvent, { child_of: parentEvent, future: 5 })
Lifecycle helpers
Wait for idle
await bus.wait_until_idle()
await bus.wait_until_idle(timeout=5)
await bus.waitUntilIdle()
await bus.waitUntilIdle(5)
Parent/child relationship checks
bus.event_is_child_of(child_event, parent_event)
bus.event_is_parent_of(parent_event, child_event)
bus.eventIsChildOf(childEvent, parentEvent)
bus.eventIsParentOf(parentEvent, childEvent)
Execution pipeline
Both runtimes use the same layered model, expressed with runtime-native wrappers.
- Event scope lock
- Event-level timeout and slow monitor
- Per-handler lock
- Handler-level timeout and slow monitor
- Handler execution context scope
- Error normalization and completion callbacks
# EventBus.step(...)
async with self.locks._run_with_event_lock(self, event):
await self._process_event(event, timeout=timeout)
# EventBus.process_event(...)
async with asyncio.timeout(resolved_event_timeout):
async with with_slow_monitor(self._create_slow_event_warning_timer(event)):
await event._run_handlers(
eventbus=self,
handlers=self.get_handlers_for_event(event),
timeout=resolved_event_timeout,
)
# EventResult.run_handler(...)
async with eventbus.locks._run_with_handler_lock(eventbus, event, event_result):
with eventbus._run_with_handler_dispatch_context(event, event_result.handler_id):
async with event_result._run_with_timeout(event):
async with with_slow_monitor(handler_slow_monitor):
await event_result._call_handler(event, handler, dispatch_context)
// EventBus.processEvent(...)
await this.locks._runWithEventLock(
event,
() =>
this._runHandlersWithTimeout(event, pending_entries, resolved_event_timeout, () =>
_runWithSlowMonitor(event._createSlowEventWarningTimer(), () => scoped_event._runHandlers(pending_entries))
),
options
)
// BaseEvent._runHandlers(...)
await this.event_bus.locks._runWithHandlerLock(original, this.event_bus.event_handler_concurrency, async (handler_lock) => {
await entry.runHandler(handler_lock)
})
// EventResult.runHandler(...)
await this.event_bus.locks._runWithHandlerDispatchContext(this, async () => {
await _runWithAsyncContext(event._getDispatchContext() ?? null, async () => {
const handler_result = await _runWithTimeout(
this.handler_timeout,
() => this._createHandlerTimeoutError(event),
() =>
_runWithSlowMonitor(slow_handler_warning_timer, () =>
_runWithAbortMonitor(() => this.handler.handler(handler_event), abort_signal)
)
)
this._finalizeHandlerResult(event, handler_result)
})
})
Serialization and teardown
Both runtimes can serialize the entire bus state (config, handlers metadata, event history, pending queue), restore it, re-attach handler callables, and continue processing.
from abxbus import BaseEvent, EventBus
class ResumeTickEvent(BaseEvent[None]):
pass
# 1) Serialize full bus state
print(bus.model_dump_json(indent=2))
# {
# "id": "018f...",
# "name": "SerializableBus",
# "handlers": {
# "018f...": {"id": "018f...", "handler_name": "app.handlers.on_user_created", "event_pattern": "UserCreatedEvent"}
# },
# "event_history": {
# "0190...": {"event_id": "0190...", "event_type": "UserCreatedEvent", "...": "..."}
# },
# "pending_event_queue": ["0190..."]
# }
# 2) Rehydrate state
bus = EventBus.validate(bus.model_dump_json())
# 3) Re-link runtime callables by handler id
bus.handlers.get("018f...").handler = on_user_created
bus.handlers.get("018g...").handler = on_user_deleted
bus.handlers.get("018h...").handler = on_user_updated
# 4) Resume processing (emit starts the runloop and drains restored pending events)
bus.emit(ResumeTickEvent())
await bus.wait_until_idle()
# 5) Teardown when done
await bus.stop(timeout=1.0)
await bus.stop(clear=True)
import { BaseEvent, EventBus } from 'abxbus'
const ResumeTickEvent = BaseEvent.extend('ResumeTickEvent', {})
// 1) Serialize full bus state
console.log(bus.toJSON())
// {
// id: '018f...',
// name: 'SerializableBus',
// handlers: {
// '018f...': { id: '018f...', handler_name: 'onUserCreated', event_pattern: 'UserCreatedEvent' }
// },
// event_history: {
// '0190...': { event_id: '0190...', event_type: 'UserCreatedEvent', ... }
// },
// pending_event_queue: ['0190...']
// }
// 2) Rehydrate state
bus = EventBus.fromJSON(bus.toJSON())
// 3) Re-link runtime callables by handler id
bus.handlers.get('018f...')!.handler = onUserCreated
bus.handlers.get('018g...')!.handler = onUserDeleted
bus.handlers.get('018h...')!.handler = onUserUpdated
// 4) Resume processing (emit starts the runloop and drains restored pending events)
bus.emit(ResumeTickEvent({}))
await bus.waitUntilIdle()
// 5) Teardown when done
bus.destroy()
Timeout and precedence
Shared precedence model:
- Handler override
- Event override
- Bus default
Effective handler timeout is capped by event timeout when both are set.