Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
parallel removes event-level serialization for a bus, so multiple events can be in-flight simultaneously.
Companion runnable example:
Lifecycle impact
- Events still enqueue and are tracked in history.
- The bus does not gate execution with an event semaphore.
- Handler-level concurrency rules still apply within each event.
- Ordering guarantees become weaker under load because events can overlap.
Execution order example
Python
TypeScript
Go
Rust
import asyncio
from abxbus import BaseEvent, EventBus
class ParallelEvent(BaseEvent):
order: int
bus = EventBus('ParallelEventBus', event_concurrency='parallel', event_handler_concurrency='parallel')
in_flight = 0
max_in_flight = 0
release = asyncio.Event()
async def handler(_: ParallelEvent) -> None:
global in_flight, max_in_flight
in_flight += 1
max_in_flight = max(max_in_flight, in_flight)
await release.wait()
await asyncio.sleep(0.01)
in_flight -= 1
bus.on(ParallelEvent, handler)
bus.emit(ParallelEvent(order=0))
bus.emit(ParallelEvent(order=1))
await asyncio.sleep(0)
release.set()
await bus.wait_until_idle()
assert max_in_flight >= 2
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const ParallelEvent = BaseEvent.extend('ParallelEvent', { order: z.number() })
const bus = new EventBus('ParallelEventBus', {
event_concurrency: 'parallel',
event_handler_concurrency: 'parallel',
})
let inFlight = 0
let maxInFlight = 0
let release!: () => void
const gate = new Promise<void>((resolve) => {
release = resolve
})
bus.on(ParallelEvent, async () => {
inFlight += 1
maxInFlight = Math.max(maxInFlight, inFlight)
await gate
await new Promise((resolve) => setTimeout(resolve, 10))
inFlight -= 1
})
bus.emit(ParallelEvent({ order: 0 }))
bus.emit(ParallelEvent({ order: 1 }))
await new Promise((resolve) => setTimeout(resolve, 0))
release()
await bus.waitUntilIdle()
if (maxInFlight < 2) throw new Error('expected overlapping events')
package main
import (
"sync"
"time"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2"
)
func main() {
bus := abxbus.NewEventBus("ParallelEventBus", &abxbus.EventBusOptions{
EventConcurrency: abxbus.EventConcurrencyParallel,
EventHandlerConcurrency: abxbus.EventHandlerConcurrencyParallel,
})
var mu sync.Mutex
inFlight := 0
maxInFlight := 0
release := make(chan struct{})
bus.On("ParallelEvent", "handler", func(event *abxbus.BaseEvent) (any, error) {
mu.Lock()
inFlight++
if inFlight > maxInFlight {
maxInFlight = inFlight
}
mu.Unlock()
<-release
time.Sleep(10 * time.Millisecond)
mu.Lock()
inFlight--
mu.Unlock()
return nil, nil
}, nil)
bus.Emit(abxbus.NewBaseEvent("ParallelEvent", map[string]any{"order": 0}))
bus.Emit(abxbus.NewBaseEvent("ParallelEvent", map[string]any{"order": 1}))
time.Sleep(10 * time.Millisecond)
close(release)
bus.WaitUntilIdle(nil)
if maxInFlight < 2 {
panic("expected overlapping events")
}
}
use abxbus_rust::{
event,
event_bus::{EventBus, EventBusOptions},
BaseEvent,
types::{EventConcurrencyMode, EventHandlerConcurrencyMode},
};
use futures::executor::block_on;
event! {
struct ParallelEvent {
event_result_type: (),
}
}
let bus = EventBus::new_with_options(
Some("ParallelEventBus".to_string()),
EventBusOptions {
event_concurrency: EventConcurrencyMode::Parallel,
event_handler_concurrency: EventHandlerConcurrencyMode::Parallel,
..EventBusOptions::default()
},
);
bus.on(ParallelEvent, |_event: ParallelEvent| async move { Ok(()) });
bus.emit(ParallelEvent { ..Default::default() });
bus.emit(ParallelEvent { ..Default::default() });
block_on(bus.wait_until_idle(None));
Notes
- Use when throughput matters more than deterministic event ordering.
- Combine with idempotent handlers and explicit external coordination when needed.