Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
EventBusMiddleware defines the middleware hook contract used by EventBus in the Python, TypeScript, and Go runtimes.
Rust core event dispatch does not expose a public middleware trait yet. Use handler wrappers or inspect EventBus::to_json_value() / event result state directly until Rust middleware parity lands.
Interface
Python
TypeScript
Go
Rust
from typing import Any
from abxbus import BaseEvent, EventBus, EventHandler, EventResult, EventStatus
class EventBusMiddleware:
    """Middleware hook contract used by EventBus.

    Subclasses override only the hooks they are interested in.
    """

    async def on_event_change(self, eventbus: EventBus, event: BaseEvent[Any], status: EventStatus) -> None:
        """Runs on event lifecycle transitions.

        ``status`` is one of pending, started, or completed.
        """
        ...

    async def on_event_result_change(
        self,
        eventbus: EventBus,
        event: BaseEvent[Any],
        event_result: EventResult[Any],
        status: EventStatus,
    ) -> None:
        """Runs on handler-result lifecycle transitions.

        A failed handler is reported through ``event_result.status == 'error'``
        and ``event_result.error`` when ``status`` is completed.
        """
        ...

    async def on_bus_handlers_change(self, eventbus: EventBus, handler: EventHandler, registered: bool) -> None:
        """Runs when a handler is added (``registered`` is True) or removed (False)."""
        ...
import type { BaseEvent, EventBus, EventHandler, EventResult, EventStatus } from 'abxbus'
/**
 * Hook contract that EventBus invokes on its middlewares.
 * Every hook is optional and may finish synchronously or return a promise.
 */
export interface EventBusMiddleware {
  /** Runs on event lifecycle transitions; `status` is pending, started, or completed. */
  onEventChange?(
    eventbus: EventBus,
    event: BaseEvent,
    status: EventStatus
  ): void | Promise<void>

  /**
   * Runs on handler-result lifecycle transitions.
   * A failed handler is reported through `event_result.status === 'error'` and
   * `event_result.error` when the hook `status` is completed.
   */
  onEventResultChange?(
    eventbus: EventBus,
    event: BaseEvent,
    event_result: EventResult,
    status: EventStatus
  ): void | Promise<void>

  /** Runs when a handler is added (`registered` is true) or removed (false). */
  onBusHandlersChange?(
    eventbus: EventBus,
    handler: EventHandler,
    registered: boolean
  ): void | Promise<void>
}
// EventBusMiddleware is the hook contract that EventBus invokes on its middlewares.
type EventBusMiddleware interface {
	// OnEventChange runs on event lifecycle transitions.
	// status is one of "pending", "started", or "completed".
	OnEventChange(
		eventbus *abxbus.EventBus,
		event *abxbus.BaseEvent,
		status string,
	)

	// OnEventResultChange runs on handler-result lifecycle transitions.
	// Handler failures are reported on eventResult while status is "completed".
	OnEventResultChange(
		eventbus *abxbus.EventBus,
		event *abxbus.BaseEvent,
		eventResult *abxbus.EventResult,
		status string,
	)

	// OnBusHandlersChange runs when a handler is added (registered == true)
	// or removed (registered == false).
	OnBusHandlersChange(
		eventbus *abxbus.EventBus,
		handler *abxbus.EventHandler,
		registered bool,
	)
}
// AnalyticsMiddleware is a custom middleware type. It embeds
// abxbus.EventBusMiddlewareBase so that it only has to define the hooks it
// actually needs.
type AnalyticsMiddleware struct {
	abxbus.EventBusMiddlewareBase
}
// Not implemented in abxbus-rust yet.
//
// Use explicit handler wrappers, normal Rust context propagation, and
// EventBus::to_json_value() for current Rust observability/custom hooks.
Setup with EventBus
Python
TypeScript
Go
Rust
from abxbus import EventBus
from abxbus.middlewares import EventBusMiddleware
class AnalyticsMiddleware(EventBusMiddleware):
    """Middleware that overrides no hooks; it only demonstrates registration."""

    pass


# Middlewares are supplied through the `middlewares` list when the bus is constructed.
bus = EventBus('AppBus', middlewares=[AnalyticsMiddleware])
import { EventBus } from 'abxbus'
import type { EventBusMiddleware } from 'abxbus/middlewares'
/** Middleware that overrides no hooks; it only demonstrates registration. */
class AnalyticsMiddleware implements EventBusMiddleware {}

// Middlewares are supplied through the `middlewares` option when the bus is constructed.
const bus = new EventBus('AppBus', {
  middlewares: [AnalyticsMiddleware],
})
// AnalyticsMiddleware defines no hooks of its own; it embeds
// abxbus.EventBusMiddlewareBase and only demonstrates registration.
type AnalyticsMiddleware struct {
	abxbus.EventBusMiddlewareBase
}

