Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
Immediate execution lets a handler emit a child event and await it like a direct async function call.
When a child event is awaited inside a handler, it is processed immediately (queue-jump) instead of waiting behind unrelated queued events.
Repository example files:
Core pattern
from abxbus import BaseEvent, EventBus
class ParentEvent(BaseEvent[str]):
    # Event handled by on_parent; parameterized with str as its result type.
    pass


class ChildEvent(BaseEvent[str]):
    # Event emitted from inside the ParentEvent handler; also a str result.
    pass


# One bus carries both event types in this example.
bus = EventBus('RpcBus')
async def on_parent(event: ParentEvent) -> str:
    """Handle ParentEvent by calling ChildEvent like an inline async function.

    Emits a ChildEvent through ``event.emit``, awaits it, reads its result,
    and folds that result into this handler's own return value.
    """
    child = event.emit(ChildEvent())
    await child  # queue-jump while still inside this handler
    value = await child.event_result()
    return f'parent got: {value}'
async def on_child(_: ChildEvent) -> str:
    """Handle ChildEvent; the event itself is unused, hence the ``_`` name."""
    return 'child response'
# Register each handler for the event type it handles.
bus.on(ParentEvent, on_parent)
bus.on(ChildEvent, on_child)
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
// Both event types declare a string result for their handlers.
const ParentEvent = BaseEvent.extend('ParentEvent', { event_result_type: z.string() })
const ChildEvent = BaseEvent.extend('ChildEvent', { event_result_type: z.string() })

const bus = new EventBus('RpcBus')

// Parent handler: emit a child from the parent event and wait for it
// before building the reply.
bus.on(ParentEvent, async (event) => {
  const child = event.emit(ChildEvent({}))
  await child.done() // queue-jump while still inside this handler
  return `parent got: ${child.event_result}`
})

// Child handler: supplies the value the parent reads back.
bus.on(ChildEvent, async () => {
  return 'child response'
})
Parallel fan-out inside a handler
If the parent bus/event uses `event_concurrency='parallel'`, you can queue-jump multiple child calls at once and wait for them as a group.
import asyncio
from abxbus import BaseEvent, EventBus
class ParentEvent(BaseEvent[None]):
    # Fan-out trigger; parameterized with None because on_parent returns nothing.
    pass


# Three independent child event types, each parameterized with a str result.
class SomeChildEvent1(BaseEvent[str]):
    pass


class SomeChildEvent2(BaseEvent[str]):
    pass


class SomeChildEvent3(BaseEvent[str]):
    pass


# Parallel event concurrency is what lets the children be waited on as a group.
bus = EventBus('ParallelRpcBus', event_concurrency='parallel')
async def on_parent(event: ParentEvent) -> None:
    """Fan out three child events and report how each one settled.

    All three children are emitted from the parent event and awaited together
    with ``asyncio.gather(..., return_exceptions=True)``, so one failing child
    does not prevent the others from being reported.
    """
    settled = await asyncio.gather(
        event.emit(SomeChildEvent1()),
        event.emit(SomeChildEvent2()),
        event.emit(SomeChildEvent3()),
        return_exceptions=True,
    )
    for item in settled:
        # With return_exceptions=True, gather can hand back any BaseException
        # (for example asyncio.CancelledError), not only Exception subclasses.
        # Checking BaseException keeps those out of the success branch, where
        # reading .event_type on them would raise AttributeError.
        if isinstance(item, BaseException):
            print(f'child failed: {item}')
        else:
            print(f'child completed: {item.event_type}')
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const ParentEvent = BaseEvent.extend('ParentEvent', {})
// Three independent child event types, each declaring a string result.
const SomeChildEvent1 = BaseEvent.extend('SomeChildEvent1', { event_result_type: z.string() })
const SomeChildEvent2 = BaseEvent.extend('SomeChildEvent2', { event_result_type: z.string() })
const SomeChildEvent3 = BaseEvent.extend('SomeChildEvent3', { event_result_type: z.string() })

const bus = new EventBus('ParallelRpcBus', { event_concurrency: 'parallel' })

