Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
bus-serial enforces one active event per bus, while different buses can process events simultaneously.
Companion runnable example:
Lifecycle impact
- Events enqueue per bus in FIFO order.
- Each bus holds its own event lock.
- A busy bus does not block other buses.
- Queue-jump child events can preempt that same bus queue when awaited in-handler.
Execution order example
- Python
- TypeScript
- Go
- Rust
import asyncio
from abxbus import BaseEvent, EventBus
class WorkEvent(BaseEvent):
order: int
source: str
bus_a = EventBus('BusSerialA', event_concurrency='bus-serial')
bus_b = EventBus('BusSerialB', event_concurrency='bus-serial')
starts_a: list[int] = []
starts_b: list[int] = []
in_flight_global = 0
max_in_flight_global = 0
async def handler_a(event: WorkEvent) -> None:
global in_flight_global, max_in_flight_global
in_flight_global += 1
max_in_flight_global = max(max_in_flight_global, in_flight_global)
starts_a.append(event.order)
await asyncio.sleep(0.01)
in_flight_global -= 1
async def handler_b(event: WorkEvent) -> None:
global in_flight_global, max_in_flight_global
in_flight_global += 1
max_in_flight_global = max(max_in_flight_global, in_flight_global)
starts_b.append(event.order)
await asyncio.sleep(0.01)
in_flight_global -= 1
bus_a.on(WorkEvent, handler_a)
bus_b.on(WorkEvent, handler_b)
for i in range(4):
bus_a.emit(WorkEvent(order=i, source='a'))
bus_b.emit(WorkEvent(order=i, source='b'))
await bus_a.wait_until_idle()
await bus_b.wait_until_idle()
assert starts_a == [0, 1, 2, 3]
assert starts_b == [0, 1, 2, 3]
assert max_in_flight_global >= 2
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const WorkEvent = BaseEvent.extend('WorkEvent', {
order: z.number(),
source: z.string(),
})
const busA = new EventBus('BusSerialA', { event_concurrency: 'bus-serial' })
const busB = new EventBus('BusSerialB', { event_concurrency: 'bus-serial' })
const startsA: number[] = []
const startsB: number[] = []
busA.on(WorkEvent, async (event) => {
startsA.push(event.order)
await new Promise((resolve) => setTimeout(resolve, 2))
})
busB.on(WorkEvent, async (event) => {
startsB.push(event.order)
await new Promise((resolve) => setTimeout(resolve, 2))
})
for (let i = 0; i < 4; i += 1) {
busA.emit(WorkEvent({ order: i, source: 'a' }))
busB.emit(WorkEvent({ order: i, source: 'b' }))
}
await busA.waitUntilIdle()
await busB.waitUntilIdle()
if (JSON.stringify(startsA) !== JSON.stringify([0, 1, 2, 3])) throw new Error('bus A FIFO failed')
if (JSON.stringify(startsB) !== JSON.stringify([0, 1, 2, 3])) throw new Error('bus B FIFO failed')
package main
import (
"reflect"
"sync"
"time"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2"
)
func main() {
busA := abxbus.NewEventBus("BusSerialA", &abxbus.EventBusOptions{
EventConcurrency: abxbus.EventConcurrencyBusSerial,
})
busB := abxbus.NewEventBus("BusSerialB", &abxbus.EventBusOptions{
EventConcurrency: abxbus.EventConcurrencyBusSerial,
})
var mu sync.Mutex
startsA := []int{}
startsB := []int{}
inFlightGlobal := 0
maxInFlightGlobal := 0
handler := func(starts *[]int) func(*abxbus.BaseEvent) (any, error) {
return func(event *abxbus.BaseEvent) (any, error) {
mu.Lock()
inFlightGlobal++
if inFlightGlobal > maxInFlightGlobal {
maxInFlightGlobal = inFlightGlobal
}
*starts = append(*starts, event.Payload["order"].(int))
mu.Unlock()
time.Sleep(10 * time.Millisecond)
mu.Lock()
inFlightGlobal--
mu.Unlock()
return nil, nil
}
}
busA.On("WorkEvent", "handler_a", handler(&startsA), nil)
busB.On("WorkEvent", "handler_b", handler(&startsB), nil)
for i := 0; i < 4; i++ {
busA.Emit(abxbus.NewBaseEvent("WorkEvent", map[string]any{"order": i, "source": "a"}))
busB.Emit(abxbus.NewBaseEvent("WorkEvent", map[string]any{"order": i, "source": "b"}))
}
busA.WaitUntilIdle(nil)
busB.WaitUntilIdle(nil)
if !reflect.DeepEqual(startsA, []int{0, 1, 2, 3}) {
panic("bus A FIFO failed")
}
if !reflect.DeepEqual(startsB, []int{0, 1, 2, 3}) {
panic("bus B FIFO failed")
}
if maxInFlightGlobal < 2 {
panic("expected buses to overlap")
}
}
use abxbus_rust::{
event,
event_bus::{EventBus, EventBusOptions},
BaseEvent,
types::EventConcurrencyMode,
};
use futures::executor::block_on;
event! {
struct WorkEvent {
event_result_type: (),
}
}
let bus_a = EventBus::new_with_options(
Some("BusSerialA".to_string()),
EventBusOptions {
event_concurrency: EventConcurrencyMode::BusSerial,
..EventBusOptions::default()
},
);
let bus_b = EventBus::new_with_options(
Some("BusSerialB".to_string()),
EventBusOptions {
event_concurrency: EventConcurrencyMode::BusSerial,
..EventBusOptions::default()
},
);
bus_a.on(WorkEvent, |_event: WorkEvent| async move { Ok(()) });
bus_b.on(WorkEvent, |_event: WorkEvent| async move { Ok(()) });
for _ in 0..4 {
bus_a.emit(WorkEvent { ..Default::default() });
bus_b.emit(WorkEvent { ..Default::default() });
}
block_on(bus_a.wait_until_idle(None));
block_on(bus_b.wait_until_idle(None));
Notes
- This is typically the best default for multi-bus systems.
- It preserves local determinism while retaining cross-bus throughput.