Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
first short-circuits event completion once the first successful non-None / non-undefined result is available.
Lifecycle impact
- The first successful result wins (
None/undefined and errors do not win).
- In
serial handler mode, remaining handlers are skipped once a winner appears.
- In
parallel handler mode, in-flight losers are cancelled or aborted.
- Event completion resolves as soon as a winner is found (or all handlers fail).
Execution order example
Python
TypeScript
Go
Rust
import asyncio
from abxbus import BaseEvent, EventBus
class CompletionEvent(BaseEvent[str]):
pass
bus = EventBus(
'CompletionFirstBus',
event_handler_concurrency='parallel',
event_handler_completion='first',
)
state = {'slow_started': False, 'slow_cancelled': False}
async def fast_handler(_: CompletionEvent) -> str:
await asyncio.sleep(0.01)
return 'winner'
async def slow_handler(_: CompletionEvent) -> str:
state['slow_started'] = True
try:
await asyncio.sleep(0.5)
return 'slow'
except asyncio.CancelledError:
state['slow_cancelled'] = True
raise
bus.on(CompletionEvent, slow_handler)
bus.on(CompletionEvent, fast_handler)
event = bus.emit(CompletionEvent())
await event.now()
value = await event.event_result(raise_if_any=False, raise_if_none=False)
assert value == 'winner'
assert state['slow_started'] is True
assert state['slow_cancelled'] is True
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const CompletionEvent = BaseEvent.extend('CompletionEvent', { event_result_type: z.string() })
const bus = new EventBus('CompletionFirstBus', {
event_handler_concurrency: 'parallel',
event_handler_completion: 'first',
})
let slowStarted = false
let slowCompleted = false
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
bus.on(CompletionEvent, async () => {
slowStarted = true
await delay(500)
slowCompleted = true
return 'slow'
})
bus.on(CompletionEvent, async () => {
await delay(10)
return 'winner'
})
const event = bus.emit(CompletionEvent({}))
await event.now()
if ((await event.eventResult({ raise_if_any: false, raise_if_none: false })) !== 'winner') {
throw new Error('expected first winner result')
}
if (!slowStarted) throw new Error('expected slow handler to start')
if (slowCompleted) throw new Error('slow handler should not complete before event resolves')
package main
import (
"time"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2"
)
func main() {
bus := abxbus.NewEventBus("CompletionFirstBus", &abxbus.EventBusOptions{
EventHandlerConcurrency: abxbus.EventHandlerConcurrencyParallel,
EventHandlerCompletion: abxbus.EventHandlerCompletionFirst,
})
slowStarted := make(chan struct{}, 1)
slowCompleted := make(chan struct{}, 1)
bus.On("CompletionEvent", "slow_handler", func(event *abxbus.BaseEvent) (any, error) {
slowStarted <- struct{}{}
time.Sleep(500 * time.Millisecond)
slowCompleted <- struct{}{}
return "slow", nil
}, nil)
bus.On("CompletionEvent", "fast_handler", func(event *abxbus.BaseEvent) (any, error) {
time.Sleep(10 * time.Millisecond)
return "winner", nil
}, nil)
event := bus.Emit(abxbus.NewBaseEvent("CompletionEvent", nil))
_, err := event.Now(&abxbus.EventWaitOptions{FirstResult: true})
if err != nil {
panic(err)
}
value, err := event.EventResult(&abxbus.EventResultOptions{
RaiseIfAny: false,
RaiseIfNone: false,
})
if err != nil {
panic(err)
}
if value != "winner" {
panic("expected first winner result")
}
select {
case <-slowStarted:
default:
panic("expected slow handler to start")
}
select {
case <-slowCompleted:
panic("slow handler should not complete before event resolves")
default:
}
}
use abxbus_rust::{
event,
event_bus::{EventBus, EventBusOptions},
BaseEvent,
types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode},
};
use futures::executor::block_on;
use futures_timer::Delay;
use serde_json::json;
use std::time::Duration;
event! {
struct CompletionEvent {
event_result_type: serde_json::Value,
}
}
let bus = EventBus::new_with_options(
Some("CompletionFirstBus".to_string()),
EventBusOptions {
event_handler_concurrency: EventHandlerConcurrencyMode::Parallel,
event_handler_completion: EventHandlerCompletionMode::First,
..EventBusOptions::default()
},
);
bus.on(CompletionEvent, |_event: CompletionEvent| async move {
Delay::new(Duration::from_millis(500)).await;
Ok(json!("slow"))
});
bus.on(CompletionEvent, |_event: CompletionEvent| async move {
Delay::new(Duration::from_millis(10)).await;
Ok(json!("winner"))
});
let event = bus.emit(CompletionEvent { ..Default::default() });
let value = block_on(event.event_result())?;
assert_eq!(value, Some(json!("winner")));
Notes
- This mode is useful for fallback chains and race-to-first-response patterns.
- Use
await event.now(first_result=True) / await event.now({ first_result: true }) / event.Now(&abxbus.EventWaitOptions{FirstResult: true}) / event.now_with_options(EventWaitOptions { first_result: true, ..Default::default() }) when you want to resolve as soon as a valid result is available while still allowing all handlers to continue. To stop running after the first valid handler result, set event_handler_completion: 'first' on the event definition.