EventBus is the central runtime for handler registration, event emission, history lookup, and lifecycle control.

EventBus(...)

EventBus(
    name: str | None = None,
    event_concurrency: Literal['global-serial', 'bus-serial', 'parallel'] | str | None = None,
    event_handler_concurrency: Literal['serial', 'parallel'] | str = 'serial',
    event_handler_completion: Literal['all', 'first'] | str = 'all',
    max_history_size: int | None = 100,
    max_history_drop: bool = False,
    event_timeout: float | None = 60.0,
    event_slow_timeout: float | None = 300.0,
    event_handler_slow_timeout: float | None = 30.0,
    event_handler_detect_file_paths: bool = True,
    middlewares: Sequence[EventBusMiddleware] | None = None,
)

Shared configuration semantics

| Option | Description |
| --- | --- |
| name | Human-readable bus name used in logs/labels. |
| event_concurrency | Event scheduling policy across queue processing (global-serial, bus-serial, parallel). |
| event_handler_concurrency | How handlers for one event execute (serial vs parallel). |
| event_handler_completion | Completion mode (all waits for all handlers, first resolves on first successful result). |
| event_timeout | Default outer timeout budget for event/handler execution. |
| event_slow_timeout | Slow-event warning threshold. |
| event_handler_slow_timeout | Slow-handler warning threshold. |
| event_handler_detect_file_paths | Whether to capture source path metadata for handlers. |
| max_history_size | Maximum retained history (null = unbounded, 0 = keep only in-flight). |
| max_history_drop | If true, drop oldest history entries when full; if false, reject new emits at limit. |
| middlewares | Ordered middleware instances (or middleware classes/constructors) that receive lifecycle hooks. |

Defaults are resolved at processing time on each bus, not copied onto the event at emit. When event fields are unset (None/null), the current processing bus applies its own defaults.
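
The processing-time resolution described above can be pictured with a small stand-alone model. This is only a sketch and not abxbus code: `ToyEvent`, `ToyBus`, and `resolve_event_timeout` are hypothetical names, and only the timeout option is modeled. It shows one unset event resolving differently on two buses while the event itself stays unset.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyEvent:
    # None means "unset": the processing bus supplies its own default.
    event_timeout: Optional[float] = None


@dataclass
class ToyBus:
    name: str
    event_timeout: Optional[float] = 60.0

    def resolve_event_timeout(self, event: ToyEvent) -> Optional[float]:
        # Resolved when the event is processed; nothing is written back to the event.
        if event.event_timeout is not None:
            return event.event_timeout
        return self.event_timeout


fast_bus = ToyBus("fast", event_timeout=5.0)
slow_bus = ToyBus("slow", event_timeout=120.0)
event = ToyEvent()  # emitted without an explicit timeout

print(fast_bus.resolve_event_timeout(event))  # 5.0
print(slow_bus.resolve_event_timeout(event))  # 120.0
print(event.event_timeout)                    # None (still unset)
```

Because nothing is copied onto the event at emit, the same event can pick up a different default on each bus that processes it.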

Runtime state

Both implementations expose equivalent runtime state:
  • Bus identity: id, name, label
  • Registered handlers and indexes
  • Event history and pending queue
  • In-flight tracking
  • Locking/concurrency runtime objects

on(...)

Registers a handler for an event key (EventClass, event type string, or '*').
bus.on(UserEvent, handler)
bus.on('UserEvent', handler)
bus.on('*', wildcard_handler)

off(...)

Unregisters handlers by event key, handler function/reference, or handler id.
bus.off(UserEvent, handler)
bus.off(UserEvent)   # remove all handlers for UserEvent
bus.off('*')         # remove all wildcard handlers
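
As a rough mental model of how the three key forms for on(...) and off(...) could relate, the toy registry below normalizes class keys to their class name and treats '*' as a catch-all. `ToyRegistry`, `normalize_key`, and `handlers_for` are hypothetical names. Two things are assumptions rather than documented behavior: that a class key and its name string are equivalent, and that type-specific handlers run before wildcard handlers. Removal by handler id is not modeled.

```python
from typing import Callable, Dict, List, Optional, Union

EventKey = Union[type, str]


def normalize_key(key: EventKey) -> str:
    # Assumption: a class key and its class-name string name the same event type.
    return key if isinstance(key, str) else key.__name__


class ToyRegistry:
    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable]] = {}

    def on(self, key: EventKey, handler: Callable) -> None:
        self._handlers.setdefault(normalize_key(key), []).append(handler)

    def off(self, key: EventKey, handler: Optional[Callable] = None) -> None:
        name = normalize_key(key)
        if handler is None:
            self._handlers.pop(name, None)  # remove every handler for this key
        else:
            remaining = [h for h in self._handlers.get(name, []) if h is not handler]
            self._handlers[name] = remaining

    def handlers_for(self, event: object) -> List[Callable]:
        # Assumed order: type-specific handlers first, then wildcard handlers.
        specific = self._handlers.get(type(event).__name__, [])
        wildcard = self._handlers.get("*", [])
        return list(specific) + list(wildcard)
```

Registering the same handler under `UserEvent` and `'UserEvent'` would store it twice in this toy; whether the real bus deduplicates is not stated here.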

emit(...)

emit(...) enqueues synchronously and returns the pending event immediately.
event = bus.emit(MyEvent(data='x'))
result = await event.event_result()
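
The "enqueue synchronously, await later" pattern can be sketched without the library. `ToyPendingEvent` and `ToyQueueBus` below are hypothetical and far simpler than a real bus (one handler, no history, no timeouts). The point is only that emit is a plain function that appends to a queue, schedules a background drain task, and hands back the pending event before any handler has run.

```python
import asyncio
from collections import deque


class ToyPendingEvent:
    def __init__(self, payload: str) -> None:
        self.payload = payload
        self.done = asyncio.Event()
        self.result = None


class ToyQueueBus:
    def __init__(self, handler) -> None:
        self._handler = handler
        self._queue = deque()
        self._runner = None

    def emit(self, event: ToyPendingEvent) -> ToyPendingEvent:
        # Plain (non-async) method: enqueue, make sure a drain task exists, return.
        self._queue.append(event)
        if self._runner is None or self._runner.done():
            self._runner = asyncio.get_running_loop().create_task(self._drain())
        return event

    async def _drain(self) -> None:
        while self._queue:
            event = self._queue.popleft()
            event.result = await self._handler(event)
            event.done.set()


async def main() -> list:
    async def handler(event: ToyPendingEvent) -> str:
        await asyncio.sleep(0)
        return event.payload.upper()

    bus = ToyQueueBus(handler)
    event = bus.emit(ToyPendingEvent("x"))  # returns immediately
    before = event.done.is_set()            # False: no handler has run yet
    await event.done.wait()                 # now let the drain task process it
    return [before, event.result]


print(asyncio.run(main()))  # [False, 'X']
```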

find(...)

find(...) supports history lookup, optional future waiting, predicate filtering, and parent/child scoping.
event = await bus.find(ResponseEvent)  # history lookup by default
future = await bus.find(ResponseEvent, past=False, future=5)
child = await bus.find(ChildEvent, child_of=parent_event, future=5)
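
To make the past/future split concrete, here is a stand-alone sketch of a lookup that first scans recorded history and then optionally waits for a new match. `ToyHistory` and `record` are hypothetical, events are plain dicts, and two behaviors are assumptions rather than documented facts: history is searched newest-first, and a timed-out wait returns None instead of raising. Parent/child scoping is not modeled.

```python
import asyncio
from typing import Callable, Optional


class ToyHistory:
    def __init__(self) -> None:
        self.events = []
        self._waiters = []

    def record(self, event: dict) -> None:
        self.events.append(event)
        for predicate, fut in list(self._waiters):
            if not fut.done() and predicate(event):
                fut.set_result(event)  # wake anyone waiting for this kind of event

    async def find(self, predicate: Callable[[dict], bool],
                   past: bool = True, future: Optional[float] = None) -> Optional[dict]:
        if past:
            for event in reversed(self.events):  # assumed: newest match wins
                if predicate(event):
                    return event
        if not future:
            return None
        fut = asyncio.get_running_loop().create_future()
        entry = (predicate, fut)
        self._waiters.append(entry)
        try:
            return await asyncio.wait_for(fut, timeout=future)
        except asyncio.TimeoutError:
            return None  # assumed: a timeout yields None
        finally:
            self._waiters.remove(entry)


async def main() -> tuple:
    history = ToyHistory()
    history.record({"type": "ResponseEvent", "n": 1})

    def is_response(event: dict) -> bool:
        return event["type"] == "ResponseEvent"

    past_hit = await history.find(is_response)                       # history only
    miss = await history.find(is_response, past=False, future=0.01)  # nothing new arrives
    asyncio.get_running_loop().call_later(
        0.01, history.record, {"type": "ResponseEvent", "n": 2})
    future_hit = await history.find(is_response, past=False, future=1)
    return past_hit["n"], miss, future_hit["n"]


print(asyncio.run(main()))  # (1, None, 2)
```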

Lifecycle helpers

Wait for idle

await bus.wait_until_idle()
await bus.wait_until_idle(timeout=5)

Parent/child relationship checks

bus.event_is_child_of(child_event, parent_event)
bus.event_is_parent_of(parent_event, child_event)

Execution pipeline

Both runtimes use the same layered model, expressed with runtime-native wrappers.
  • Event scope lock
  • Event-level timeout and slow monitor
  • Per-handler lock
  • Handler-level timeout and slow monitor
  • Handler execution context scope
  • Error normalization and completion callbacks
# EventBus.step(...)
async with self.locks._run_with_event_lock(self, event):
    await self._process_event(event, timeout=timeout)

# EventBus.process_event(...)
async with asyncio.timeout(resolved_event_timeout):
    async with with_slow_monitor(self._create_slow_event_warning_timer(event)):
        await event._run_handlers(
            eventbus=self,
            handlers=self.get_handlers_for_event(event),
            timeout=resolved_event_timeout,
        )

# EventResult.run_handler(...)
async with eventbus.locks._run_with_handler_lock(eventbus, event, event_result):
    with eventbus._run_with_handler_dispatch_context(event, event_result.handler_id):
        async with event_result._run_with_timeout(event):
            async with with_slow_monitor(handler_slow_monitor):
                await event_result._call_handler(event, handler, dispatch_context)
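
The nesting in the excerpt above can be reproduced in miniature with the standard library. The sketch below is not the abxbus implementation: `slow_monitor`, `run_handler`, and `process_event` are hypothetical stand-ins, `asyncio.wait_for` is used where the excerpt uses `asyncio.timeout`, handlers run serially, the dispatch-context layer is omitted, and errors are normalized to simple `("error", name)` tuples.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def slow_monitor(label, threshold, warnings):
    """Record a warning if the body is still running after `threshold` seconds."""

    async def warn_later():
        await asyncio.sleep(threshold)
        warnings.append(label)  # a real bus would log here; the body keeps running

    timer = asyncio.create_task(warn_later())
    try:
        yield
    finally:
        timer.cancel()


async def run_handler(handler, event, lock, timeout, slow_after, warnings):
    async def monitored():
        async with slow_monitor(handler.__name__, slow_after, warnings):
            return await handler(event)

    async with lock:  # per-handler lock
        try:
            return ("ok", await asyncio.wait_for(monitored(), timeout))
        except Exception as exc:  # error normalization: record, do not propagate
            return ("error", type(exc).__name__)


async def process_event(event, handlers, event_lock, warnings,
                        event_timeout=5.0, event_slow_after=2.0,
                        handler_timeout=0.5, handler_slow_after=0.01):
    async def run_all():
        async with slow_monitor("event", event_slow_after, warnings):
            results = []
            for handler in handlers:  # serial handler execution
                results.append(await run_handler(
                    handler, event, asyncio.Lock(), handler_timeout,
                    handler_slow_after, warnings))
            return results

    async with event_lock:  # event scope lock
        return await asyncio.wait_for(run_all(), event_timeout)


async def main():
    warnings = []

    async def quick(event):
        return f"quick:{event}"

    async def sluggish(event):
        await asyncio.sleep(0.05)  # slower than the warning threshold, within timeout
        return f"sluggish:{event}"

    async def stuck(event):
        await asyncio.sleep(10)    # exceeds the handler timeout

    results = await process_event("evt", [quick, sluggish, stuck],
                                  asyncio.Lock(), warnings)
    return results, warnings
```

Running `asyncio.run(main())` here yields an "ok" result for `quick` and `sluggish`, a normalized `TimeoutError` for `stuck`, and slow warnings for `sluggish` and `stuck` only. The slow monitor never cancels anything; only the timeout layer does.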

Serialization and teardown

Both runtimes can serialize the entire bus state (config, handlers metadata, event history, pending queue), restore it, re-attach handler callables, and continue processing.
from abxbus import BaseEvent, EventBus

class ResumeTickEvent(BaseEvent[None]):
    pass

# 1) Serialize full bus state
print(bus.model_dump_json(indent=2))
# {
#   "id": "018f...",
#   "name": "SerializableBus",
#   "handlers": {
#     "018f...": {"id": "018f...", "handler_name": "app.handlers.on_user_created", "event_pattern": "UserCreatedEvent"}
#   },
#   "event_history": {
#     "0190...": {"event_id": "0190...", "event_type": "UserCreatedEvent", "...": "..."}
#   },
#   "pending_event_queue": ["0190..."]
# }

# 2) Rehydrate state
bus = EventBus.validate(bus.model_dump_json())

# 3) Re-link runtime callables by handler id
bus.handlers.get("018f...").handler = on_user_created
bus.handlers.get("018g...").handler = on_user_deleted
bus.handlers.get("018h...").handler = on_user_updated

# 4) Resume processing (emit starts the runloop and drains restored pending events)
bus.emit(ResumeTickEvent())
await bus.wait_until_idle()

# 5) Teardown when done
await bus.stop(timeout=1.0)
await bus.stop(clear=True)

Timeout and precedence

Shared precedence model (highest priority first):
  1. Handler override
  2. Event override
  3. Bus default
Effective handler timeout is capped by event timeout when both are set.
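
One way to read these rules is as a small pure function. The sketch below is an interpretation, not library code: `first_set` and `effective_handler_timeout` are hypothetical helpers, and None stands for "not set" at that level.

```python
from typing import Optional


def first_set(*values: Optional[float]) -> Optional[float]:
    """Return the first value that is not None, or None if all are unset."""
    for value in values:
        if value is not None:
            return value
    return None


def effective_handler_timeout(handler_override: Optional[float],
                              event_override: Optional[float],
                              bus_default: Optional[float]) -> Optional[float]:
    # Precedence: handler override, then event override, then bus default.
    handler_timeout = first_set(handler_override, event_override, bus_default)
    event_timeout = first_set(event_override, bus_default)
    # Cap: a handler never gets more time than the event it belongs to.
    if handler_timeout is not None and event_timeout is not None:
        return min(handler_timeout, event_timeout)
    return handler_timeout


print(effective_handler_timeout(None, None, 60.0))  # 60.0 (bus default)
print(effective_handler_timeout(None, 10.0, 60.0))  # 10.0 (event override)
print(effective_handler_timeout(2.0, 10.0, 60.0))   # 2.0  (handler override)
print(effective_handler_timeout(30.0, 10.0, 60.0))  # 10.0 (capped by event timeout)
```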