Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
Immediate execution lets a handler emit a child event and await it like a direct async function call.
When this happens inside a handler, the child event is processed immediately (queue-jump) instead of waiting behind unrelated queued events.
Repository example files:
Core pattern
Python
TypeScript
Rust
Go
from abxbus import BaseEvent, EventBus


# Both events declare a `str` result type via the BaseEvent type parameter.
class ParentEvent(BaseEvent[str]):
    pass


class ChildEvent(BaseEvent[str]):
    pass


bus = EventBus('RpcBus')


async def on_parent(event: ParentEvent) -> str:
    # The child is emitted through the parent event and then awaited in place,
    # like a direct async function call.
    child = event.emit(ChildEvent())
    await child.now()  # queue-jump while still inside this handler
    value = await child.event_result()
    return f'parent got: {value}'


async def on_child(_: ChildEvent) -> str:
    return 'child response'


bus.on(ParentEvent, on_parent)
bus.on(ChildEvent, on_child)
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'

// Both events declare a string result schema.
const ParentEvent = BaseEvent.extend('ParentEvent', { event_result_type: z.string() })
const ChildEvent = BaseEvent.extend('ChildEvent', { event_result_type: z.string() })

const bus = new EventBus('RpcBus')

// Parent handler: emit the child through the parent event, then await it in place.
bus.on(ParentEvent, async (event) => {
  const child = event.emit(ChildEvent({}))
  await child.now() // queue-jump while still inside this handler
  const value = await child.eventResult()
  return `parent got: ${value}`
})

bus.on(ChildEvent, async () => {
  return 'child response'
})
use abxbus_rust::{event, event_bus::EventBus};
use serde_json::json;

// Both events carry a JSON value as their handler result type.
event! {
    struct ParentEvent {
        event_result_type: serde_json::Value,
    }
}

event! {
    struct ChildEvent {
        event_result_type: serde_json::Value,
    }
}

let bus = EventBus::new(Some(String::from("RpcBus")));

// Parent handler: emit the child through the parent event, then await it in place.
bus.on(ParentEvent, |event: ParentEvent| async move {
    let child = event.emit(ChildEvent { ..Default::default() });
    child.now().await?; // queue-jump while still inside this handler
    let value = child.event_result().await?;
    let reply = format!("parent got: {value}");
    Ok(json!(reply))
});

bus.on(ChildEvent, |_event: ChildEvent| async move {
    Ok(json!("child response"))
});
bus := abxbus.NewEventBus("RpcBus", nil)

// Parent handler: emit the child through the parent event, then call Now()
// on it before reading its result.
bus.On("ParentEvent", "on_parent", func(event *abxbus.BaseEvent) (any, error) {
	child := event.Emit(abxbus.NewBaseEvent("ChildEvent", nil))
	if _, nowErr := child.Now(); nowErr != nil {
		return nil, nowErr
	}
	value, resultErr := child.EventResult()
	if resultErr != nil {
		return nil, resultErr
	}
	return fmt.Sprintf("parent got: %v", value), nil
}, nil)

bus.On("ChildEvent", "on_child", func(event *abxbus.BaseEvent) (any, error) {
	return "child response", nil
}, nil)
Parallel fan-out inside a handler
If the parent bus/event uses `event_concurrency='parallel'`, you can queue-jump multiple child calls at once and wait for them as a group.
Python
TypeScript
Go
Rust
import asyncio

from abxbus import BaseEvent, EventBus


class ParentEvent(BaseEvent[None]):
    pass


class SomeChildEvent1(BaseEvent[str]):
    pass


class SomeChildEvent2(BaseEvent[str]):
    pass


class SomeChildEvent3(BaseEvent[str]):
    pass


bus = EventBus('ParallelRpcBus', event_concurrency='parallel')


async def on_parent(event: ParentEvent) -> None:
    # Emit all three children, then wait for them as a group.
    # return_exceptions=True makes gather return failures as values instead of
    # raising on the first one, so every child gets reported below.
    settled = await asyncio.gather(
        event.emit(SomeChildEvent1()),
        event.emit(SomeChildEvent2()),
        event.emit(SomeChildEvent3()),
        return_exceptions=True,
    )
    for item in settled:
        if isinstance(item, Exception):
            print(f'child failed: {item}')
        else:
            print(f'child completed: {item.event_type}')