bus.on(ParentEvent, async (event) => {
  // Emit all three children up front so their done() promises are pending together.
  const pending = [
    event.emit(SomeChildEvent1({})).done(),
    event.emit(SomeChildEvent2({})).done(),
    event.emit(SomeChildEvent3({})).done(),
  ]
  const settled = await Promise.allSettled(pending)

  // allSettled never rejects; each entry is either fulfilled or rejected.
  for (const outcome of settled) {
    if (outcome.status === 'fulfilled') {
      console.log('child completed:', outcome.value.event_type)
    } else {
      console.error('child failed:', outcome.reason)
    }
  }
})
Python note: `asyncio.gather(..., return_exceptions=True)` is the closest equivalent to `Promise.allSettled(...)` here. Awaiting the emitted child events directly, whether one at a time or through `gather`, still uses the immediate queue-jump path.
Execution order example
In this pattern, sibling work can already be queued, but the awaited child still runs first.
from abxbus import BaseEvent, EventBus
class ParentEvent(BaseEvent):
    pass


class ChildEvent(BaseEvent):
    pass


class SiblingEvent(BaseEvent):
    pass


bus = EventBus('OrderBus', event_concurrency='bus-serial', event_handler_concurrency='serial')
# Each handler appends a marker here as it runs, so the list records execution order.
order: list[str] = []
async def on_parent(event: ParentEvent) -> None:
    """Queue a sibling, then emit and await a child, recording start and end markers."""
    order.append('parent_start')
    # Detached top-level work: no parent link is recorded for the sibling.
    bus.emit(SiblingEvent())
    # The child is emitted from the parent event and awaited right away,
    # after the sibling has already been emitted on the bus.
    child = event.emit(ChildEvent())
    await child
    order.append('parent_end')
async def on_child(_: ChildEvent) -> None:
    """Record that the child handler ran."""
    order.append('child')
async def on_sibling(_: SiblingEvent) -> None:
    """Record that the sibling handler ran."""
    order.append('sibling')
# Register each handler for the event type it handles.
bus.on(ParentEvent, on_parent)
bus.on(ChildEvent, on_child)
bus.on(SiblingEvent, on_sibling)
# NOTE: the `await` lines below must run inside a coroutine (or an asyncio REPL);
# top-level `await` is not valid in a regular Python module.
await bus.emit(ParentEvent())
await bus.wait_until_idle()
# The awaited child must have run before the parent handler resumed...
assert order.index('child') < order.index('parent_end')
# ...and the sibling emitted earlier must only have run after the parent finished.
assert order.index('parent_end') < order.index('sibling')
import { BaseEvent, EventBus } from 'abxbus'
const ParentEvent = BaseEvent.extend('ParentEvent', {})
const ChildEvent = BaseEvent.extend('ChildEvent', {})
const SiblingEvent = BaseEvent.extend('SiblingEvent', {})

const bus = new EventBus('OrderBus', {
  event_concurrency: 'bus-serial',
  event_handler_concurrency: 'serial',
})

// Each handler pushes a marker here as it runs, so the array records execution order.
const order: string[] = []

bus.on(ParentEvent, async (event) => {
  order.push('parent_start')
  // Detached top-level work: no parent link is recorded for the sibling.
  bus.emit(SiblingEvent({}))
  // The child is emitted from the parent event and awaited right away.
  await event.emit(ChildEvent({})).done()
  order.push('parent_end')
})

bus.on(ChildEvent, async () => {
  order.push('child')
})

bus.on(SiblingEvent, async () => {
  order.push('sibling')
})

await bus.emit(ParentEvent({})).done()
await bus.waitUntilIdle()

// Expected order: child, then parent_end, then sibling.
const childAt = order.indexOf('child')
const parentEndAt = order.indexOf('parent_end')
const siblingAt = order.indexOf('sibling')
if (childAt >= parentEndAt) throw new Error('child should finish before parent resumes')
if (parentEndAt >= siblingAt) throw new Error('sibling should run after parent ends')
Interaction with concurrency modes
- `event_concurrency = global-serial`: queue-jump still works, but all buses share one global event slot.
- `event_concurrency = bus-serial`: queue-jump preempts that bus's queue; other buses can continue processing independently.
- `event_concurrency = parallel`: events may already overlap; queue-jump still reduces parent latency for awaited child calls.
- `event_handler_concurrency = serial`: the parent temporarily yields execution so child handlers can run without deadlock.
- `event_handler_concurrency = parallel`: child handlers can overlap with other handlers for the same event.
- `event_handler_completion = first`: winner semantics can cancel loser handlers and their in-flight child work.
Notes
- In Python, `await child_event` inside a handler is the immediate path.
- In Python, `await child_event.event_completed()` keeps normal queue order (non-queue-jump wait).
- In TypeScript, use `await child_event.done()`.
- In TypeScript, `await child_event.eventCompleted()` keeps normal queue order (non-queue-jump wait).
Related pages