// Middlewares are supplied through EventBusOptions.Middlewares when the bus is constructed.
bus := abxbus.NewEventBus(
	"AppBus",
	&abxbus.EventBusOptions{
		Middlewares: []abxbus.EventBusMiddleware{
			&AnalyticsMiddleware{},
		},
	},
)
// Not implemented in abxbus-rust yet.
Lifecycle behavior
on_event_change / onEventChange runs on event lifecycle transitions.
on_event_result_change / onEventResultChange runs on handler-result lifecycle transitions.
on_bus_handlers_change / onBusHandlersChange runs when handlers are added/removed.
- Hook `status` values are only `pending`, `started`, and `completed`.
- Handler failures are represented by `event_result.status == 'error'` and `event_result.error` when the hook `status` is `completed`.
Custom middleware example
Python
TypeScript
Go
Rust
from abxbus import EventBus, EventStatus
from abxbus.middlewares import EventBusMiddleware
class AnalyticsMiddleware(EventBusMiddleware):
    """Prints completed, non-failed handler results and handler (un)registrations."""

    async def on_event_result_change(self, eventbus, event, event_result, status):
        """Print the event type and handler name once a handler result completes without error."""
        # Skip the intermediate transitions; only the final one is reported.
        if status != EventStatus.COMPLETED:
            return
        # Failed handlers are not reported.
        if event_result.status == 'error':
            return
        print(event.event_type, event_result.handler_name)

    async def on_bus_handlers_change(self, eventbus, handler, registered):
        """Print the bus label, handler id, and whether the handler was registered or unregistered."""
        action = 'registered' if registered else 'unregistered'
        print(eventbus.label, handler.id, action)


# Middlewares are supplied through the `middlewares` list when the bus is constructed.
bus = EventBus('AppBus', middlewares=[AnalyticsMiddleware])
import {
BaseEvent,
EventBus,
type EventHandler,
type EventResult,
type EventStatus,
} from 'abxbus'
import type { EventBusMiddleware } from 'abxbus/middlewares'
/** Logs completed, non-failed handler results and handler (un)registrations. */
class AnalyticsMiddleware implements EventBusMiddleware {
  /** Logs the event type and handler name once a handler result completes without error. */
  async onEventResultChange(
    eventbus: EventBus,
    event: BaseEvent,
    event_result: EventResult,
    status: EventStatus
  ): Promise<void> {
    // Only the final transition of a non-failed handler is reported.
    const finishedOk = status === 'completed' && event_result.status !== 'error'
    if (!finishedOk) {
      return
    }
    console.log(event.event_type, event_result.handler_name)
  }

  /** Logs the bus label, handler id, and whether the handler was registered or unregistered. */
  async onBusHandlersChange(
    eventbus: EventBus,
    handler: EventHandler,
    registered: boolean
  ): Promise<void> {
    let action = 'unregistered'
    if (registered) {
      action = 'registered'
    }
    console.log(eventbus.label, handler.id, action)
  }
}

// Middlewares are supplied through the `middlewares` option when the bus is constructed.
const bus = new EventBus('AppBus', {
  middlewares: [AnalyticsMiddleware],
})
// AnalyticsMiddleware prints completed handler results and handler
// (un)registrations. It embeds abxbus.EventBusMiddlewareBase so that only the
// hooks of interest are defined on this type.
type AnalyticsMiddleware struct {
	abxbus.EventBusMiddlewareBase
}
// OnEventResultChange prints the event type and handler name for every handler
// result that reaches "completed" without an error status.
func (m *AnalyticsMiddleware) OnEventResultChange(
	eventbus *abxbus.EventBus,
	event *abxbus.BaseEvent,
	eventResult *abxbus.EventResult,
	status string,
) {
	// Skip the intermediate transitions; only the final one is reported.
	if status != "completed" {
		return
	}
	// Failed handlers are not reported.
	if eventResult.Status == abxbus.EventResultError {
		return
	}
	fmt.Println(event.EventType, eventResult.HandlerName)
}
// OnBusHandlersChange prints the bus label, the handler ID, and whether the
// handler was registered or unregistered.
func (m *AnalyticsMiddleware) OnBusHandlersChange(
	eventbus *abxbus.EventBus,
	handler *abxbus.EventHandler,
	registered bool,
) {
	var action string
	if registered {
		action = "registered"
	} else {
		action = "unregistered"
	}
	fmt.Println(eventbus.Label(), handler.ID, action)
}
// Middlewares are supplied through EventBusOptions.Middlewares when the bus is constructed.
bus := abxbus.NewEventBus(
	"AppBus",
	&abxbus.EventBusOptions{
		Middlewares: []abxbus.EventBusMiddleware{
			&AnalyticsMiddleware{},
		},
	},
)
// Not implemented in abxbus-rust yet.
Built-in implementations
Python
TypeScript
Go
Rust
OtelTracingMiddleware
AutoErrorEventMiddleware
AutoReturnEventMiddleware
AutoHandlerChangeEventMiddleware
WALEventBusMiddleware
LoggerEventBusMiddleware
SQLiteHistoryMirrorMiddleware
OtelTracingMiddleware from abxbus/OtelTracingMiddleware
OtelTracingMiddleware via abxbus.NewOtelTracingMiddleware(...)
Go intentionally only ships Otel middleware for now; retry/events_suck and the other middleware implementations are deferred.
- Rust middleware implementations are not exposed yet.