Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
You can forward events across multiple buses while preserving event path metadata and loop safety.
Forwarding does not eagerly hydrate unset event options. Fields such as event_timeout, event_concurrency, event_handler_concurrency, and event_handler_completion stay unset on the event unless the user set them, and each processing bus applies its own defaults at execution time.
Repository example files:
Why multiple buses are useful
Multiple buses let you separate concerns and tune runtime behavior per boundary:
- service-local bus for business logic with strict ordering and useful history
- transport/relay bus focused on throughput and forwarding (little or no history retention)
- specialized buses for domains that need different timeout or concurrency policies
This is especially useful in microservice-style designs, where each component has different consistency and observability needs.
Example: service buses with different policies
In this example:
AuthBus is strict and debuggable: event_concurrency='bus-serial', event_handler_concurrency='serial', max_history_size=100
RelayBus is a transport forwarder: event_concurrency='parallel', max_history_size=0
BillingBus is another service bus with its own settings
Python
TypeScript
Rust
Go
from abxbus import BaseEvent, EventBus
class UserCreatedEvent(BaseEvent[str]):
user_id: str
class AuthService:
def __init__(self) -> None:
self.bus = EventBus(
'AuthBus',
event_concurrency='bus-serial',
event_handler_concurrency='serial',
max_history_size=100,
)
self.bus.on(UserCreatedEvent, self.on_user_created)
async def on_user_created(self, event: UserCreatedEvent) -> str:
return f'auth-ok:{event.user_id}'
class RelayService:
def __init__(self) -> None:
self.bus = EventBus(
'RelayBus',
event_concurrency='parallel',
max_history_size=0,
)
class BillingService:
def __init__(self) -> None:
self.bus = EventBus(
'BillingBus',
event_concurrency='bus-serial',
event_handler_concurrency='serial',
max_history_size=100,
)
self.bus.on(UserCreatedEvent, self.on_user_created)
async def on_user_created(self, event: UserCreatedEvent) -> str:
return f'billing-ok:{event.user_id}'
auth = AuthService()
relay = RelayService()
billing = BillingService()
auth.bus.on('*', relay.bus.emit)
relay.bus.on('*', billing.bus.emit)
event = await auth.bus.emit(UserCreatedEvent(user_id='u-a8d1')).now(first_result=True)
result = await event.event_result()
print(result)
# 'auth-ok:u-a8d1'
root = auth.bus.emit(UserCreatedEvent(user_id='u-a8d1'))
await root.now()
print(root.event_path)
# ['AuthBus#a8d1', 'RelayBus#3f2c', 'BillingBus#b91e']
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const UserCreatedEvent = BaseEvent.extend('UserCreatedEvent', {
user_id: z.string(),
event_result_type: z.string(),
})
class AuthService {
bus = new EventBus('AuthBus', {
event_concurrency: 'bus-serial',
event_handler_concurrency: 'serial',
max_history_size: 100,
})
constructor() {
this.bus.on(UserCreatedEvent, this.onUserCreated)
}
onUserCreated = async (event: InstanceType<typeof UserCreatedEvent>) => `auth-ok:${event.user_id}`
}
class RelayService {
bus = new EventBus('RelayBus', {
event_concurrency: 'parallel',
max_history_size: 0,
})
}
class BillingService {
bus = new EventBus('BillingBus', {
event_concurrency: 'bus-serial',
event_handler_concurrency: 'serial',
max_history_size: 100,
})
constructor() {
this.bus.on(UserCreatedEvent, this.onUserCreated)
}
onUserCreated = async (event: InstanceType<typeof UserCreatedEvent>) => `billing-ok:${event.user_id}`
}
const auth = new AuthService()
const relay = new RelayService()
const billing = new BillingService()
auth.bus.on('*', relay.bus.emit)
relay.bus.on('*', billing.bus.emit)
const event = auth.bus.emit(UserCreatedEvent({ user_id: 'u-a8d1' }))
await event.now()
console.log(await event.eventResult())
// 'auth-ok:u-a8d1'
console.log(event.event_path)
// ['AuthBus#a8d1', 'RelayBus#3f2c', 'BillingBus#b91e']
use abxbus_rust::{
event,
event_bus::{EventBus, EventBusOptions},
types::{EventConcurrencyMode, EventHandlerConcurrencyMode},
};
use futures::executor::block_on;
use serde_json::json;
event! {
struct UserCreatedEvent {
user_id: String,
event_result_type: serde_json::Value,
}
}
let auth = EventBus::new_with_options(
Some("AuthBus".to_string()),
EventBusOptions {
event_concurrency: EventConcurrencyMode::BusSerial,
event_handler_concurrency: EventHandlerConcurrencyMode::Serial,
max_history_size: Some(100),
..EventBusOptions::default()
},
);
let relay = EventBus::new_with_options(
Some("RelayBus".to_string()),
EventBusOptions {
event_concurrency: EventConcurrencyMode::Parallel,
max_history_size: Some(0),
..EventBusOptions::default()
},
);
let billing = EventBus::new_with_options(
Some("BillingBus".to_string()),
EventBusOptions {
event_concurrency: EventConcurrencyMode::BusSerial,
event_handler_concurrency: EventHandlerConcurrencyMode::Serial,
max_history_size: Some(100),
..EventBusOptions::default()
},
);
auth.on(UserCreatedEvent, |event: UserCreatedEvent| async move {
Ok(json!(format!("auth-ok:{}", event.user_id)))
});
billing.on(UserCreatedEvent, |event: UserCreatedEvent| async move {
Ok(json!(format!("billing-ok:{}", event.user_id)))
});
let relay_for_handler = relay.clone();
auth.on(UserCreatedEvent, move |event: UserCreatedEvent| {
let relay_for_handler = relay_for_handler.clone();
async move {
Ok(relay_for_handler.emit(event).to_json_value())
}
});
let billing_for_handler = billing.clone();
relay.on(UserCreatedEvent, move |event: UserCreatedEvent| {
let billing_for_handler = billing_for_handler.clone();
async move {
Ok(billing_for_handler.emit(event).to_json_value())
}
});
let event = auth.emit(UserCreatedEvent {
user_id: "u-a8d1".to_string(),
..Default::default()
});
let result = block_on(event.event_result())?;
block_on(event.now());
println!("{result}");
println!("{:?}", event.event_path.read());
package main
import (
"fmt"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go/v2"
)
type AuthService struct {
bus *abxbus.EventBus
}
type RelayService struct {
bus *abxbus.EventBus
}
type BillingService struct {
bus *abxbus.EventBus
}
func main() {
maxHistory := 100
zeroHistory := 0
auth := &AuthService{bus: abxbus.NewEventBus("AuthBus", &abxbus.EventBusOptions{
EventConcurrency: abxbus.EventConcurrencyBusSerial,
EventHandlerConcurrency: abxbus.EventHandlerConcurrencySerial,
MaxHistorySize: &maxHistory,
})}
relay := &RelayService{bus: abxbus.NewEventBus("RelayBus", &abxbus.EventBusOptions{
EventConcurrency: abxbus.EventConcurrencyParallel,
MaxHistorySize: &zeroHistory,
})}
billing := &BillingService{bus: abxbus.NewEventBus("BillingBus", &abxbus.EventBusOptions{
EventConcurrency: abxbus.EventConcurrencyBusSerial,
EventHandlerConcurrency: abxbus.EventHandlerConcurrencySerial,
MaxHistorySize: &maxHistory,
})}
auth.bus.On("UserCreatedEvent", "on_user_created", func(event *abxbus.BaseEvent) (any, error) {
return fmt.Sprintf("auth-ok:%s", event.Payload["user_id"]), nil
}, nil)
billing.bus.On("UserCreatedEvent", "on_user_created", func(event *abxbus.BaseEvent) (any, error) {
return fmt.Sprintf("billing-ok:%s", event.Payload["user_id"]), nil
}, nil)
auth.bus.On("*", "forward_to_relay", func(event *abxbus.BaseEvent) (any, error) {
return relay.bus.Emit(event), nil
}, nil)
relay.bus.On("*", "forward_to_billing", func(event *abxbus.BaseEvent) (any, error) {
return billing.bus.Emit(event), nil
}, nil)
event := auth.bus.Emit(abxbus.NewBaseEvent("UserCreatedEvent", map[string]any{"user_id": "u-a8d1"}))
result, err := event.EventResult()
if err != nil {
panic(err)
}
fmt.Println(result)
// auth-ok:u-a8d1
fmt.Println(event.EventPath)
// [AuthBus#a8d1 RelayBus#3f2c BillingBus#b91e]
}
Uni-directional and bi-directional forwarding
Forwarding can be one-way or two-way depending on your topology.
- Uni-directional: one producer bus forwards to one consumer bus.
- Bi-directional: both buses forward to each other (common for peer sync).
Python
TypeScript
Rust
Go
left = EventBus('LeftBus')
right = EventBus('RightBus')
# uni-directional
left.on('*', right.emit)
# bi-directional (add reverse path)
right.on('*', left.emit)
const left = new EventBus('LeftBus')
const right = new EventBus('RightBus')
// uni-directional
left.on('*', right.emit)
// bi-directional (add reverse path)
right.on('*', left.emit)
use abxbus_rust::{event, event_bus::EventBus};
event! {
struct PingEvent {
event_result_type: serde_json::Value,
}
}
let left = EventBus::new(Some("LeftBus".to_string()));
let right = EventBus::new(Some("RightBus".to_string()));
let right_for_handler = right.clone();
left.on(PingEvent, move |event: PingEvent| {
let right_for_handler = right_for_handler.clone();
async move {
Ok(right_for_handler.emit(event).to_json_value())
}
});
let left_for_handler = left.clone();
right.on(PingEvent, move |event: PingEvent| {
let left_for_handler = left_for_handler.clone();
async move {
Ok(left_for_handler.emit(event).to_json_value())
}
});
left := abxbus.NewEventBus("LeftBus", nil)
right := abxbus.NewEventBus("RightBus", nil)
// uni-directional
left.On("*", "forward_to_right", func(event *abxbus.BaseEvent) (any, error) {
return right.Emit(event), nil
}, nil)
// bi-directional (add reverse path)
right.On("*", "forward_to_left", func(event *abxbus.BaseEvent) (any, error) {
return left.Emit(event), nil
}, nil)
Loop prevention still applies in both modes: if an event already visited a bus (tracked in event_path), forwarding back to that bus is a no-op and it is not re-processed there.
How loop prevention works (event_path)
Loop prevention is automatic and based on event_path:
- Each bus appends its own label (for example
AuthBus#a8d1) to event_path when it first sees an event.
- When a forwarding handler points to another bus, that bus checks whether its label is already in
event_path.
- If yes, forwarding to that bus is skipped (no-op), so cycles terminate naturally.
This means you can wire cyclic topologies without infinite forwarding loops.
Python
TypeScript
Rust
Go
from abxbus import BaseEvent, EventBus
class PingEvent(BaseEvent):
message: str
bus_a = EventBus('BusA')
bus_b = EventBus('BusB')
bus_c = EventBus('BusC')
# cycle: A -> B -> C -> A
bus_a.on('*', bus_b.emit)
bus_b.on('*', bus_c.emit)
bus_c.on('*', bus_a.emit)
event = bus_a.emit(PingEvent(message='hello'))
await event.now()
print(event.event_path)
# ['BusA#a8d1', 'BusB#3f2c', 'BusC#b91e']
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const PingEvent = BaseEvent.extend('PingEvent', {
message: z.string(),
})
const busA = new EventBus('BusA')
const busB = new EventBus('BusB')
const busC = new EventBus('BusC')
// cycle: A -> B -> C -> A
busA.on('*', busB.emit)
busB.on('*', busC.emit)
busC.on('*', busA.emit)
const event = busA.emit(PingEvent({ message: 'hello' }))
await event.now()
console.log(event.event_path)
// ['BusA#a8d1', 'BusB#3f2c', 'BusC#b91e']
use abxbus_rust::{event, event_bus::EventBus};
use futures::executor::block_on;
event! {
struct PingEvent {
event_result_type: serde_json::Value,
}
}
let bus_a = EventBus::new(Some("BusA".to_string()));
let bus_b = EventBus::new(Some("BusB".to_string()));
let bus_c = EventBus::new(Some("BusC".to_string()));
let bus_b_for_handler = bus_b.clone();
bus_a.on(PingEvent, move |event: PingEvent| {
let bus_b_for_handler = bus_b_for_handler.clone();
async move {
Ok(bus_b_for_handler.emit(event).to_json_value())
}
});
let bus_c_for_handler = bus_c.clone();
bus_b.on(PingEvent, move |event: PingEvent| {
let bus_c_for_handler = bus_c_for_handler.clone();
async move {
Ok(bus_c_for_handler.emit(event).to_json_value())
}
});
let bus_a_for_handler = bus_a.clone();
bus_c.on(PingEvent, move |event: PingEvent| {
let bus_a_for_handler = bus_a_for_handler.clone();
async move {
Ok(bus_a_for_handler.emit(event).to_json_value())
}
});
let event = bus_a.emit(PingEvent { ..Default::default() });
block_on(event.now());
println!("{:?}", event.event_path.read());
busA := abxbus.NewEventBus("BusA", nil)
busB := abxbus.NewEventBus("BusB", nil)
busC := abxbus.NewEventBus("BusC", nil)
// cycle: A -> B -> C -> A
busA.On("*", "forward_to_b", func(event *abxbus.BaseEvent) (any, error) {
return busB.Emit(event), nil
}, nil)
busB.On("*", "forward_to_c", func(event *abxbus.BaseEvent) (any, error) {
return busC.Emit(event), nil
}, nil)
busC.On("*", "forward_to_a", func(event *abxbus.BaseEvent) (any, error) {
return busA.Emit(event), nil
}, nil)
event := busA.Emit(abxbus.NewBaseEvent("PingEvent", map[string]any{"message": "hello"}))
if _, err := event.Now(); err != nil {
panic(err)
}
fmt.Println(event.EventPath)
// [BusA#a8d1 BusB#3f2c BusC#b91e]
Parent-child tracking across forwarded flows
Parent-child tracking also works across forwarded flows:
- if a forwarded event is handled on a downstream bus and that handler emits a child with
event.emit(...), the child still links back to the parent via event_parent_id
- nested descendants emitted on downstream buses keep that lineage as they continue through forwarding
- this remains true for both queue-jumped children (
await child.now()) and normally queued children (emitted but not immediately awaited)
bus.emit(...) inside a handler remains detached top-level work with no parent link
See Parent-Child Tracking for a deeper walkthrough and tree-log example.
See Immediate Execution (RPC-style) for queue-jump execution behavior.
Bridges are forwarding with transport
Bridges are fundamentally the same forwarding pattern, but with serialization + remote transport in the middle.
See Bridges Overview for HTTP/Redis/NATS/Postgres/socket/file-backed bridge options and setup patterns.