Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
bus-serial enforces one active event per bus, while different buses can process events simultaneously.
Companion runnable example:
Lifecycle impact
- Events enqueue per bus in FIFO order.
- Each bus holds its own event lock.
- A busy bus does not block other buses.
- Queue-jump child events can preempt that same bus queue when awaited in-handler.
Execution order example
import asyncio
from abxbus import BaseEvent, EventBus
class WorkEvent(BaseEvent):
order: int
source: str
bus_a = EventBus('BusSerialA', event_concurrency='bus-serial')
bus_b = EventBus('BusSerialB', event_concurrency='bus-serial')
starts_a: list[int] = []
starts_b: list[int] = []
in_flight_global = 0
max_in_flight_global = 0
async def handler_a(event: WorkEvent) -> None:
global in_flight_global, max_in_flight_global
in_flight_global += 1
max_in_flight_global = max(max_in_flight_global, in_flight_global)
starts_a.append(event.order)
await asyncio.sleep(0.01)
in_flight_global -= 1
async def handler_b(event: WorkEvent) -> None:
global in_flight_global, max_in_flight_global
in_flight_global += 1
max_in_flight_global = max(max_in_flight_global, in_flight_global)
starts_b.append(event.order)
await asyncio.sleep(0.01)
in_flight_global -= 1
bus_a.on(WorkEvent, handler_a)
bus_b.on(WorkEvent, handler_b)
for i in range(4):
bus_a.emit(WorkEvent(order=i, source='a'))
bus_b.emit(WorkEvent(order=i, source='b'))
await bus_a.wait_until_idle()
await bus_b.wait_until_idle()
assert starts_a == [0, 1, 2, 3]
assert starts_b == [0, 1, 2, 3]
assert max_in_flight_global >= 2
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const WorkEvent = BaseEvent.extend('WorkEvent', {
order: z.number(),
source: z.string(),
})
const busA = new EventBus('BusSerialA', { event_concurrency: 'bus-serial' })
const busB = new EventBus('BusSerialB', { event_concurrency: 'bus-serial' })
const startsA: number[] = []
const startsB: number[] = []
busA.on(WorkEvent, async (event) => {
startsA.push(event.order)
await new Promise((resolve) => setTimeout(resolve, 2))
})
busB.on(WorkEvent, async (event) => {
startsB.push(event.order)
await new Promise((resolve) => setTimeout(resolve, 2))
})
for (let i = 0; i < 4; i += 1) {
busA.emit(WorkEvent({ order: i, source: 'a' }))
busB.emit(WorkEvent({ order: i, source: 'b' }))
}
await busA.waitUntilIdle()
await busB.waitUntilIdle()
if (JSON.stringify(startsA) !== JSON.stringify([0, 1, 2, 3])) throw new Error('bus A FIFO failed')
if (JSON.stringify(startsB) !== JSON.stringify([0, 1, 2, 3])) throw new Error('bus B FIFO failed')
Notes
- This is typically the best default for multi-bus systems.
- It preserves local determinism while retaining cross-bus throughput.