Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
`global-serial` enforces a single global event-processing slot across all EventBus instances.
Note that the global lock is scoped to the EventBus class; if you need separate global lock domains, subclass EventBus.
Companion runnable example:
Lifecycle impact
- An emitted event is queued on its target bus as usual.
- Before handler execution starts, the bus acquires the shared global event lock.
- While one event is running anywhere, other buses wait.
- Handler-level concurrency still applies inside that one active event.
Execution order example
Python
TypeScript
Go
Rust
import asyncio
from abxbus import BaseEvent, EventBus
class SerialEvent(BaseEvent):
    # Position of the event in its bus's emit sequence (0, 1, 2 in this example).
    order: int
    # Tag of the emitting bus ('a' or 'b' in this example).
    source: str
# Two separately named buses that both opt into the global-serial event mode.
bus_a = EventBus('GlobalSerialA', event_concurrency='global-serial')
bus_b = EventBus('GlobalSerialB', event_concurrency='global-serial')
# Instrumentation updated by the handler: current overlap, peak overlap, and start order.
in_flight = 0
max_in_flight = 0
starts: list[str] = []
async def handler(event: SerialEvent) -> None:
    """Record when `event` starts and track how many handler calls overlap."""
    global in_flight, max_in_flight

    in_flight += 1
    if in_flight > max_in_flight:
        max_in_flight = in_flight
    starts.append(f'{event.source}:{event.order}')

    # Suspend briefly so that any overlapping handler call would be counted.
    await asyncio.sleep(0.01)

    in_flight -= 1
bus_a.on(SerialEvent, handler)
bus_b.on(SerialEvent, handler)
for i in range(3):
bus_a.emit(SerialEvent(order=i, source='a'))
bus_b.emit(SerialEvent(order=i, source='b'))
await bus_a.wait_until_idle()
await bus_b.wait_until_idle()
assert max_in_flight == 1
assert [s for s in starts if s.startswith('a:')] == ['a:0', 'a:1', 'a:2']
assert [s for s in starts if s.startswith('b:')] == ['b:0', 'b:1', 'b:2']
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
/** Event carrying the emitting bus tag (`source`) and its per-bus sequence number (`order`). */
const SerialEvent = BaseEvent.extend('SerialEvent', {
  order: z.number(),
  source: z.string(),
})
// Two separately named buses that both opt into the global-serial event mode.
const busA = new EventBus('GlobalSerialA', { event_concurrency: 'global-serial' })
const busB = new EventBus('GlobalSerialB', { event_concurrency: 'global-serial' })
// Instrumentation updated by the handler: current overlap, peak overlap, and start order.
let inFlight = 0
let maxInFlight = 0
const starts: string[] = []
/**
 * Shared handler: records when an event starts and tracks how many handler
 * calls are running at the same time.
 */
const handler = async (event: InstanceType<typeof SerialEvent>) => {
  inFlight += 1
  if (inFlight > maxInFlight) {
    maxInFlight = inFlight
  }
  starts.push(`${event.source}:${event.order}`)

  // Suspend briefly so that any overlapping handler call would be counted.
  await new Promise((resolve) => setTimeout(resolve, 10))

  inFlight -= 1
}
// Both buses use the same handler so the counters see every event.
busA.on(SerialEvent, handler)
busB.on(SerialEvent, handler)

// Interleave emits across the two buses: a:0, b:0, a:1, b:1, a:2, b:2.
for (let i = 0; i < 3; i += 1) {
  busA.emit(SerialEvent({ order: i, source: 'a' }))
  busB.emit(SerialEvent({ order: i, source: 'b' }))
}

await busA.waitUntilIdle()
await busB.waitUntilIdle()

// At most one handler was ever running at a time across both buses.
if (maxInFlight !== 1) throw new Error('expected global serialization')

