EventBusMiddleware defines the middleware hook contract used by EventBus in both runtimes.

Interface

from typing import Any
from abxbus import BaseEvent, EventBus, EventHandler, EventResult, EventStatus

class EventBusMiddleware:
    async def on_event_change(self, eventbus: EventBus, event: BaseEvent[Any], status: EventStatus) -> None:
        ...

    async def on_event_result_change(
        self,
        eventbus: EventBus,
        event: BaseEvent[Any],
        event_result: EventResult[Any],
        status: EventStatus,
    ) -> None:
        ...

    async def on_bus_handlers_change(self, eventbus: EventBus, handler: EventHandler, registered: bool) -> None:
        ...

Setup with EventBus

from abxbus import EventBus
from abxbus.middlewares import EventBusMiddleware

class AnalyticsMiddleware(EventBusMiddleware):
    pass

bus = EventBus('AppBus', middlewares=[AnalyticsMiddleware])

Lifecycle behavior

  • on_event_change / onEventChange runs on event lifecycle transitions.
  • on_event_result_change / onEventResultChange runs on handler-result lifecycle transitions.
  • on_bus_handlers_change / onBusHandlersChange runs when handlers are added/removed.
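  • The status passed to a hook is always one of pending, started, or completed; there is no separate error status.
  • Handler failures are reported on the result itself: when the hook status is completed, a failed handler has event_result.status == 'error' and the exception in event_result.error.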
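
The failure rule above can be illustrated with a minimal sketch. It is not the library's implementation: FailureCollector, the COMPLETED constant, and the demo function are hypothetical names, and the SimpleNamespace objects are stand-ins that carry only the attributes the hook reads, so the snippet runs without abxbus installed. In real use the class would subclass EventBusMiddleware and compare against EventStatus.COMPLETED.

```python
import asyncio
from types import SimpleNamespace

# Stand-in for EventStatus.COMPLETED so the sketch runs without abxbus installed.
COMPLETED = "completed"


class FailureCollector:
    """Hypothetical middleware; real code would subclass EventBusMiddleware."""

    def __init__(self):
        self.failures = []  # (event_type, handler_name, error) tuples

    async def on_event_result_change(self, eventbus, event, event_result, status):
        # Ignore the pending and started transitions; failures only surface
        # once the hook status reaches completed.
        if status != COMPLETED:
            return
        if event_result.status == "error":
            self.failures.append(
                (event.event_type, event_result.handler_name, event_result.error)
            )


async def demo():
    # SimpleNamespace objects mimic only the attributes this sketch reads.
    collector = FailureCollector()
    event = SimpleNamespace(event_type="UserCreated")
    failed = SimpleNamespace(status="error", error=ValueError("boom"), handler_name="send_email")
    succeeded = SimpleNamespace(status="completed", error=None, handler_name="audit_log")

    await collector.on_event_result_change(None, event, failed, "started")
    await collector.on_event_result_change(None, event, failed, COMPLETED)
    await collector.on_event_result_change(None, event, succeeded, COMPLETED)
    return collector.failures


failures = asyncio.run(demo())
print(len(failures), failures[0][1])
```

Only the second call records anything: the first is skipped because the hook status is still started, and the third is skipped because the result carries no error.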

Custom middleware example

from abxbus import EventBus, EventStatus
from abxbus.middlewares import EventBusMiddleware

class AnalyticsMiddleware(EventBusMiddleware):
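    async def on_event_result_change(self, eventbus, event, event_result, status):
        # Only act once the handler result has reached its final state.
        if status != EventStatus.COMPLETED:
            return
        # Skip failed handlers; failures are flagged on the result, not the hook status.
        if event_result.status == 'error':
            return
        # Record which handler completed successfully for which event type.
        print(event.event_type, event_result.handler_name)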

    async def on_bus_handlers_change(self, eventbus, handler, registered):
        action = 'registered' if registered else 'unregistered'
        print(eventbus.label, handler.id, action)

bus = EventBus('AppBus', middlewares=[AnalyticsMiddleware])

Built-in implementations

  • OtelTracingMiddleware
  • AutoErrorEventMiddleware
  • AutoReturnEventMiddleware
  • AutoHandlerChangeEventMiddleware
  • WALEventBusMiddleware
  • LoggerEventBusMiddleware
  • SQLiteHistoryMirrorMiddleware