You can forward events across multiple buses while preserving event path metadata and loop safety.
Why multiple buses are useful
Multiple buses let you separate concerns and tune runtime behavior per boundary:
- service-local bus for business logic with strict ordering and useful history
- transport/relay bus focused on throughput and forwarding (little or no history retention)
- specialized buses for domains that need different timeout or concurrency policies
This is especially useful in microservice-style designs, where each component has different consistency and observability needs.
Example: service buses with different policies
In this example:
- AuthBus is strict and debuggable: event_concurrency='bus-serial', event_handler_concurrency='serial', max_history_size=100
- RelayBus is a transport forwarder: event_concurrency='parallel', max_history_size=0
- BillingBus is another service bus with its own settings
from abxbus import BaseEvent, EventBus


class UserCreatedEvent(BaseEvent[str]):
    user_id: str


class AuthService:
    def __init__(self) -> None:
        # strict ordering plus bounded history for debugging
        self.bus = EventBus(
            'AuthBus',
            event_concurrency='bus-serial',
            event_handler_concurrency='serial',
            max_history_size=100,
        )
        self.bus.on(UserCreatedEvent, self.on_user_created)

    async def on_user_created(self, event: UserCreatedEvent) -> str:
        return f'auth-ok:{event.user_id}'


class RelayService:
    def __init__(self) -> None:
        # pure forwarder: parallel processing, no history retention
        self.bus = EventBus(
            'RelayBus',
            event_concurrency='parallel',
            max_history_size=0,
        )


class BillingService:
    def __init__(self) -> None:
        self.bus = EventBus(
            'BillingBus',
            event_concurrency='bus-serial',
            event_handler_concurrency='serial',
            max_history_size=100,
        )
        self.bus.on(UserCreatedEvent, self.on_user_created)

    async def on_user_created(self, event: UserCreatedEvent) -> str:
        return f'billing-ok:{event.user_id}'


auth = AuthService()
relay = RelayService()
billing = BillingService()

# forwarding chain: AuthBus -> RelayBus -> BillingBus
auth.bus.on('*', relay.bus.emit)
relay.bus.on('*', billing.bus.emit)

# the awaits below must run inside an async function
result = await auth.bus.emit(UserCreatedEvent(user_id='u-a8d1')).event_result()
print(result)
# 'auth-ok:u-a8d1'

root = auth.bus.emit(UserCreatedEvent(user_id='u-a8d1'))
await root
print(root.event_path)
# ['AuthBus#a8d1', 'RelayBus#3f2c', 'BillingBus#b91e']
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'

const UserCreatedEvent = BaseEvent.extend('UserCreatedEvent', {
  user_id: z.string(),
  event_result_type: z.string(),
})

class AuthService {
  bus = new EventBus('AuthBus', {
    event_concurrency: 'bus-serial',
    event_handler_concurrency: 'serial',
    max_history_size: 100,
  })

  constructor() {
    this.bus.on(UserCreatedEvent, this.onUserCreated)
  }

  onUserCreated = async (event: InstanceType<typeof UserCreatedEvent>) => `auth-ok:${event.user_id}`
}

class RelayService {
  bus = new EventBus('RelayBus', {
    event_concurrency: 'parallel',
    max_history_size: 0,
  })
}

class BillingService {
  bus = new EventBus('BillingBus', {
    event_concurrency: 'bus-serial',
    event_handler_concurrency: 'serial',
    max_history_size: 100,
  })

  constructor() {
    this.bus.on(UserCreatedEvent, this.onUserCreated)
  }

  onUserCreated = async (event: InstanceType<typeof UserCreatedEvent>) => `billing-ok:${event.user_id}`
}

const auth = new AuthService()
const relay = new RelayService()
const billing = new BillingService()

auth.bus.on('*', relay.bus.emit)
relay.bus.on('*', billing.bus.emit)

const event = auth.bus.emit(UserCreatedEvent({ user_id: 'u-a8d1' }))
await event.done()
console.log(event.event_result)
// 'auth-ok:u-a8d1'
console.log(event.event_path)
// ['AuthBus#a8d1', 'RelayBus#3f2c', 'BillingBus#b91e']
Uni-directional and bi-directional forwarding
Forwarding can be one-way or two-way depending on your topology.
- Uni-directional: one producer bus forwards to one consumer bus.
- Bi-directional: both buses forward to each other (common for peer sync).
left = EventBus('LeftBus')
right = EventBus('RightBus')
# uni-directional
left.on('*', right.emit)
# bi-directional (add reverse path)
right.on('*', left.emit)
const left = new EventBus('LeftBus')
const right = new EventBus('RightBus')
// uni-directional
left.on('*', right.emit)
// bi-directional (add reverse path)
right.on('*', left.emit)
Loop prevention still applies in both modes: if an event has already visited a bus (tracked in event_path), forwarding it back to that bus is a no-op, so the event is not re-processed there.
How loop prevention works (event_path)
Loop prevention is automatic and based on event_path:
- Each bus appends its own label (for example AuthBus#a8d1) to event_path when it first sees an event.
- When a forwarding handler points to another bus, that bus checks whether its label is already in event_path.
- If yes, forwarding to that bus is skipped (no-op), so cycles terminate naturally.
This means you can wire cyclic topologies without infinite forwarding loops.
from abxbus import BaseEvent, EventBus


class PingEvent(BaseEvent):
    message: str


bus_a = EventBus('BusA')
bus_b = EventBus('BusB')
bus_c = EventBus('BusC')

# cycle: A -> B -> C -> A
bus_a.on('*', bus_b.emit)
bus_b.on('*', bus_c.emit)
bus_c.on('*', bus_a.emit)

event = bus_a.emit(PingEvent(message='hello'))
await event
print(event.event_path)
# ['BusA#a8d1', 'BusB#3f2c', 'BusC#b91e']
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'

const PingEvent = BaseEvent.extend('PingEvent', {
  message: z.string(),
})

const busA = new EventBus('BusA')
const busB = new EventBus('BusB')
const busC = new EventBus('BusC')

// cycle: A -> B -> C -> A
busA.on('*', busB.emit)
busB.on('*', busC.emit)
busC.on('*', busA.emit)

const event = busA.emit(PingEvent({ message: 'hello' }))
await event.done()
console.log(event.event_path)
// ['BusA#a8d1', 'BusB#3f2c', 'BusC#b91e']
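To make the event_path rule concrete, here is a minimal, library-free sketch of the same idea. ToyEvent and ToyBus are hypothetical stand-ins invented for illustration; they are not the abxbus classes, run synchronously, and use plain bus names as labels instead of the name#id form shown above. The real implementation may differ in every detail except the path check itself.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyEvent:
    """Hypothetical event: a payload plus the list of buses already visited."""
    message: str
    event_path: list[str] = field(default_factory=list)


class ToyBus:
    """Hypothetical stand-in for a bus; NOT the abxbus EventBus."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.handlers = []   # callables invoked for every event
        self.handled = []    # messages this bus actually processed

    def on(self, handler) -> None:
        self.handlers.append(handler)

    def emit(self, event: ToyEvent) -> None:
        # Loop guard: if this bus is already in the path, do nothing.
        if self.label in event.event_path:
            return
        event.event_path.append(self.label)
        self.handled.append(event.message)
        for handler in self.handlers:
            handler(event)


bus_a, bus_b, bus_c = ToyBus('BusA'), ToyBus('BusB'), ToyBus('BusC')

# cycle: A -> B -> C -> A
bus_a.on(bus_b.emit)
bus_b.on(bus_c.emit)
bus_c.on(bus_a.emit)

event = ToyEvent(message='hello')
bus_a.emit(event)
print(event.event_path)
# ['BusA', 'BusB', 'BusC']
print(bus_a.handled)
# ['hello']  (processed once, even though BusC forwards back to BusA)
```

The guard sits in the receiving bus's emit rather than in the forwarding handler, which is why plain wiring such as bus_c.on(bus_a.emit) needs no extra loop checks in this sketch.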
Parent-child tracking across forwarded flows
Parent-child tracking also works across forwarded flows:
- if a forwarded event is handled on a downstream bus and that handler emits a child with event.emit(...), the child still links back to the parent via event_parent_id
- nested descendants emitted on downstream buses keep that lineage as they continue through forwarding
- this remains true for both queue-jumped children (await child) and normally queued children (emitted but not immediately awaited)
- bus.emit(...) inside a handler remains detached top-level work with no parent link
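The difference between event.emit(...) and bus.emit(...) described above can be sketched with a small toy model. Everything here is an assumption made for illustration: ToyEvent, ToyBus, the _bus attribute, and the event names OrderPlaced, ChargeCard, and AuditLog are hypothetical, the model is synchronous, and it says nothing about how abxbus tracks the current handler internally.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field


@dataclass
class ToyEvent:
    """Hypothetical event with an id, an optional parent id, and a path."""
    name: str
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    event_parent_id: str | None = None
    event_path: list[str] = field(default_factory=list)
    # hypothetical bookkeeping: the bus currently running a handler for this event
    _bus: ToyBus | None = field(default=None, repr=False)

    def emit(self, child: ToyEvent) -> ToyEvent:
        # Emitting through the parent event records the lineage.
        child.event_parent_id = self.event_id
        return self._bus.emit(child)


class ToyBus:
    """Hypothetical stand-in for a bus; NOT the abxbus EventBus."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.handlers = {}  # event name or '*' -> list of handlers

    def on(self, name: str, handler) -> None:
        self.handlers.setdefault(name, []).append(handler)

    def emit(self, event: ToyEvent) -> ToyEvent:
        if self.label in event.event_path:
            return event  # already visited: no-op
        event.event_path.append(self.label)
        matching = self.handlers.get(event.name, []) + self.handlers.get('*', [])
        for handler in matching:
            event._bus = self  # remember which bus is handling the event
            handler(event)
        return event


upstream, downstream = ToyBus('UpstreamBus'), ToyBus('DownstreamBus')
upstream.on('*', downstream.emit)  # forward everything downstream

children = {}


def on_order_placed(event: ToyEvent) -> None:
    # linked child: emitted through the event being handled
    children['linked'] = event.emit(ToyEvent('ChargeCard'))
    # detached work: emitted directly on the bus, so no parent link
    children['detached'] = downstream.emit(ToyEvent('AuditLog'))


downstream.on('OrderPlaced', on_order_placed)

root = upstream.emit(ToyEvent('OrderPlaced'))
print(children['linked'].event_parent_id == root.event_id)
# True
print(children['detached'].event_parent_id)
# None
```

In this sketch the parent link is just a field copied at emit time, so it survives forwarding without any extra work: the root event was emitted on UpstreamBus, yet the child created on DownstreamBus still points back to it.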
See Parent-Child Tracking for a deeper walkthrough and tree-log example.
See Immediate Execution (RPC-style) for queue-jump execution behavior.
Bridges are forwarding with transport
Bridges are fundamentally the same forwarding pattern, but with serialization and remote transport in the middle.
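As a rough illustration of "forwarding with serialization in the middle", the sketch below sends a toy event through JSON and an in-memory queue that stands in for a real transport. ToyEvent, ToyBus, send, receive_all, and the wire queue are all hypothetical names invented here; none of this is the abxbus bridge API, and a real bridge would also handle connections, errors, and event typing.

```python
from __future__ import annotations

import json
from collections import deque
from dataclasses import asdict, dataclass, field


@dataclass
class ToyEvent:
    """Hypothetical event: a payload plus the list of buses already visited."""
    message: str
    event_path: list[str] = field(default_factory=list)


class ToyBus:
    """Hypothetical stand-in for a bus; NOT the abxbus EventBus."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.handlers = []
        self.seen = []

    def on(self, handler) -> None:
        self.handlers.append(handler)

    def emit(self, event: ToyEvent) -> None:
        if self.label in event.event_path:
            return  # already visited: no-op
        event.event_path.append(self.label)
        self.seen.append(event.message)
        for handler in self.handlers:
            handler(event)


# In-memory queue standing in for the remote transport.
wire = deque()


def send(event: ToyEvent) -> None:
    # Serialize the whole event, including event_path, before "transport".
    wire.append(json.dumps(asdict(event)))


def receive_all(bus: ToyBus) -> list[ToyEvent]:
    # Deserialize each payload and emit it on the receiving bus.
    received = []
    while wire:
        event = ToyEvent(**json.loads(wire.popleft()))
        bus.emit(event)
        received.append(event)
    return received


local, remote = ToyBus('LocalBus'), ToyBus('RemoteBus')
local.on(send)  # forwarding handler that serializes instead of calling emit

local.emit(ToyEvent(message='hello'))
received = receive_all(remote)

print(remote.seen)
# ['hello']
print(received[0].event_path)
# ['LocalBus', 'RemoteBus']
```

Because event_path travels inside the serialized payload in this sketch, the receiving side can apply the same loop check as a local forward would.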
See Bridges Overview for HTTP/Redis/NATS/Postgres/socket/file-backed bridge options and setup patterns.