Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
all is the default handler completion mode. The event completes only after every matching handler reaches a terminal state.
Lifecycle impact
- All matching handlers are allowed to run.
- A successful early handler does not short-circuit the event.
- Event completion waits for every handler to finish, fail, or time out.
- Result collection includes all successful non-
None / non-undefined return values.
Execution order example
Python
TypeScript
Go
Rust
import asyncio
from abxbus import BaseEvent, EventBus
class CompletionEvent(BaseEvent[str]):
pass
bus = EventBus(
'CompletionAllBus',
event_handler_concurrency='parallel',
event_handler_completion='all',
)
seen: list[str] = []
async def fast_handler(_: CompletionEvent) -> str:
await asyncio.sleep(0.01)
seen.append('fast')
return 'fast'
async def slow_handler(_: CompletionEvent) -> str:
await asyncio.sleep(0.05)
seen.append('slow')
return 'slow'
bus.on(CompletionEvent, fast_handler)
bus.on(CompletionEvent, slow_handler)
event = bus.emit(CompletionEvent())
await event.now()
assert set(seen) == {'fast', 'slow'}
results = await event.event_results_list(raise_if_any=False, raise_if_none=False)
assert set(results) == {'fast', 'slow'}
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const CompletionEvent = BaseEvent.extend('CompletionEvent', { event_result_type: z.string() })
const bus = new EventBus('CompletionAllBus', {
event_handler_concurrency: 'parallel',
event_handler_completion: 'all',
})
const seen: string[] = []
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
bus.on(CompletionEvent, async () => {
await delay(10)
seen.push('fast')
return 'fast'
})
bus.on(CompletionEvent, async () => {
await delay(50)
seen.push('slow')
return 'slow'
})
const event = bus.emit(CompletionEvent({}))
await event.now()
const results = await event.eventResultsList({ raise_if_any: false, raise_if_none: false })
if (seen.length !== 2) throw new Error('expected all handlers to run')
if (!results.includes('fast') || !results.includes('slow')) {
throw new Error('expected both handler return values')
}
package main
import (
"sync"
"time"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2"
)
func main() {
bus := abxbus.NewEventBus("CompletionAllBus", &abxbus.EventBusOptions{
EventHandlerConcurrency: abxbus.EventHandlerConcurrencyParallel,
EventHandlerCompletion: abxbus.EventHandlerCompletionAll,
})
var mu sync.Mutex
seen := map[string]bool{}
bus.On("CompletionEvent", "fast_handler", func(event *abxbus.BaseEvent) (any, error) {
time.Sleep(10 * time.Millisecond)
mu.Lock()
seen["fast"] = true
mu.Unlock()
return "fast", nil
}, nil)
bus.On("CompletionEvent", "slow_handler", func(event *abxbus.BaseEvent) (any, error) {
time.Sleep(50 * time.Millisecond)
mu.Lock()
seen["slow"] = true
mu.Unlock()
return "slow", nil
}, nil)
event := bus.Emit(abxbus.NewBaseEvent("CompletionEvent", nil))
if _, err := event.Now(); err != nil {
panic(err)
}
results, err := event.EventResultsList(
&abxbus.EventResultOptions{RaiseIfAny: false, RaiseIfNone: false},
)
if err != nil {
panic(err)
}
mu.Lock()
allSeen := seen["fast"] && seen["slow"]
mu.Unlock()
if !allSeen {
panic("expected all handlers to run")
}
if len(results) != 2 {
panic("expected both handler return values")
}
}
use abxbus_rust::{
base_event::EventResultOptions,
event,
event_bus::{EventBus, EventBusOptions},
BaseEvent,
types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode},
};
use futures::executor::block_on;
use serde_json::json;
event! {
struct CompletionEvent {
event_result_type: serde_json::Value,
}
}
let bus = EventBus::new_with_options(
Some("CompletionAllBus".to_string()),
EventBusOptions {
event_handler_concurrency: EventHandlerConcurrencyMode::Parallel,
event_handler_completion: EventHandlerCompletionMode::All,
..EventBusOptions::default()
},
);
bus.on(CompletionEvent, |_event: CompletionEvent| async move { Ok(json!("fast")) });
bus.on(CompletionEvent, |_event: CompletionEvent| async move { Ok(json!("slow")) });
let event = bus.emit(CompletionEvent { ..Default::default() });
let results = block_on(event.event_results_list_with_options(EventResultOptions {
raise_if_any: false,
raise_if_none: false,
..EventResultOptions::default()
}))?;
assert_eq!(results.len(), 2);
Notes
all is best when multiple handlers contribute required side effects.
- Handler scheduling (
serial vs parallel) changes overlap, but not the fact that all handlers must settle.