// Each bus must have started its own events in emit order. This mirrors the
// per-bus order assertions in the Python example; previously `starts` was
// collected here but never checked.
for (const source of ['a', 'b']) {
  const observed = starts.filter((entry) => entry.startsWith(`${source}:`))
  const expected = [0, 1, 2].map((order) => `${source}:${order}`)
  if (observed.join(',') !== expected.join(',')) {
    throw new Error(`expected bus ${source} to start events in emit order`)
  }
}
package main
import (
"strings"
"sync"
"time"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2"
)
// main emits three SerialEvents on each of two global-serial buses and panics
// unless handler calls never overlapped and both buses handled all their events.
func main() {
	// One options value is shared so both buses use the global-serial mode.
	opts := &abxbus.EventBusOptions{EventConcurrency: abxbus.EventConcurrencyGlobalSerial}
	busA := abxbus.NewEventBus("GlobalSerialA", opts)
	busB := abxbus.NewEventBus("GlobalSerialB", opts)

	var (
		statsMu sync.Mutex // held while the handler updates the values below
		running int
		peak    int
		sources = []string{}
	)

	record := func(event *abxbus.BaseEvent) (any, error) {
		statsMu.Lock()
		running++
		if peak < running {
			peak = running
		}
		sources = append(sources, event.Payload["source"].(string))
		statsMu.Unlock()

		// Stay busy briefly so an overlapping handler call would push peak above 1.
		time.Sleep(10 * time.Millisecond)

		statsMu.Lock()
		running--
		statsMu.Unlock()
		return nil, nil
	}

	busA.On("SerialEvent", "handler", record, nil)
	busB.On("SerialEvent", "handler", record, nil)

	// Interleave emits across the two buses.
	for i := 0; i < 3; i++ {
		busA.Emit(abxbus.NewBaseEvent("SerialEvent", map[string]any{"order": i, "source": "a"}))
		busB.Emit(abxbus.NewBaseEvent("SerialEvent", map[string]any{"order": i, "source": "b"}))
	}

	busA.WaitUntilIdle(nil)
	busB.WaitUntilIdle(nil)

	if peak != 1 {
		panic("expected global serialization")
	}

	// Each recorded source is a single letter, so counting letters in the
	// joined string counts the events handled per bus.
	seen := strings.Join(sources, "")
	if strings.Count(seen, "a") != 3 || strings.Count(seen, "b") != 3 {
		panic("expected both buses to process their queues")
	}
}
use abxbus_rust::{
event,
event_bus::{EventBus, EventBusOptions},
BaseEvent,
types::EventConcurrencyMode,
};
use futures::executor::block_on;
// Declares the `SerialEvent` type used by the buses below; the `()` result
// type matches the `Ok(())` returned by the registered handlers.
event! {
    struct SerialEvent {
        event_result_type: (),
    }
}
// Start from the default options and switch only the event concurrency mode.
let options = EventBusOptions {
    event_concurrency: EventConcurrencyMode::GlobalSerial,
    ..EventBusOptions::default()
};

// Two separately named buses built from the same options.
let bus_a = EventBus::new_with_options(Some("GlobalSerialA".to_string()), options.clone());
let bus_b = EventBus::new_with_options(Some("GlobalSerialB".to_string()), options);

// No-op handlers: this snippet only shows the wiring, not the measurement.
bus_a.on(SerialEvent, |_event: SerialEvent| async move { Ok(()) });
bus_b.on(SerialEvent, |_event: SerialEvent| async move { Ok(()) });

// Interleave three emits per bus.
for _ in 0..3 {
    bus_a.emit(SerialEvent { ..Default::default() });
    bus_b.emit(SerialEvent { ..Default::default() });
}

// Block until each bus reports idle.
block_on(bus_a.wait_until_idle(None));
block_on(bus_b.wait_until_idle(None));
Notes
- This mode provides the strongest determinism for topologies that span multiple buses within one process.
- Queue-jump behavior (`await event.now()` inside handlers) still applies, but it does so under the same global lock.