# Register the handler so it actually runs when a ParentEvent is emitted.
bus.on(ParentEvent, on_parent)
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'

const ParentEvent = BaseEvent.extend('ParentEvent', {})
const SomeChildEvent1 = BaseEvent.extend('SomeChildEvent1', { event_result_type: z.string() })
const SomeChildEvent2 = BaseEvent.extend('SomeChildEvent2', { event_result_type: z.string() })
const SomeChildEvent3 = BaseEvent.extend('SomeChildEvent3', { event_result_type: z.string() })

const bus = new EventBus('ParallelRpcBus', { event_concurrency: 'parallel' })

bus.on(ParentEvent, async (event) => {
  // All three now() calls are started before any of them is awaited;
  // allSettled then reports each child's outcome without throwing.
  const settled = await Promise.allSettled([
    event.emit(SomeChildEvent1({})).now(),
    event.emit(SomeChildEvent2({})).now(),
    event.emit(SomeChildEvent3({})).now(),
  ])

  settled.forEach((result) => {
    if (result.status === 'rejected') {
      console.error('child failed:', result.reason)
    } else {
      console.log('child completed:', result.value.event_type)
    }
  })
})
bus := abxbus.NewEventBus("ParallelRpcBus", &abxbus.EventBusOptions{
	EventConcurrency: abxbus.EventConcurrencyParallel,
})

bus.On("ParentEvent", "on_parent", func(event *abxbus.BaseEvent) (any, error) {
	// Emit every child up front, then call Now() on each one in turn.
	children := []*abxbus.BaseEvent{
		event.Emit(abxbus.NewBaseEvent("SomeChildEvent1", nil)),
		event.Emit(abxbus.NewBaseEvent("SomeChildEvent2", nil)),
		event.Emit(abxbus.NewBaseEvent("SomeChildEvent3", nil)),
	}

	for _, child := range children {
		_, err := child.Now()
		if err == nil {
			fmt.Printf("child completed: %s\n", child.EventType)
			continue
		}
		// A failed child is reported but does not stop the remaining children.
		fmt.Printf("child failed: %v\n", err)
	}
	return nil, nil
}, nil)
use abxbus_rust::{event, event_bus::{EventBus, EventBusOptions}, types::EventConcurrencyMode};
use futures::future::join_all;
use serde_json::json;

event! {
    struct ParentEvent {
        event_result_type: serde_json::Value,
    }
}

event! {
    struct SomeChildEvent1 {
        event_result_type: serde_json::Value,
    }
}

event! {
    struct SomeChildEvent2 {
        event_result_type: serde_json::Value,
    }
}

event! {
    struct SomeChildEvent3 {
        event_result_type: serde_json::Value,
    }
}

let bus = EventBus::new_with_options(
    Some(String::from("ParallelRpcBus")),
    EventBusOptions {
        event_concurrency: EventConcurrencyMode::Parallel,
        ..EventBusOptions::default()
    },
);

