Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
EventBus is the central runtime for handler registration, event emit, history lookup, and lifecycle control.
EventBus(...)
Python
TypeScript
Go
Rust
EventBus(
name: str | None = None,
event_concurrency: Literal['global-serial', 'bus-serial', 'parallel'] | str | None = None,
event_handler_concurrency: Literal['serial', 'parallel'] | str = 'serial',
event_handler_completion: Literal['all', 'first'] | str = 'all',
max_history_size: int | None = 100,
max_history_drop: bool = False,
event_timeout: float | None = 60.0,
event_slow_timeout: float | None = 300.0,
event_handler_slow_timeout: float | None = 30.0,
event_handler_detect_file_paths: bool = True,
middlewares: Sequence[EventBusMiddleware] | None = None,
)
new EventBus(name?: string, options?: {
id?: string
max_history_size?: number | null
max_history_drop?: boolean
event_concurrency?: 'global-serial' | 'bus-serial' | 'parallel' | null
event_timeout?: number | null
event_slow_timeout?: number | null
event_handler_concurrency?: 'serial' | 'parallel' | null
event_handler_completion?: 'all' | 'first'
event_handler_slow_timeout?: number | null
event_handler_detect_file_paths?: boolean
middlewares?: Array<EventBusMiddleware | new () => EventBusMiddleware>
})
abxbus.NewEventBus(name string, options *abxbus.EventBusOptions)
type EventBusOptions struct {
ID string
MaxHistorySize *int
MaxHistoryDrop bool
EventConcurrency abxbus.EventConcurrencyMode
EventTimeout *float64
EventSlowTimeout *float64
EventHandlerConcurrency abxbus.EventHandlerConcurrencyMode
EventHandlerCompletion abxbus.EventHandlerCompletionMode
EventHandlerSlowTimeout *float64
EventHandlerDetectFilePaths *bool
Middlewares []abxbus.EventBusMiddleware
}
use abxbus_rust::{
event_bus::{EventBus, EventBusOptions},
types::{
EventConcurrencyMode,
EventHandlerCompletionMode,
EventHandlerConcurrencyMode,
},
};
let bus = EventBus::new_with_options(
Some("MyBus".to_string()),
EventBusOptions {
max_history_size: Some(100),
max_history_drop: false,
event_concurrency: EventConcurrencyMode::BusSerial,
event_timeout: Some(60.0),
event_slow_timeout: Some(300.0),
event_handler_concurrency: EventHandlerConcurrencyMode::Serial,
event_handler_completion: EventHandlerCompletionMode::All,
event_handler_slow_timeout: Some(30.0),
event_handler_detect_file_paths: true,
..EventBusOptions::default()
},
);
Shared configuration semantics
| Option | Description |
|---|
name | Human-readable bus name used in logs/labels. |
event_concurrency | Event scheduling policy across queue processing (global-serial, bus-serial, parallel). |
event_handler_concurrency | How handlers for one event execute (serial vs parallel). |
event_handler_completion | Completion mode (all waits for all handlers, first resolves on first successful result). |
event_timeout | Default outer timeout budget for event/handler execution. |
event_slow_timeout | Slow-event warning threshold. |
event_handler_slow_timeout | Slow-handler warning threshold. |
event_handler_detect_file_paths | Whether to capture source path metadata for handlers. |
max_history_size | Maximum retained history (null = unbounded, 0 = keep only in-flight). |
max_history_drop | If true, drop oldest history entries when full; if false, reject new emits at limit. |
middlewares | Ordered middleware instances (or middleware classes/constructors) that receive lifecycle hooks. |
Defaults are resolved at processing time on each bus, not copied onto the event at emit.
When event fields are unset (None/null/zero-value mode), the current processing bus applies its own defaults, so forwarded events can inherit the target bus defaults.
Runtime state
All implementations expose equivalent runtime state:
- Bus identity:
id, name, label
- Registered handlers and indexes
- Event history and pending queue
- In-flight tracking
- Locking/concurrency runtime objects
on(...)
Registers a handler for an event key (EventClass, event type string, or '*').
Python
TypeScript
Go
Rust
bus.on(UserEvent, handler)
bus.on('UserEvent', handler)
bus.on('*', wildcard_handler)
bus.on(UserEvent, handler)
bus.on('UserEvent', handler)
bus.on('*', wildcardHandler)
bus.On("UserEvent", "handler", handler, nil)
bus.On("*", "wildcard_handler", wildcardHandler, nil)
timeout := 5.0
bus.On("UserEvent", "custom", handler, &abxbus.EventHandler{
HandlerTimeout: &timeout,
})
use abxbus_rust::{event, event_bus::EventBus, event_handler::EventHandlerOptions};
use serde_json::json;
event! {
struct UserEvent {
user_id: String,
event_result_type: serde_json::Value,
}
}
let bus = EventBus::new(Some("ApiBus".to_string()));
let entry = bus.on(UserEvent, |event: UserEvent| async move {
Ok(json!(event.user_id))
});
bus.on_with_options(
UserEvent,
"custom",
EventHandlerOptions {
handler_timeout: Some(5.0),
..EventHandlerOptions::default()
},
|_event: UserEvent| async move { Ok(serde_json::Value::Null) },
);
off(...)
Unregisters handlers by event key, handler function/reference, or handler id.
Python
TypeScript
Go
Rust
bus.off(UserEvent, handler)
bus.off(UserEvent) # remove all handlers for UserEvent
bus.off('*') # remove all wildcard handlers
bus.off(UserEvent, handler)
bus.off(UserEvent)
bus.off('*')
handler := bus.On("UserEvent", "handler", onUserEvent, nil)
bus.Off("UserEvent", handler)
bus.Off("UserEvent", handler.ID)
bus.Off("*", nil)
use abxbus_rust::typed::EventSpec;
let handler = bus.on(UserEvent, |_event: UserEvent| async move {
Ok(serde_json::Value::Null)
});
bus.off(UserEvent::event_type, Some(&handler.id));
bus.off(UserEvent::event_type, None);
emit(...)
emit(...) enqueues synchronously and returns the pending event immediately.
Python
TypeScript
Go
Rust
event = bus.emit(MyEvent(data='x'))
result = await event.event_result()
const event = bus.emit(MyEvent({ data: 'x' }))
const result = await event.now({ first_result: true }).eventResult()
event := bus.Emit(abxbus.NewBaseEvent("MyEvent", map[string]any{"data": "x"}))
result, err := event.EventResult()
if err != nil {
return err
}
use abxbus_rust::{event, event_bus::EventBus};
use futures::executor::block_on;
event! {
struct MyEvent {
data: String,
event_result_type: serde_json::Value,
}
}
let bus = EventBus::new(Some("EmitBus".to_string()));
let event = bus.emit(MyEvent { data: "x".to_string(), ..Default::default() });
let result = block_on(event.event_result())?;
find(...)
find(...) supports history lookup, optional future waiting, predicate filtering, and parent/child scoping.
Python
TypeScript
Go
Rust
event = await bus.find(ResponseEvent) # history lookup by default
future = await bus.find(ResponseEvent, past=False, future=5)
child = await bus.find(ChildEvent, child_of=parent_event, future=5)
const event = await bus.find(ResponseEvent)
const future = await bus.find(ResponseEvent, { past: false, future: 5 })
const child = await bus.find(ChildEvent, { child_of: parentEvent, future: 5 })
event, err := bus.Find("ResponseEvent", nil, nil)
future, err := bus.Find("ResponseEvent", nil, &abxbus.FindOptions{
Past: false,
Future: 5.0,
})
child, err := bus.Find("ChildEvent", nil, &abxbus.FindOptions{
ChildOf: parentEvent,
Future: 5.0,
})
use abxbus_rust::event_bus::FindOptions;
use futures::executor::block_on;
let event = block_on(bus.find("ResponseEvent", true, None, None));
let future = block_on(bus.find("ResponseEvent", false, Some(5.0), None));
let child = block_on(bus.find_with_options(
"ChildEvent",
FindOptions {
child_of: Some(parent_event.clone()),
future: Some(5.0),
..FindOptions::default()
},
));
filter(...)
filter(...) takes the same arguments as find(...) but returns the list of all matching
events (newest to oldest) instead of just the first match. Accepts an additional limit
argument to cap the result count.
Python
TypeScript
Go
Rust
recent = await bus.filter(ResponseEvent, past=10, future=False, limit=5)
const recent = await bus.filter(ResponseEvent, { past: 10, future: false, limit: 5 })
limit := 5
recent, err := bus.Filter("ResponseEvent", nil, &abxbus.FilterOptions{
Past: 10.0,
Future: false,
Limit: &limit,
})
recentWithPredicate, err := bus.Filter(
"ResponseEvent",
func(event *abxbus.BaseEvent) bool {
return event.Payload["ok"] == true
},
&abxbus.FilterOptions{Past: 10.0, Future: false, Limit: &limit},
)
use abxbus_rust::event_bus::FilterOptions;
use futures::executor::block_on;
use serde_json::json;
use std::collections::HashMap;
let recent = block_on(bus.filter("ResponseEvent", true, None, None, Some(5)));
let filtered = block_on(bus.filter_with_options(
"ResponseEvent",
FilterOptions {
past_window: Some(10.0),
where_filter: Some(HashMap::from([("ok".to_string(), json!(true))])),
limit: Some(5),
..FilterOptions::default()
},
));
Lifecycle helpers
Wait for idle
Python
TypeScript
Go
Rust
await bus.wait_until_idle()
await bus.wait_until_idle(timeout=5)
await bus.waitUntilIdle()
await bus.waitUntilIdle(5)
bus.WaitUntilIdle(nil)
timeout := 5.0
bus.WaitUntilIdle(&timeout)
use futures::executor::block_on;
block_on(bus.wait_until_idle(None));
block_on(bus.wait_until_idle(Some(5.0)));
Parent/child relationship checks
Python
TypeScript
Go
Rust
bus.event_is_child_of(child_event, parent_event)
bus.event_is_parent_of(parent_event, child_event)
bus.eventIsChildOf(childEvent, parentEvent)
bus.eventIsParentOf(parentEvent, childEvent)
bus.EventIsChildOf(childEvent, parentEvent)
bus.EventIsParentOf(parentEvent, childEvent)
bus.event_is_child_of(&child_event, &parent_event);
bus.event_is_parent_of(&parent_event, &child_event);
Execution pipeline
All runtimes expose the same public processing contract:
emit(...) accepts the event, records it in bus history, and queues it for processing.
find(...) can observe accepted events before handlers finish.
- Event-level timeout and slow-warning settings apply to the whole event.
- Handler-level timeout and slow-warning settings apply to each handler.
event_concurrency, event_handler_concurrency, and event_handler_completion choose queueing and completion behavior.
- Handler return values and errors are stored in
event_results; use the result helpers to retrieve typed raw values.
Serialization and teardown
All runtimes can serialize the entire bus state (config, handlers metadata, event history, pending queue), restore it, re-attach handler callables, and continue processing.
Python
TypeScript
Go
Rust
from abxbus import BaseEvent, EventBus
class ResumeTickEvent(BaseEvent[None]):
pass
# 1) Serialize full bus state
print(bus.model_dump_json(indent=2))
# {
# "id": "018f...",
# "name": "SerializableBus",
# "handlers": {
# "018f...": {"id": "018f...", "handler_name": "app.handlers.on_user_created", "event_pattern": "UserCreatedEvent"}
# },
# "event_history": {
# "0190...": {"event_id": "0190...", "event_type": "UserCreatedEvent", "...": "..."}
# },
# "pending_event_queue": ["0190..."]
# }
# 2) Rehydrate state
bus = EventBus.validate(bus.model_dump_json())
# 3) Re-link runtime callables by handler id
bus.handlers.get("018f...").handler = on_user_created
bus.handlers.get("018g...").handler = on_user_deleted
bus.handlers.get("018h...").handler = on_user_updated
# 4) Resume processing (emit starts the runloop and drains restored pending events)
bus.emit(ResumeTickEvent())
await bus.wait_until_idle()
# 5) Teardown when done
await bus.destroy() # destroy immediately and clear by default
await bus.destroy(clear=False) # preserve handlers/history if you need to inspect
import { BaseEvent, EventBus } from 'abxbus'
const ResumeTickEvent = BaseEvent.extend('ResumeTickEvent', {})
// 1) Serialize full bus state
console.log(bus.toJSON())
// {
// id: '018f...',
// name: 'SerializableBus',
// handlers: {
// '018f...': { id: '018f...', handler_name: 'onUserCreated', event_pattern: 'UserCreatedEvent' }
// },
// event_history: {
// '0190...': { event_id: '0190...', event_type: 'UserCreatedEvent', ... }
// },
// pending_event_queue: ['0190...']
// }
// 2) Rehydrate state
bus = EventBus.fromJSON(bus.toJSON())
// 3) Re-link runtime callables by handler id
bus.handlers.get('018f...')!.handler = onUserCreated
bus.handlers.get('018g...')!.handler = onUserDeleted
bus.handlers.get('018h...')!.handler = onUserUpdated
// 4) Resume processing (emit starts the runloop and drains restored pending events)
bus.emit(ResumeTickEvent({}))
await bus.waitUntilIdle()
// 5) Teardown when done
await bus.destroy({ clear: true })
type ResumeTickEvent struct{}
resumeTick := abxbus.MustNewTypedEvent[ResumeTickEvent]("ResumeTickEvent", ResumeTickEvent{})
// 1) Serialize full bus state
payload, err := bus.ToJSON()
fmt.Println(string(payload))
// {
// "id": "018f...",
// "name": "SerializableBus",
// "handlers": {
// "018f...": {"id": "018f...", "handler_name": "onUserCreated", "event_pattern": "UserCreatedEvent"}
// },
// "event_history": {
// "0190...": {"event_id": "0190...", "event_type": "UserCreatedEvent", "...": "..."}
// },
// "pending_event_queue": ["0190..."]
// }
// 2) Rehydrate state
bus, err = abxbus.EventBusFromJSON(payload)
// 3) Re-link runtime callables by handler id
bus.On("UserCreatedEvent", "onUserCreated", onUserCreated, bus.Handlers["018f..."])
bus.On("UserDeletedEvent", "onUserDeleted", onUserDeleted, bus.Handlers["018g..."])
bus.On("UserUpdatedEvent", "onUserUpdated", onUserUpdated, bus.Handlers["018h..."])
// 4) Resume processing
bus.Emit(resumeTick)
bus.WaitUntilIdle(nil)
// 5) Teardown when done
bus.Destroy()
use abxbus_rust::{event, event_bus::EventBus};
use futures::executor::block_on;
use serde_json::json;
event! { struct ResumeTickEvent { event_result_type: serde_json::Value } }
event! { struct UserCreatedEvent { event_result_type: serde_json::Value } }
event! { struct UserDeletedEvent { event_result_type: serde_json::Value } }
event! { struct UserUpdatedEvent { event_result_type: serde_json::Value } }
let resume_tick = ResumeTickEvent { ..Default::default() };
// 1) Serialize full bus state
let payload = bus.to_json_value();
println!("{}", serde_json::to_string_pretty(&payload).unwrap());
// {
// "id": "018f...",
// "name": "SerializableBus",
// "handlers": {
// "018f...": {"id": "018f...", "handler_name": "on_user_created", "event_pattern": "UserCreatedEvent"}
// },
// "event_history": {
// "0190...": {"event_id": "0190...", "event_type": "UserCreatedEvent", "...": "..."}
// },
// "pending_event_queue": ["0190..."]
// }
// 2) Rehydrate state
let bus = EventBus::from_json_value(payload);
// 3) Re-link runtime callables by handler id/name
bus.on(UserCreatedEvent, |_event: UserCreatedEvent| async move { Ok(json!("created")) });
bus.on(UserDeletedEvent, |_event: UserDeletedEvent| async move { Ok(json!("deleted")) });
bus.on(UserUpdatedEvent, |_event: UserUpdatedEvent| async move { Ok(json!("updated")) });
// 4) Resume processing
bus.emit(resume_tick);
assert!(block_on(bus.wait_until_idle(Some(1.0))));
// 5) Teardown when done
bus.destroy();
destroy(...)
Destroy stops runtime work and optionally clears bus-owned state.
Python
TypeScript
Go
Rust
await bus.destroy()
await bus.destroy(clear=False)
await bus.destroy()
await bus.destroy({ clear: false })
bus.Destroy()
bus.DestroyWithOptions(&abxbus.EventBusDestroyOptions{Clear: false})
use abxbus_rust::event_bus::DestroyOptions;
bus.destroy();
bus.destroy_with_options(DestroyOptions { clear: false, ..Default::default() });
clear defaults to true in every runtime.
- With
clear=true, destroy is terminal and clears handlers, history, queues, in-flight tracking, waiters, locks, and dispatch/context state so the bus can be released.
- With
clear=false, destroy is still terminal and the bus cannot be used again; it only preserves handlers/history for inspection.
- Destroying one bus only clears that bus’s local state; shared event objects, handler functions, and other buses are not destroyed.
Timeout and precedence
Shared precedence model:
- Handler override
- Event override
- Bus default
Effective handler timeout is capped by event timeout when both are set.