Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
`parallel` mode allows multiple handlers for the same event to run at the same time.
Companion runnable example:
Lifecycle impact
- Event starts processing.
- All applicable handlers are scheduled concurrently.
- Event completion waits based on the completion mode (`all` or `first`).
- Per-handler timeout/error behavior remains independent per handler.
Execution order example
Python
TypeScript
Go
Rust
import asyncio
from abxbus import BaseEvent, EventBus
class HandlerEvent(BaseEvent):
    # Event type with no fields of its own; it exists only to trigger the handlers below.
    pass

# Create the bus with event_handler_concurrency='parallel', the mode this example exercises.
bus = EventBus('ParallelHandlerBus', event_handler_concurrency='parallel')
in_flight = 0  # number of handlers currently inside tracked()
max_in_flight = 0  # highest in_flight value observed so far
release = asyncio.Event()  # gate: handlers stay suspended on it until it is set

async def tracked(_: HandlerEvent) -> None:
    # Count this handler as running, record the peak, then wait on the gate
    # before counting it as finished.
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    await release.wait()
    in_flight -= 1

# Register the same coroutine twice so a single event has two handlers.
bus.on(HandlerEvent, tracked)
bus.on(HandlerEvent, tracked)
event = bus.emit(HandlerEvent())
# Yield to the event loop once before opening the gate.
# NOTE(review): presumably one yield is enough for the bus to start both handlers — confirm.
await asyncio.sleep(0)
release.set()
await event.now()
# Passes only if both handlers were inside tracked() at the same time.
assert max_in_flight >= 2
import { BaseEvent, EventBus } from 'abxbus'
// Event type named 'HandlerEvent', declared with an empty definition object.
const HandlerEvent = BaseEvent.extend('HandlerEvent', {})
// Create the bus with event_handler_concurrency: 'parallel', the mode this example exercises.
const bus = new EventBus('ParallelHandlerBus', { event_handler_concurrency: 'parallel' })
let inFlight = 0 // number of handlers currently inside tracked()
let maxInFlight = 0 // highest inFlight value observed so far
// The Promise executor below runs synchronously and assigns release,
// which is why the definite-assignment `!` is safe here.
let release!: () => void
const gate = new Promise<void>((resolve) => {
  release = resolve
})
// Handler: count itself as running, record the peak, then wait on the gate
// before counting itself as finished.
const tracked = async () => {
  inFlight += 1
  maxInFlight = Math.max(maxInFlight, inFlight)
  await gate
  inFlight -= 1
}
// Register the same handler twice so a single event has two handlers.
bus.on(HandlerEvent, tracked)
bus.on(HandlerEvent, tracked)
const event = bus.emit(HandlerEvent({}))
// Wait for a zero-delay timer before resolving the gate.
// NOTE(review): presumably this gives the bus time to start both handlers — confirm.
await new Promise((resolve) => setTimeout(resolve, 0))
release()
await event.now()
// Throws unless both handlers were inside tracked() at the same time.
if (maxInFlight < 2) throw new Error('expected overlapping handlers')
package main
import (
"sync"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2"
)
// main registers two handlers for one event on a bus configured with
// EventHandlerConcurrencyParallel and panics unless both handlers were
// running at the same moment.
func main() {
	opts := &abxbus.EventBusOptions{
		EventHandlerConcurrency: abxbus.EventHandlerConcurrencyParallel,
	}
	bus := abxbus.NewEventBus("ParallelHandlerBus", opts)

	var (
		mu     sync.Mutex
		active int // handlers currently between entry and exit
		peak   int // highest value active has reached
	)
	// Buffered for two so neither handler blocks while announcing itself.
	entered := make(chan struct{}, 2)
	// Closed by main to let every waiting handler finish.
	gate := make(chan struct{})

	// adjust applies delta to active under mu and records a new peak.
	adjust := func(delta int) {
		mu.Lock()
		defer mu.Unlock()
		active += delta
		if active > peak {
			peak = active
		}
	}

	tracked := func(_ *abxbus.BaseEvent) (any, error) {
		adjust(1)
		entered <- struct{}{}
		<-gate
		adjust(-1)
		return nil, nil
	}

	bus.On("HandlerEvent", "tracked_a", tracked, nil)
	bus.On("HandlerEvent", "tracked_b", tracked, nil)

	event := bus.Emit(abxbus.NewBaseEvent("HandlerEvent", nil))

	// Hold the gate shut until both handlers have reported in, so the
	// overlap is observed deterministically rather than by timing.
	for i := 0; i < 2; i++ {
		<-entered
	}
	close(gate)

	if _, err := event.Now(); err != nil {
		panic(err)
	}
	// peak is only raised before a handler sends on entered, and both sends
	// were received above, so reading it here without mu is ordered.
	if peak < 2 {
		panic("expected overlapping handlers")
	}
}
use abxbus_rust::{
event,
event_bus::{EventBus, EventBusOptions},
BaseEvent,
types::EventHandlerConcurrencyMode,
};
use futures::executor::block_on;
// Declare the HandlerEvent type through the crate's `event!` macro, with the
// unit type as its event_result_type.
event! {
    struct HandlerEvent {
        event_result_type: (),
    }
}
// Build a bus whose handler concurrency mode is Parallel; every other option
// keeps its default value.
let bus = EventBus::new_with_options(
    Some("ParallelHandlerBus".to_string()),
    EventBusOptions {
        event_handler_concurrency: EventHandlerConcurrencyMode::Parallel,
        ..EventBusOptions::default()
    },
);
// Register two handlers for the same event type. Both return Ok(()) immediately,
// so this snippet shows the configuration but does not itself measure overlap.
bus.on(HandlerEvent, |_event: HandlerEvent| async move { Ok(()) });
bus.on(HandlerEvent, |_event: HandlerEvent| async move { Ok(()) });
let event = bus.emit(HandlerEvent { ..Default::default() });
// Block the current thread until the future returned by now() completes.
block_on(event.now());
Notes
- Best for independent I/O-bound handlers where overlap reduces total latency.
- If handlers mutate shared resources, add explicit synchronization.