bus.on(ParentEvent, |event: ParentEvent| async move {
    // Emit every child up front so all three exist before any is awaited.
    let children = vec![
        event.emit(SomeChildEvent1 { ..Default::default() }),
        event.emit(SomeChildEvent2 { ..Default::default() }),
        event.emit(SomeChildEvent3 { ..Default::default() }),
    ];

    // One now() future per child, awaited together; each outcome is reported
    // individually instead of aborting on the first error.
    let queue_jumps = children.iter().map(|child| child.now());
    for outcome in join_all(queue_jumps).await {
        match outcome {
            Ok(child) => println!("child completed: {}", child.event_type),
            Err(err) => println!("child failed: {err}"),
        }
    }

    Ok(json!(null))
});
Python note: `asyncio.gather(..., return_exceptions=True)` is the closest equivalent of `Promise.allSettled(...)` here. In Rust, call `now().await` on each emitted child to take the same immediate queue-jump path. In Go, call `Now()` on each emitted child.
Execution order example
In this pattern, sibling work can already be queued, but the awaited child still runs first.
Python
TypeScript
Go
Rust
from abxbus import BaseEvent, EventBus
class ParentEvent(BaseEvent):
pass
class ChildEvent(BaseEvent):
pass
class SiblingEvent(BaseEvent):
pass
bus = EventBus('OrderBus', event_concurrency='bus-serial', event_handler_concurrency='serial')
order: list[str] = []
async def on_parent(event: ParentEvent) -> None:
order.append('parent_start')
# Detached top-level work: no parent link is recorded for the sibling.
bus.emit(SiblingEvent())
child = event.emit(ChildEvent())
await child.now()
order.append('parent_end')
async def on_child(_: ChildEvent) -> None:
order.append('child')
async def on_sibling(_: SiblingEvent) -> None:
order.append('sibling')
bus.on(ParentEvent, on_parent)
bus.on(ChildEvent, on_child)
bus.on(SiblingEvent, on_sibling)
await bus.emit(ParentEvent()).now()
await bus.wait_until_idle()
assert order.index('child') < order.index('parent_end')
assert order.index('parent_end') < order.index('sibling')
import { BaseEvent, EventBus } from 'abxbus'

const ParentEvent = BaseEvent.extend('ParentEvent', {})
const ChildEvent = BaseEvent.extend('ChildEvent', {})
const SiblingEvent = BaseEvent.extend('SiblingEvent', {})

const bus = new EventBus('OrderBus', {
  event_concurrency: 'bus-serial',
  event_handler_concurrency: 'serial',
})

// Records the order in which the handlers run.
const order: string[] = []

bus.on(ParentEvent, async (event) => {
  order.push('parent_start')
  // Detached top-level work: no parent link is recorded for the sibling.
  bus.emit(SiblingEvent({}))
  // The sibling is queued first, but the awaited child still runs before it.
  const child = event.emit(ChildEvent({}))
  await child.now()
  order.push('parent_end')
})

bus.on(ChildEvent, async () => {
  order.push('child')
})

bus.on(SiblingEvent, async () => {
  order.push('sibling')
})

await bus.emit(ParentEvent({})).now()
await bus.waitUntilIdle()

// True when `first` was recorded strictly before `second`.
const ranBefore = (first: string, second: string): boolean => order.indexOf(first) < order.indexOf(second)

if (!ranBefore('child', 'parent_end')) {
  throw new Error('child should finish before parent resumes')
}
if (!ranBefore('parent_end', 'sibling')) {
  throw new Error('sibling should run after parent ends')
}
bus := abxbus.NewEventBus("OrderBus", &abxbus.EventBusOptions{
	EventConcurrency:        abxbus.EventConcurrencyBusSerial,
	EventHandlerConcurrency: abxbus.EventHandlerConcurrencySerial,
})

// Records the order in which the handlers run.
order := []string{}

bus.On("ParentEvent", "on_parent", func(event *abxbus.BaseEvent) (any, error) {
	order = append(order, "parent_start")
	// Detached top-level work: no parent link is recorded for the sibling.
	bus.Emit(abxbus.NewBaseEvent("SiblingEvent", nil))
	// The sibling is queued first, but the awaited child still runs before it.
	child := event.Emit(abxbus.NewBaseEvent("ChildEvent", nil))
	if _, err := child.Now(); err != nil {
		return nil, err
	}
	order = append(order, "parent_end")
	return nil, nil
}, nil)

bus.On("ChildEvent", "on_child", func(event *abxbus.BaseEvent) (any, error) {
	order = append(order, "child")
	return nil, nil
}, nil)

bus.On("SiblingEvent", "on_sibling", func(event *abxbus.BaseEvent) (any, error) {
	order = append(order, "sibling")
	return nil, nil
}, nil)

parent := bus.Emit(abxbus.NewBaseEvent("ParentEvent", nil))
if _, err := parent.Now(); err != nil {
	panic(err)
}
if ok := bus.WaitUntilIdle(nil); !ok {
	panic("bus did not become idle")
}

// position returns the index at which label was recorded, and panics when the
// label is missing so an absent entry cannot make a comparison pass by accident.
position := func(label string) int {
	for i, item := range order {
		if item == label {
			return i
		}
	}
	panic("missing order entry: " + label)
}

