Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
Each runtime exposes two related (but different) runtime stores:
pending_event_queue: events accepted by the bus but not yet started by the runloop
event_history: events the bus knows about (pending, started, and completed until trimmed)
What each store is for
| Store | Purpose | Typical contents |
|---|
pending_event_queue | Scheduling buffer | events waiting their turn to start |
event_history | Observability + lookup | recent pending/started/completed events, bounded by history settings |
The key difference: queue is “what still needs to start”, history is “what this bus has seen”.
Retention config options
| Option | Meaning |
|---|
max_history_size | Max number of events retained in event_history (null/None means unbounded, 0 means keep only in-flight visibility). |
max_history_drop | If true, accept new events and trim oldest history entries when over limit. If false, reject new events at the limit (for max_history_size > 0). |
Event lifecycle: queue -> history -> trim
- Emit:
- Event is accepted.
- Event is added to
event_history.
- Event is enqueued into
pending_event_queue.
- Runloop begins processing:
- Event is removed from
pending_event_queue.
- Event stays in
event_history while handlers run.
- Completion:
- Event is marked completed.
- Event may remain in
event_history or be dropped based on retention settings.
- Trimming:
max_history_size and max_history_drop determine whether old history is removed or new emits are rejected.
Trimming behavior by mode
max_history_size = None/null: no automatic history limit.
max_history_size = 0: completed events are removed immediately; only pending/in-flight visibility remains.
max_history_size > 0 and max_history_drop = false: bus rejects new emits once history reaches the limit.
max_history_size > 0 and max_history_drop = true: bus trims oldest history entries (prefers completed first; can drop uncompleted entries under extreme pressure).
All runtimes follow this policy. Internally, trim timing is implementation-specific (eager vs amortized cleanup), but externally the semantics above are the contract to rely on.
Common configurations
Python
TypeScript
Go
Rust
from abxbus import EventBus
bounded_drop = EventBus(max_history_size=100, max_history_drop=True)
bounded_reject = EventBus(max_history_size=100, max_history_drop=False)
unbounded = EventBus(max_history_size=None)
in_flight_only = EventBus(max_history_size=0)
import { EventBus } from 'abxbus'
const boundedDrop = new EventBus('BoundedDropBus', { max_history_size: 100, max_history_drop: true })
const boundedReject = new EventBus('BoundedRejectBus', { max_history_size: 100, max_history_drop: false })
const unbounded = new EventBus('UnboundedBus', { max_history_size: null })
const inFlightOnly = new EventBus('InFlightBus', { max_history_size: 0 })
maxHistory := 100
zeroHistory := 0
unlimitedHistorySize := abxbus.UnlimitedHistorySize
boundedDrop := abxbus.NewEventBus("BoundedDropBus", &abxbus.EventBusOptions{
MaxHistorySize: &maxHistory,
MaxHistoryDrop: true,
})
boundedReject := abxbus.NewEventBus("BoundedRejectBus", &abxbus.EventBusOptions{
MaxHistorySize: &maxHistory,
MaxHistoryDrop: false,
})
unbounded := abxbus.NewEventBus("UnboundedBus", &abxbus.EventBusOptions{
MaxHistorySize: &unlimitedHistorySize,
})
inFlightOnly := abxbus.NewEventBus("InFlightBus", &abxbus.EventBusOptions{
MaxHistorySize: &zeroHistory,
})
use abxbus_rust::event_bus::{EventBus, EventBusOptions};
let bounded_drop = EventBus::new_with_options(Some("BoundedDropBus".to_string()), EventBusOptions {
max_history_size: Some(100),
max_history_drop: true,
..EventBusOptions::default()
});
let bounded_reject = EventBus::new_with_options(Some("BoundedRejectBus".to_string()), EventBusOptions {
max_history_size: Some(100),
max_history_drop: false,
..EventBusOptions::default()
});
let unbounded = EventBus::new_with_options(Some("UnboundedBus".to_string()), EventBusOptions {
max_history_size: None,
..EventBusOptions::default()
});
let in_flight_only = EventBus::new_with_options(Some("InFlightBus".to_string()), EventBusOptions {
max_history_size: Some(0),
..EventBusOptions::default()
});
Inspecting queue vs history at runtime
Python
TypeScript
Go
Rust
event = bus.emit(MyEvent())
pending_count = bus.pending_event_queue.qsize() if bus.pending_event_queue else 0
history_count = len(bus.event_history)
print('pending_event_queue=', pending_count, 'event_history=', history_count)
# pending_event_queue= 1 event_history= 1
await event.now()
pending_after = bus.pending_event_queue.qsize() if bus.pending_event_queue else 0
history_after = len(bus.event_history)
print('after completion -> pending_event_queue=', pending_after, 'event_history=', history_after)
# after completion -> pending_event_queue= 0 event_history= 1
const event = bus.emit(MyEvent({}))
console.log('pending_event_queue=', bus.pending_event_queue.length, 'event_history=', bus.event_history.size)
await event.now()
console.log('after completion -> pending_event_queue=', bus.pending_event_queue.length, 'event_history=', bus.event_history.size)
event := bus.Emit(abxbus.NewBaseEvent("MyEvent", nil))
var before struct {
PendingEventQueue []string `json:"pending_event_queue"`
EventHistory map[string]any `json:"event_history"`
}
snapshot, err := bus.ToJSON()
if err != nil {
panic(err)
}
if err := json.Unmarshal(snapshot, &before); err != nil {
panic(err)
}
fmt.Println("pending_event_queue=", len(before.PendingEventQueue), "event_history=", len(before.EventHistory))
// pending_event_queue= 1 event_history= 1
if _, err := event.Now(); err != nil {
panic(err)
}
var after struct {
PendingEventQueue []string `json:"pending_event_queue"`
EventHistory map[string]any `json:"event_history"`
}
snapshot, err = bus.ToJSON()
if err != nil {
panic(err)
}
if err := json.Unmarshal(snapshot, &after); err != nil {
panic(err)
}
fmt.Println("after completion -> pending_event_queue=", len(after.PendingEventQueue), "event_history=", len(after.EventHistory))
// after completion -> pending_event_queue= 0 event_history= 1
let event = bus.emit(MyEvent {
..Default::default()
});
println!("pending_event_queue/event_history snapshot={}", bus.to_json_value());
block_on(event.now());
println!("after completion -> event_history={}", bus.event_history_size());