Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
If you like events in theory but hate the day-to-day developer experience, this page is for you.
Common pain points:
- calling boilerplate (emit + await completion + unwrap result) for every request
- eventual consistency anxiety (“will my response event arrive?”)
- duplicating signatures across schemas, handlers, and implementation functions
The goal here is to keep event architecture benefits without forcing painful calling patterns.
1) Pain: painful calling interface boilerplate
You usually end up writing verbose call sites repeatedly.
events_suck.wrap(...) gives you a method-shaped client API (client.create(...)) while still routing through events.
Python
TypeScript
Go
Rust
# Without wrap: valid, but noisy at every call site
event = bus.emit(CreateUserEvent(name='bob', age=45))
user_id = await event.event_result()
all_values = await event.event_results_list(raise_if_any=False, raise_if_none=False)
# With wrap: looks like normal async function calls
SDKClient = events_suck.wrap('SDKClient', {'create': CreateUserEvent, 'update': UpdateUserEvent})
client = SDKClient(bus=bus)
user_id = await client.create(name='bob', age=45, nickname='bobby')
updated = await client.update(id=user_id, age=46, source='sync')
// Without wrap: valid, but noisy at every call site
const event = bus.emit(CreateUserEvent({ name: 'bob', age: 45 }))
await event.now()
const user_id = await event.eventResult()
const all_values = await event.eventResultsList({ raise_if_any: false, raise_if_none: false })
// With wrap: looks like normal async function calls
const SDKClient = events_suck.wrap('SDKClient', { create: CreateUserEvent, update: UpdateUserEvent })
const client = new SDKClient(bus)
const id = await client.create({ name: 'bob', age: 45 }, { nickname: 'bobby' })
const updated = await client.update({ id: id ?? 'fallback-id', age: 46 }, { source: 'sync' })
// events_suck is not implemented in abxbus-go yet.
//
// Use the core EventBus APIs directly:
event := bus.Emit(abxbus.NewBaseEvent("CreateUserEvent", map[string]any{
"name": "bob",
"age": 45,
}))
userID, err := event.EventResult()
// Not implemented in abxbus-rust yet.
// Use typed event JSON serialization with an external transport for now.
Minimal end-to-end wrap(...) wiring
Python
TypeScript
Go
Rust
from abxbus import BaseEvent, EventBus, events_suck
class CreateUserEvent(BaseEvent[str]):
    # Request payload for creating a user.
    # NOTE(review): BaseEvent[str] presumably declares the handler result type
    # (the matching handler returns a str user id) — confirm against abxbus docs.
    name: str
    age: int
class UpdateUserEvent(BaseEvent[bool]):
    # Request payload for updating a user; `age` may be omitted or None.
    # NOTE(review): BaseEvent[bool] presumably declares the handler result type
    # (the matching handler returns a bool) — confirm against abxbus docs.
    id: str
    age: int | None = None
class UserService:
    """In-memory user store whose async methods are registered as event handlers."""

    def __init__(self) -> None:
        # Maps user id -> stored record with 'id', 'name' and 'age' keys.
        self.users: dict[str, dict[str, int | str]] = {}

    async def on_create(self, event: CreateUserEvent) -> str:
        """Store a record built from the event's fields and return the new user id."""
        # The id is derived from the age alone, so creating a second user with
        # the same age overwrites the first record.
        user_id = f'user-{event.age}'
        self.users[user_id] = {'id': user_id, 'name': event.name, 'age': event.age}
        return user_id

    async def on_update(self, event: UpdateUserEvent) -> bool:
        """Apply a non-None age to the stored user; return False if the id is unknown."""
        if event.id not in self.users:
            return False
        if event.age is not None:
            self.users[event.id]['age'] = event.age
        return True