// Same ordering checks as the Python, TypeScript, and Rust variants.
if !(position("child") < position("parent_end")) {
	panic("child should finish before parent resumes")
}
if !(position("parent_end") < position("sibling")) {
	panic("sibling should run after parent ends")
}
use abxbus_rust::{
    event,
    event_bus::{EventBus, EventBusOptions},
    types::{EventConcurrencyMode, EventHandlerConcurrencyMode},
};
use futures::executor::block_on;
use std::sync::{Arc, Mutex};

event! {
    struct ParentEvent {
        event_result_type: (),
    }
}

event! {
    struct ChildEvent {
        event_result_type: (),
    }
}

event! {
    struct SiblingEvent {
        event_result_type: (),
    }
}

let bus = EventBus::new_with_options(
    Some(String::from("OrderBus")),
    EventBusOptions {
        event_concurrency: EventConcurrencyMode::BusSerial,
        event_handler_concurrency: EventHandlerConcurrencyMode::Serial,
        ..EventBusOptions::default()
    },
);

// Shared log of the order in which the handlers run.
let order = Arc::new(Mutex::new(Vec::<String>::new()));

let order_for_parent = Arc::clone(&order);
let bus_for_parent = bus.clone();
bus.on(ParentEvent, move |event: ParentEvent| {
    // Fresh handles per invocation so the returned future owns what it uses.
    let order = Arc::clone(&order_for_parent);
    let bus = bus_for_parent.clone();
    async move {
        order.lock().unwrap().push(String::from("parent_start"));
        // Detached top-level work: no parent link is recorded for the sibling.
        bus.emit(SiblingEvent { ..Default::default() });
        // The sibling is queued first, but the awaited child still runs before it.
        let child = event.emit(ChildEvent { ..Default::default() });
        child.now().await?;
        order.lock().unwrap().push(String::from("parent_end"));
        Ok(())
    }
});

let order_for_child = Arc::clone(&order);
bus.on(ChildEvent, move |_event: ChildEvent| {
    let order = Arc::clone(&order_for_child);
    async move {
        order.lock().unwrap().push(String::from("child"));
        Ok(())
    }
});

let order_for_sibling = Arc::clone(&order);
bus.on(SiblingEvent, move |_event: SiblingEvent| {
    let order = Arc::clone(&order_for_sibling);
    async move {
        order.lock().unwrap().push(String::from("sibling"));
        Ok(())
    }
});

block_on(bus.emit(ParentEvent { ..Default::default() }).now())?;
assert!(block_on(bus.wait_until_idle(None)));

let order = order.lock().unwrap();
// Index at which `label` was recorded; panics if the label is missing.
let position = |label: &str| order.iter().position(|item| item == label).unwrap();
assert!(position("child") < position("parent_end"));
assert!(position("parent_end") < position("sibling"));
Interaction with concurrency modes
- `event_concurrency = global-serial`: queue-jump still works, but all buses still share one global event slot.
- `event_concurrency = bus-serial`: queue-jump preempts that bus's queue; other buses can continue processing independently.
- `event_concurrency = parallel`: events may already overlap; queue-jump still reduces parent latency for awaited child calls.
- `event_handler_concurrency = serial`: the parent temporarily yields execution so child handlers can run without deadlock.
- `event_handler_concurrency = parallel`: child handlers can overlap with other handlers for the same event.
- `event_handler_completion = first`: winner semantics can cancel loser handlers and their in-flight child work.
Notes
- In Python, `await child_event` inside a handler is the immediate path (the examples on this page use the explicit `await child_event.now()` form).
- In Python, `await child_event.wait()` keeps normal queue order (non-queue-jump wait).
- In TypeScript, use `await child_event.now()` for the immediate path.
- In TypeScript, `await child_event.wait()` keeps normal queue order (non-queue-jump wait).
- In Go, use `childEvent.Now()` for the immediate path.
- In Go, use `childEvent.Wait()` to keep normal queue order.
- In Rust, use `child_event.now().await` for the immediate path.
- In Rust, use `child_event.wait().await` to keep normal queue order.
Related pages