Skip to main content
EventBusMiddleware defines the middleware hook contract used by EventBus in both runtimes. Rust core event dispatch does not expose a public middleware trait yet. Use handler wrappers or inspect EventBus::to_json_value() / event result state directly until Rust middleware parity lands.

Interface

from typing import Any
from abxbus import BaseEvent, EventBus, EventHandler, EventResult, EventStatus

class EventBusMiddleware:
    async def on_event_change(self, eventbus: EventBus, event: BaseEvent[Any], status: EventStatus) -> None:
        ...

    async def on_event_result_change(
        self,
        eventbus: EventBus,
        event: BaseEvent[Any],
        event_result: EventResult[Any],
        status: EventStatus,
    ) -> None:
        ...

    async def on_bus_handlers_change(self, eventbus: EventBus, handler: EventHandler, registered: bool) -> None:
        ...

Setup with EventBus

from abxbus import EventBus
from abxbus.middlewares import EventBusMiddleware

class AnalyticsMiddleware(EventBusMiddleware):
    pass

bus = EventBus('AppBus', middlewares=[AnalyticsMiddleware])

Lifecycle behavior

  • on_event_change / onEventChange runs on event lifecycle transitions.
  • on_event_result_change / onEventResultChange runs on handler-result lifecycle transitions.
  • on_bus_handlers_change / onBusHandlersChange runs when handlers are added/removed.
  • Hook status values are only pending, started, and completed.
  • Handler failures are represented on event_result.status == 'error' and event_result.error when the hook status is completed.

Custom middleware example

from abxbus import EventBus, EventStatus
from abxbus.middlewares import EventBusMiddleware

class AnalyticsMiddleware(EventBusMiddleware):
    async def on_event_result_change(self, eventbus, event, event_result, status):
        if status != EventStatus.COMPLETED:
            return
        if event_result.status == 'error':
            return
        print(event.event_type, event_result.handler_name)

    async def on_bus_handlers_change(self, eventbus, handler, registered):
        action = 'registered' if registered else 'unregistered'
        print(eventbus.label, handler.id, action)

bus = EventBus('AppBus', middlewares=[AnalyticsMiddleware])

Built-in implementations

  • OtelTracingMiddleware
  • AutoErrorEventMiddleware
  • AutoReturnEventMiddleware
  • AutoHandlerChangeEventMiddleware
  • WALEventBusMiddleware
  • LoggerEventBusMiddleware
  • SQLiteHistoryMirrorMiddleware