event_concurrency='bus-serial'event_handler_concurrency='serial'event_handler_completion='all'
N+1 never starts before event N is complete, even if event N+1 handlers are “faster”.
As you scale, you can tune these guarantees. See Concurrency Control in the sidebar for all modes and tradeoffs.
Variable handler runtimes still stay FIFO
- Python
- TypeScript
- Go
- Rust
import asyncio
from abxbus import BaseEvent, EventBus
class JobEvent(BaseEvent):
order: int
delay_s: float
bus = EventBus('FifoBus')
started_order: list[int] = []
completed_order: list[int] = []
async def on_job(event: JobEvent) -> None:
started_order.append(event.order)
await asyncio.sleep(event.delay_s)
completed_order.append(event.order)
bus.on(JobEvent, on_job)
emitted = [
bus.emit(JobEvent(order=0, delay_s=0.030)),
bus.emit(JobEvent(order=1, delay_s=0.001)),
bus.emit(JobEvent(order=2, delay_s=0.020)),
]
await bus.wait_until_idle()
print(started_order)
# [0, 1, 2]
print(completed_order)
# [0, 1, 2]
print([event.event_started_at is not None for event in emitted])
# [True, True, True]
print([event.event_completed_at is not None for event in emitted])
# [True, True, True]
print(emitted[0].event_started_at <= emitted[1].event_started_at <= emitted[2].event_started_at)
# True
print(emitted[0].event_completed_at <= emitted[1].event_completed_at <= emitted[2].event_completed_at)
# True
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const JobEvent = BaseEvent.extend('JobEvent', {
order: z.number(),
delay_ms: z.number(),
})
const bus = new EventBus('FifoBus')
const startedOrder: number[] = []
const completedOrder: number[] = []
bus.on(JobEvent, async (event) => {
startedOrder.push(event.order)
await new Promise((resolve) => setTimeout(resolve, event.delay_ms))
completedOrder.push(event.order)
})
const emitted = [
bus.emit(JobEvent({ order: 0, delay_ms: 30 })),
bus.emit(JobEvent({ order: 1, delay_ms: 1 })),
bus.emit(JobEvent({ order: 2, delay_ms: 20 })),
]
await bus.waitUntilIdle()
console.log(startedOrder)
// [0, 1, 2]
console.log(completedOrder)
// [0, 1, 2]
console.log(emitted.map((event) => Boolean(event.event_started_at)))
// [true, true, true]
console.log(emitted.map((event) => Boolean(event.event_completed_at)))
// [true, true, true]
console.log(
Date.parse(emitted[0].event_started_at!) <=
Date.parse(emitted[1].event_started_at!) &&
Date.parse(emitted[1].event_started_at!) <= Date.parse(emitted[2].event_started_at!)
)
// true
console.log(
Date.parse(emitted[0].event_completed_at!) <=
Date.parse(emitted[1].event_completed_at!) &&
Date.parse(emitted[1].event_completed_at!) <= Date.parse(emitted[2].event_completed_at!)
)
// true
package main
import (
"fmt"
"reflect"
"time"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go"
)
func main() {
bus := abxbus.NewEventBus("FifoBus", nil)
startedOrder := []int{}
completedOrder := []int{}
bus.On("JobEvent", "on_job", func(event *abxbus.BaseEvent) (any, error) {
order := event.Payload["order"].(int)
delayMS := event.Payload["delay_ms"].(int)
startedOrder = append(startedOrder, order)
time.Sleep(time.Duration(delayMS) * time.Millisecond)
completedOrder = append(completedOrder, order)
return nil, nil
}, nil)
emitted := []*abxbus.BaseEvent{
bus.Emit(abxbus.NewBaseEvent("JobEvent", map[string]any{"order": 0, "delay_ms": 30})),
bus.Emit(abxbus.NewBaseEvent("JobEvent", map[string]any{"order": 1, "delay_ms": 1})),
bus.Emit(abxbus.NewBaseEvent("JobEvent", map[string]any{"order": 2, "delay_ms": 20})),
}
bus.WaitUntilIdle(nil)
fmt.Println(startedOrder)
// [0 1 2]
fmt.Println(completedOrder)
// [0 1 2]
if !reflect.DeepEqual(startedOrder, []int{0, 1, 2}) {
panic("started order is not FIFO")
}
if !reflect.DeepEqual(completedOrder, []int{0, 1, 2}) {
panic("completed order is not FIFO")
}
fmt.Println(emitted[0].EventStartedAt != nil, emitted[1].EventStartedAt != nil, emitted[2].EventStartedAt != nil)
// true true true
}
use abxbus::{event, event_bus::EventBus};
use futures::executor::block_on;
use std::sync::{Arc, Mutex};
event! {
struct JobEvent {
order: i64,
event_result_type: (),
}
}
let bus = EventBus::new(Some("FifoBus".to_string()));
let started_order = Arc::new(Mutex::new(Vec::new()));
let completed_order = Arc::new(Mutex::new(Vec::new()));
let started = started_order.clone();
let completed = completed_order.clone();
bus.on(JobEvent, move |event: JobEvent| async move {
let order = event.order;
started.lock().unwrap().push(order);
completed.lock().unwrap().push(order);
Ok(())
});
for order in 0..3 {
bus.emit(JobEvent { order, ..Default::default() });
}
block_on(bus.wait_until_idle(None));
assert_eq!(*started_order.lock().unwrap(), vec![0, 1, 2]);
assert_eq!(*completed_order.lock().unwrap(), vec![0, 1, 2]);
Ambiguous case: slow then fast still runs serially
Even if you emit a slow event and then a fast event right after, the fast one does not overtake on the same bus under defaults.- Python
- TypeScript
- Go
- Rust
import asyncio
from abxbus import BaseEvent, EventBus
class SlowEvent(BaseEvent):
name: str
class FastEvent(BaseEvent):
name: str
bus = EventBus('FifoBus')
trace: list[str] = []
async def on_slow(event: SlowEvent) -> None:
trace.append(f'start:{event.event_type}:{event.name}')
await asyncio.sleep(0.040)
trace.append(f'end:{event.event_type}:{event.name}')
async def on_fast(event: FastEvent) -> None:
trace.append(f'start:{event.event_type}:{event.name}')
await asyncio.sleep(0.001)
trace.append(f'end:{event.event_type}:{event.name}')
bus.on(SlowEvent, on_slow)
bus.on(FastEvent, on_fast)
slow = bus.emit(SlowEvent(name='slow-a'))
fast = bus.emit(FastEvent(name='fast-b'))
await bus.wait_until_idle()
print(trace)
# ['start:SlowEvent:slow-a', 'end:SlowEvent:slow-a', 'start:FastEvent:fast-b', 'end:FastEvent:fast-b']
print(slow.event_completed_at <= fast.event_started_at)
# True
tree_lines = [
line for line in bus.log_tree().splitlines()
if 'SlowEvent#' in line or 'FastEvent#' in line
]
print(tree_lines)
# ['├── SlowEvent#6aa1 [14:09:10.120 (0.040s)]', '└── FastEvent#6aa2 [14:09:10.161 (0.001s)]']
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const SlowEvent = BaseEvent.extend('SlowEvent', {
name: z.string(),
})
const FastEvent = BaseEvent.extend('FastEvent', {
name: z.string(),
})
const bus = new EventBus('FifoBus')
const trace: string[] = []
bus.on(SlowEvent, async (event) => {
trace.push(`start:${event.event_type}:${event.name}`)
await new Promise((resolve) => setTimeout(resolve, 40))
trace.push(`end:${event.event_type}:${event.name}`)
})
bus.on(FastEvent, async (event) => {
trace.push(`start:${event.event_type}:${event.name}`)
await new Promise((resolve) => setTimeout(resolve, 1))
trace.push(`end:${event.event_type}:${event.name}`)
})
const slow = bus.emit(SlowEvent({ name: 'slow-a' }))
const fast = bus.emit(FastEvent({ name: 'fast-b' }))
await bus.waitUntilIdle()
console.log(trace)
// ['start:SlowEvent:slow-a', 'end:SlowEvent:slow-a', 'start:FastEvent:fast-b', 'end:FastEvent:fast-b']
console.log(Date.parse(slow.event_completed_at!) <= Date.parse(fast.event_started_at!))
// true
const treeLines = bus
.logTree()
.split('\n')
.filter((line) => line.includes('SlowEvent#') || line.includes('FastEvent#'))
console.log(treeLines)
// ['├── ✅ SlowEvent#6aa1 [14:09:10.120 (0.040s)]', '└── ✅ FastEvent#6aa2 [14:09:10.161 (0.001s)]']
bus := abxbus.NewEventBus("FifoBus", nil)
trace := []string{}
bus.On("SlowEvent", "on_slow", func(event *abxbus.BaseEvent) (any, error) {
trace = append(trace, fmt.Sprintf("start:%s:%s", event.EventType, event.Payload["name"]))
time.Sleep(40 * time.Millisecond)
trace = append(trace, fmt.Sprintf("end:%s:%s", event.EventType, event.Payload["name"]))
return nil, nil
}, nil)
bus.On("FastEvent", "on_fast", func(event *abxbus.BaseEvent) (any, error) {
trace = append(trace, fmt.Sprintf("start:%s:%s", event.EventType, event.Payload["name"]))
time.Sleep(1 * time.Millisecond)
trace = append(trace, fmt.Sprintf("end:%s:%s", event.EventType, event.Payload["name"]))
return nil, nil
}, nil)
slow := bus.Emit(abxbus.NewBaseEvent("SlowEvent", map[string]any{"name": "slow-a"}))
fast := bus.Emit(abxbus.NewBaseEvent("FastEvent", map[string]any{"name": "fast-b"}))
bus.WaitUntilIdle(nil)
fmt.Println(trace)
// [start:SlowEvent:slow-a end:SlowEvent:slow-a start:FastEvent:fast-b end:FastEvent:fast-b]
fmt.Println(*slow.EventCompletedAt <= *fast.EventStartedAt)
// true
fmt.Println(bus.LogTree())
use abxbus::{event, event_bus::EventBus};
use futures::executor::block_on;
use std::sync::{Arc, Mutex};
event! {
struct SlowEvent {
event_result_type: (),
}
}
event! {
struct FastEvent {
event_result_type: (),
}
}
let bus = EventBus::new(Some("FifoBus".to_string()));
let trace = Arc::new(Mutex::new(Vec::new()));
let trace_for_slow = trace.clone();
bus.on(SlowEvent, move |_event: SlowEvent| async move {
trace_for_slow.lock().unwrap().push("start:SlowEvent".to_string());
trace_for_slow.lock().unwrap().push("end:SlowEvent".to_string());
Ok(())
});
let trace_for_fast = trace.clone();
bus.on(FastEvent, move |_event: FastEvent| async move {
trace_for_fast.lock().unwrap().push("start:FastEvent".to_string());
trace_for_fast.lock().unwrap().push("end:FastEvent".to_string());
Ok(())
});
let slow = bus.emit(SlowEvent { ..Default::default() });
let fast = bus.emit(FastEvent { ..Default::default() });
block_on(bus.wait_until_idle(None));
assert!(slow.event_completed_at.read() <= fast.event_started_at.read());
println!("{}", bus.log_tree());