bus = EventBus('SDKBus')
service = UserService()
bus.on(CreateUserEvent, service.on_create)
bus.on(UpdateUserEvent, service.on_update)
SDKClient = events_suck.wrap('SDKClient', {
'create': CreateUserEvent,
'update': UpdateUserEvent,
})
client = SDKClient(bus=bus)
user_id = await client.create(name='bob', age=45, nickname='bobby')
updated = await client.update(id=user_id, age=46, source='sync')
import { BaseEvent, EventBus, events_suck } from 'abxbus'
import { z } from 'zod'
const CreateUserEvent = BaseEvent.extend('CreateUserEvent', {
name: z.string(),
age: z.number(),
event_result_type: z.string(),
})
const UpdateUserEvent = BaseEvent.extend('UpdateUserEvent', {
id: z.string(),
age: z.number().nullable().optional(),
event_result_type: z.boolean(),
})
type UserRecord = { id: string; name: string; age: number }
const users = new Map<string, UserRecord>()
/** Handler for CreateUserEvent: stores a record keyed by an age-derived id and returns that id. */
const onCreate = async (event: InstanceType<typeof CreateUserEvent>) => {
  const { name, age } = event
  const userId = `user-${age}`
  const record: UserRecord = { id: userId, name, age }
  users.set(userId, record)
  return userId
}
/** Handler for UpdateUserEvent: applies a non-null age to the stored record; resolves false when the id is unknown. */
const onUpdate = async (event: InstanceType<typeof UpdateUserEvent>) => {
  const record = users.get(event.id)
  if (record === undefined) return false
  // `!= null` matches both null and undefined, the two "no age" values the schema allows.
  if (event.age != null) record.age = event.age
  users.set(event.id, record)
  return true
}
const bus = new EventBus('SDKBus')
bus.on(CreateUserEvent, onCreate)
bus.on(UpdateUserEvent, onUpdate)
const SDKClient = events_suck.wrap('SDKClient', {
create: CreateUserEvent,
update: UpdateUserEvent,
})
const client = new SDKClient(bus)
const user_id = await client.create({ name: 'bob', age: 45 }, { nickname: 'bobby' })
const updated = await client.update({ id: user_id ?? 'fallback-id', age: 46 }, { source: 'sync' })
// events_suck.wrap/make_events/make_handler are not implemented in Go yet.
// For typed event APIs, use OnTyped[...] and NewBaseEventWithResult[...].
// events_suck.wrap/make_events/make_handler are not implemented in abxbus-rust yet.
// Use typed events from the `event!` macro and the core EventBus APIs directly.
Related docs:
2) Pain: eventual consistency headaches
If your mental model is “I called something, I need a result now,” pure fire-and-forget event flows can feel stressful.
Two patterns reduce that stress:
- request/response on one bus with direct return values (event_result() after now({first_result: true}) / wait({first_result: true}))
- immediate execution for nested calls inside handlers (RPC-style queue-jump)
These patterns feel function-like for in-process flows. If you later move a step across process/network boundaries (bridges), treat that edge as eventually consistent again.
Immediate execution docs: Immediate Execution (RPC-style)
Python
TypeScript
Go
Rust
class CheckoutEvent(BaseEvent[str]):
    # Checkout request; the matching handler returns a str receipt id.
    order_id: str
class ChargeCardEvent(BaseEvent[str]):
    # Nested charge request emitted from the checkout handler; its handler returns a str receipt id.
    order_id: str
async def on_checkout(event: CheckoutEvent) -> str:
    """Emit a nested ChargeCardEvent for the same order and return its result."""
    child = event.emit(ChargeCardEvent(order_id=event.order_id))
    await child.now()  # immediate path while inside handler
    receipt_id = await child.event_result()
    return receipt_id
async def on_charge(event: ChargeCardEvent) -> str:
    """Return a receipt id derived from the event's order id."""
    return f'receipt-{event.order_id}'
