Skip to main content

Documentation Index

Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt

Use this file to discover all available pages before exploring further.

Immediate execution lets a handler emit a child event and await it like a direct async function call. When this happens inside a handler, the child event is processed immediately (queue-jump) instead of waiting behind unrelated queued events. Repository example files:

Core pattern

from abxbus import BaseEvent, EventBus

class ParentEvent(BaseEvent[str]):
    pass

class ChildEvent(BaseEvent[str]):
    pass

bus = EventBus('RpcBus')

async def on_parent(event: ParentEvent) -> str:
    child = event.emit(ChildEvent())
    await child.now()  # queue-jump while still inside this handler
    value = await child.event_result()
    return f'parent got: {value}'

async def on_child(_: ChildEvent) -> str:
    return 'child response'

bus.on(ParentEvent, on_parent)
bus.on(ChildEvent, on_child)

Parallel fan-out inside a handler

If the parent bus/event uses event_concurrency='parallel', you can queue-jump multiple child calls at once and wait for them as a group.
import asyncio

from abxbus import BaseEvent, EventBus

class ParentEvent(BaseEvent[None]):
    pass

class SomeChildEvent1(BaseEvent[str]):
    pass

class SomeChildEvent2(BaseEvent[str]):
    pass

class SomeChildEvent3(BaseEvent[str]):
    pass

bus = EventBus('ParallelRpcBus', event_concurrency='parallel')

async def on_parent(event: ParentEvent) -> None:
    settled = await asyncio.gather(
        event.emit(SomeChildEvent1()),
        event.emit(SomeChildEvent2()),
        event.emit(SomeChildEvent3()),
        return_exceptions=True,
    )

    for item in settled:
        if isinstance(item, Exception):
            print(f'child failed: {item}')
        else:
            print(f'child completed: {item.event_type}')
Python note: asyncio.gather(..., return_exceptions=True) is the closest Promise.allSettled(...) equivalent here. In Rust, call now().await on each emitted child to take the same immediate queue-jump path. In Go, call Now() on each emitted child.

Execution order example

In this pattern, sibling work can already be queued, but the awaited child still runs first.
from abxbus import BaseEvent, EventBus

class ParentEvent(BaseEvent):
    pass

class ChildEvent(BaseEvent):
    pass

class SiblingEvent(BaseEvent):
    pass

bus = EventBus('OrderBus', event_concurrency='bus-serial', event_handler_concurrency='serial')
order: list[str] = []

async def on_parent(event: ParentEvent) -> None:
    order.append('parent_start')
    # Detached top-level work: no parent link is recorded for the sibling.
    bus.emit(SiblingEvent())
    child = event.emit(ChildEvent())
    await child.now()
    order.append('parent_end')

async def on_child(_: ChildEvent) -> None:
    order.append('child')

async def on_sibling(_: SiblingEvent) -> None:
    order.append('sibling')

bus.on(ParentEvent, on_parent)
bus.on(ChildEvent, on_child)
bus.on(SiblingEvent, on_sibling)

await bus.emit(ParentEvent()).now()
await bus.wait_until_idle()

assert order.index('child') < order.index('parent_end')
assert order.index('parent_end') < order.index('sibling')

Interaction with concurrency modes

  • event_concurrency = global-serial: queue-jump still works, but all buses still share one global event slot.
  • event_concurrency = bus-serial: queue-jump preempts that bus queue; other buses can continue processing independently.
  • event_concurrency = parallel: events may already overlap; queue-jump still reduces parent latency for awaited child calls.
  • event_handler_concurrency = serial: parent temporarily yields execution so child handlers can run without deadlock.
  • event_handler_concurrency = parallel: child handlers can overlap with other handlers for the same event.
  • event_handler_completion = first: winner semantics can cancel loser handlers and their in-flight child work.

Notes

  • In Python, await child_event inside a handler is the immediate path.
  • In Python, await child_event.wait() keeps normal queue order (non-queue-jump wait).
  • In TypeScript, use await child_event.now().
  • In TypeScript, await child_event.wait() keeps normal queue order (non-queue-jump wait).
  • In Go, use childEvent.Now() for the immediate path.
  • In Go, use childEvent.Wait() to keep normal queue order.
  • In Rust, use child_event.now().await for the immediate path.
  • In Rust, use child_event.wait().await to keep normal queue order.