Skip to main content

Documentation Index

Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt

Use this file to discover all available pages before exploring further.

You can forward events across multiple buses while preserving event path metadata and loop safety. Forwarding does not eagerly hydrate unset event options. Fields such as event_timeout, event_concurrency, event_handler_concurrency, and event_handler_completion stay unset on the event unless the user set them, and each processing bus applies its own defaults at execution time. Repository example files:

Why multiple buses are useful

Multiple buses let you separate concerns and tune runtime behavior per boundary:
  • service-local bus for business logic with strict ordering and useful history
  • transport/relay bus focused on throughput and forwarding (little or no history retention)
  • specialized buses for domains that need different timeout or concurrency policies
This is especially useful in microservice-style designs, where each component has different consistency and observability needs.

Example: service buses with different policies

In this example:
  • AuthBus is strict and debuggable: event_concurrency='bus-serial', event_handler_concurrency='serial', max_history_size=100
  • RelayBus is a transport forwarder: event_concurrency='parallel', max_history_size=0
  • BillingBus is another service bus with its own settings
from abxbus import BaseEvent, EventBus

class UserCreatedEvent(BaseEvent[str]):
    user_id: str

class AuthService:
    def __init__(self) -> None:
        self.bus = EventBus(
            'AuthBus',
            event_concurrency='bus-serial',
            event_handler_concurrency='serial',
            max_history_size=100,
        )
        self.bus.on(UserCreatedEvent, self.on_user_created)

    async def on_user_created(self, event: UserCreatedEvent) -> str:
        return f'auth-ok:{event.user_id}'

class RelayService:
    def __init__(self) -> None:
        self.bus = EventBus(
            'RelayBus',
            event_concurrency='parallel',
            max_history_size=0,
        )

class BillingService:
    def __init__(self) -> None:
        self.bus = EventBus(
            'BillingBus',
            event_concurrency='bus-serial',
            event_handler_concurrency='serial',
            max_history_size=100,
        )
        self.bus.on(UserCreatedEvent, self.on_user_created)

    async def on_user_created(self, event: UserCreatedEvent) -> str:
        return f'billing-ok:{event.user_id}'

auth = AuthService()
relay = RelayService()
billing = BillingService()

auth.bus.on('*', relay.bus.emit)
relay.bus.on('*', billing.bus.emit)

event = await auth.bus.emit(UserCreatedEvent(user_id='u-a8d1')).now(first_result=True)
result = await event.event_result()
print(result)
# 'auth-ok:u-a8d1'

root = auth.bus.emit(UserCreatedEvent(user_id='u-a8d1'))
await root.now()
print(root.event_path)
# ['AuthBus#a8d1', 'RelayBus#3f2c', 'BillingBus#b91e']

Uni-directional and bi-directional forwarding

Forwarding can be one-way or two-way depending on your topology.
  • Uni-directional: one producer bus forwards to one consumer bus.
  • Bi-directional: both buses forward to each other (common for peer sync).
left = EventBus('LeftBus')
right = EventBus('RightBus')

# uni-directional
left.on('*', right.emit)

# bi-directional (add reverse path)
right.on('*', left.emit)
Loop prevention still applies in both modes: if an event already visited a bus (tracked in event_path), forwarding back to that bus is a no-op and it is not re-processed there.

How loop prevention works (event_path)

Loop prevention is automatic and based on event_path:
  1. Each bus appends its own label (for example AuthBus#a8d1) to event_path when it first sees an event.
  2. When a forwarding handler points to another bus, that bus checks whether its label is already in event_path.
  3. If yes, forwarding to that bus is skipped (no-op), so cycles terminate naturally.
This means you can wire cyclic topologies without infinite forwarding loops.
from abxbus import BaseEvent, EventBus

class PingEvent(BaseEvent):
    message: str

bus_a = EventBus('BusA')
bus_b = EventBus('BusB')
bus_c = EventBus('BusC')

# cycle: A -> B -> C -> A
bus_a.on('*', bus_b.emit)
bus_b.on('*', bus_c.emit)
bus_c.on('*', bus_a.emit)

event = bus_a.emit(PingEvent(message='hello'))
await event.now()

print(event.event_path)
# ['BusA#a8d1', 'BusB#3f2c', 'BusC#b91e']

Parent-child tracking across forwarded flows

Parent-child tracking also works across forwarded flows:
  • if a forwarded event is handled on a downstream bus and that handler emits a child with event.emit(...), the child still links back to the parent via event_parent_id
  • nested descendants emitted on downstream buses keep that lineage as they continue through forwarding
  • this remains true for both queue-jumped children (await child.now()) and normally queued children (emitted but not immediately awaited)
  • bus.emit(...) inside a handler remains detached top-level work with no parent link
See Parent-Child Tracking for a deeper walkthrough and tree-log example. See Immediate Execution (RPC-style) for queue-jump execution behavior.

Bridges are forwarding with transport

Bridges are fundamentally the same forwarding pattern, but with serialization + remote transport in the middle. See Bridges Overview for HTTP/Redis/NATS/Postgres/socket/file-backed bridge options and setup patterns.