Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
Using the default options out-of-the-box, all events and handlers on a bus process in strict serial order to make execution order predictable and consistency easy.
This is the default behavior because:
event_concurrency='bus-serial'
event_handler_concurrency='serial'
event_handler_completion='all'
On a single bus, that means event N+1 never starts before event N is complete, even if event N+1 handlers are “faster”.
As you scale, you can tune these guarantees. See Concurrency Control in the sidebar for all modes and tradeoffs.
Variable handler runtimes still stay FIFO
import asyncio
from abxbus import BaseEvent, EventBus
class JobEvent(BaseEvent):
order: int
delay_s: float
bus = EventBus('FifoBus')
started_order: list[int] = []
completed_order: list[int] = []
async def on_job(event: JobEvent) -> None:
started_order.append(event.order)
await asyncio.sleep(event.delay_s)
completed_order.append(event.order)
bus.on(JobEvent, on_job)
emitted = [
bus.emit(JobEvent(order=0, delay_s=0.030)),
bus.emit(JobEvent(order=1, delay_s=0.001)),
bus.emit(JobEvent(order=2, delay_s=0.020)),
]
await bus.wait_until_idle()
print(started_order)
# [0, 1, 2]
print(completed_order)
# [0, 1, 2]
print([event.event_started_at is not None for event in emitted])
# [True, True, True]
print([event.event_completed_at is not None for event in emitted])
# [True, True, True]
print(emitted[0].event_started_at <= emitted[1].event_started_at <= emitted[2].event_started_at)
# True
print(emitted[0].event_completed_at <= emitted[1].event_completed_at <= emitted[2].event_completed_at)
# True
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const JobEvent = BaseEvent.extend('JobEvent', {
order: z.number(),
delay_ms: z.number(),
})
const bus = new EventBus('FifoBus')
const startedOrder: number[] = []
const completedOrder: number[] = []
bus.on(JobEvent, async (event) => {
startedOrder.push(event.order)
await new Promise((resolve) => setTimeout(resolve, event.delay_ms))
completedOrder.push(event.order)
})
const emitted = [
bus.emit(JobEvent({ order: 0, delay_ms: 30 })),
bus.emit(JobEvent({ order: 1, delay_ms: 1 })),
bus.emit(JobEvent({ order: 2, delay_ms: 20 })),
]
await bus.waitUntilIdle()
console.log(startedOrder)
// [0, 1, 2]
console.log(completedOrder)
// [0, 1, 2]
console.log(emitted.map((event) => Boolean(event.event_started_at)))
// [true, true, true]
console.log(emitted.map((event) => Boolean(event.event_completed_at)))
// [true, true, true]
console.log(
Date.parse(emitted[0].event_started_at!) <=
Date.parse(emitted[1].event_started_at!) &&
Date.parse(emitted[1].event_started_at!) <= Date.parse(emitted[2].event_started_at!)
)
// true
console.log(
Date.parse(emitted[0].event_completed_at!) <=
Date.parse(emitted[1].event_completed_at!) &&
Date.parse(emitted[1].event_completed_at!) <= Date.parse(emitted[2].event_completed_at!)
)
// true
Ambiguous case: slow then fast still runs serially
Even if you emit a slow event and then a fast event right after, the fast one does not overtake on the same bus under defaults.
import asyncio
from abxbus import BaseEvent, EventBus
class SlowEvent(BaseEvent):
name: str
class FastEvent(BaseEvent):
name: str
bus = EventBus('FifoBus')
trace: list[str] = []
async def on_slow(event: SlowEvent) -> None:
trace.append(f'start:{event.event_type}:{event.name}')
await asyncio.sleep(0.040)
trace.append(f'end:{event.event_type}:{event.name}')
async def on_fast(event: FastEvent) -> None:
trace.append(f'start:{event.event_type}:{event.name}')
await asyncio.sleep(0.001)
trace.append(f'end:{event.event_type}:{event.name}')
bus.on(SlowEvent, on_slow)
bus.on(FastEvent, on_fast)
slow = bus.emit(SlowEvent(name='slow-a'))
fast = bus.emit(FastEvent(name='fast-b'))
await bus.wait_until_idle()
print(trace)
# ['start:SlowEvent:slow-a', 'end:SlowEvent:slow-a', 'start:FastEvent:fast-b', 'end:FastEvent:fast-b']
print(slow.event_completed_at <= fast.event_started_at)
# True
tree_lines = [
line for line in bus.log_tree().splitlines()
if 'SlowEvent#' in line or 'FastEvent#' in line
]
print(tree_lines)
# ['├── SlowEvent#6aa1 [14:09:10.120 (0.040s)]', '└── FastEvent#6aa2 [14:09:10.161 (0.001s)]']
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const SlowEvent = BaseEvent.extend('SlowEvent', {
name: z.string(),
})
const FastEvent = BaseEvent.extend('FastEvent', {
name: z.string(),
})
const bus = new EventBus('FifoBus')
const trace: string[] = []
bus.on(SlowEvent, async (event) => {
trace.push(`start:${event.event_type}:${event.name}`)
await new Promise((resolve) => setTimeout(resolve, 40))
trace.push(`end:${event.event_type}:${event.name}`)
})
bus.on(FastEvent, async (event) => {
trace.push(`start:${event.event_type}:${event.name}`)
await new Promise((resolve) => setTimeout(resolve, 1))
trace.push(`end:${event.event_type}:${event.name}`)
})
const slow = bus.emit(SlowEvent({ name: 'slow-a' }))
const fast = bus.emit(FastEvent({ name: 'fast-b' }))
await bus.waitUntilIdle()
console.log(trace)
// ['start:SlowEvent:slow-a', 'end:SlowEvent:slow-a', 'start:FastEvent:fast-b', 'end:FastEvent:fast-b']
console.log(Date.parse(slow.event_completed_at!) <= Date.parse(fast.event_started_at!))
// true
const treeLines = bus
.logTree()
.split('\n')
.filter((line) => line.includes('SlowEvent#') || line.includes('FastEvent#'))
console.log(treeLines)
// ['├── ✅ SlowEvent#6aa1 [14:09:10.120 (0.040s)]', '└── ✅ FastEvent#6aa2 [14:09:10.161 (0.001s)]']
Important exception: awaited child events
Inside a running handler, if you emit and await a child event, that child can queue-jump for RPC-style behavior. This is the intentional exception to plain FIFO queue order.
See Immediate Execution (RPC-style) for exact behavior and mode interactions.