const CheckoutEvent = BaseEvent.extend('CheckoutEvent', {
order_id: z.string(),
event_result_type: z.string(),
})
const ChargeCardEvent = BaseEvent.extend('ChargeCardEvent', {
order_id: z.string(),
event_result_type: z.string(),
})
bus.on(CheckoutEvent, async (event) => {
const child = event.emit(ChargeCardEvent({ order_id: event.order_id }))
await child.now() // immediate path while inside handler
return (await child.eventResult()) ?? 'missing-receipt'
})
bus.on(ChargeCardEvent, async (event) => `receipt-${event.order_id}`)
bus.On("CheckoutEvent", "on_checkout", func(event *abxbus.BaseEvent) (any, error) {
orderID, _ := event.Payload["order_id"].(string)
child := event.Emit(abxbus.NewBaseEvent("ChargeCardEvent", map[string]any{"order_id": orderID}))
if _, err := child.Now(); err != nil {
return nil, err
}
return child.EventResult()
}, nil)
bus.On("ChargeCardEvent", "on_charge", func(event *abxbus.BaseEvent) (any, error) {
orderID, _ := event.Payload["order_id"].(string)
return "receipt-" + orderID, nil
}, nil)
use abxbus_rust::{event, event_bus::EventBus};
event! {
struct CheckoutEvent {
order_id: String,
event_result_type: String,
}
}
event! {
struct ChargeCardEvent {
order_id: String,
event_result_type: String,
}
}
let bus = EventBus::new(Some("CheckoutBus".to_string()));
// Checkout handler: emits a nested ChargeCardEvent for the same order, runs it
// immediately, and returns its result (or "missing-receipt" when there is none).
bus.on(CheckoutEvent, |event: CheckoutEvent| async move {
    // Copy the id out before calling `emit` so the argument does not move a
    // field out of `event` while `event` is in use as the method receiver.
    let order_id = event.order_id.clone();
    let child = event.emit(ChargeCardEvent {
        order_id,
        ..Default::default()
    });
    let completed = child.now().await?;
    Ok(completed.event_result().await?.unwrap_or_else(|| "missing-receipt".to_string()))
});
bus.on(ChargeCardEvent, |event: ChargeCardEvent| async move {
Ok(format!("receipt-{}", event.order_id))
});
Related docs:
3) Pain: defining signatures multiple times
You can keep one source of truth for payload shapes and reuse it in implementation code.
One source of truth examples
Use implementation function signatures as the source of truth, then generate event classes from them.
from abxbus import EventBus, events_suck
from pydantic import validate_call
@validate_call
def create_user(id: str | None, name: str, age: int) -> str:
    """Return a '<name>-<age>' string; `id` is accepted but not used."""
    return f'{name}-{age}'
@validate_call
def update_user(id: str, age: int | None = None, **extra) -> bool:
    """Placeholder implementation: ignores its arguments and always returns True."""
    return True
events = events_suck.make_events({
'UserCreateEvent': create_user,
'UserUpdateEvent': update_user,
})
bus = EventBus('LegacyBus')
bus.on(events.UserCreateEvent, events_suck.make_handler(create_user))
bus.on(events.UserUpdateEvent, events_suck.make_handler(update_user))
UserClient = events_suck.wrap('UserClient', {'create': events.UserCreateEvent, 'update': events.UserUpdateEvent})
client = UserClient(bus=bus)
Keep the schema as the source of truth, infer implementation input types from it, and reuse the same shape in BaseEvent.extend(...).
import { BaseEvent, EventBus, events_suck } from 'abxbus'
import { z } from 'zod'
const CreateUserInputSchema = z.object({
id: z.string().nullable().optional(),
name: z.string(),
age: z.number(),
})
type CreateUserInput = z.infer<typeof CreateUserInputSchema>
const UserCreateEvent = BaseEvent.extend('UserCreateEvent', {
...CreateUserInputSchema.shape,
event_result_type: z.string(),
})
const bus = new EventBus('LegacyBus')
const create_user = async (input: CreateUserInput): Promise<string> => `${input.name}-${input.age}`
bus.on(UserCreateEvent, ({ id, name, age }) => create_user({ id, name, age }))
const UserClient = events_suck.wrap('UserClient', {
create: UserCreateEvent,
})
const client = new UserClient(bus)
const id = await client.create({ id: null, name: 'bob', age: 45 })
Keep event payload fields and handler result types in the same event definition, then let handler signatures use the generated Rust struct directly.
use abxbus_rust::{event, event_bus::EventBus};
use futures::executor::block_on;
event! {
struct UserCreateEvent {
id: Option<String>,
name: String,
age: i64,
event_result_type: String,
}
}
let bus = EventBus::new(Some("LegacyBus".to_string()));
bus.on(UserCreateEvent, |event: UserCreateEvent| async move {
Ok(format!("{}-{}", event.name, event.age))
});
let event = bus.emit(UserCreateEvent {
id: None,
name: "bob".to_string(),
age: 45,
..Default::default()
});
let completed = block_on(event.now())?;
let id = block_on(completed.event_result())?;
Related docs:
Migration playbook
- Start with wrap(...) to clean up call-site boilerplate first.
- Use immediate execution patterns where you need function-call-like request/response behavior.
- Consolidate types with @validate_call (Python) or z.infer (TypeScript) to avoid signature drift.
- Add timeouts/retry policies where needed, instead of forcing eventual-consistency semantics everywhere.
You do not need to choose between clean DX and events. You can keep method-shaped APIs and adopt event internals incrementally.