If you like events in theory but hate the day-to-day developer experience, this page is for you.
Common pain points:
- calling boilerplate (emit + await completion + unwrap result) for every request
- eventual consistency anxiety (“will my response event arrive?”)
- duplicating signatures across schemas, handlers, and implementation functions
The goal here is to keep event architecture benefits without forcing painful calling patterns.
1) Pain: calling-interface boilerplate
Without a wrapper, every request repeats the same three steps at the call site: emit the event, await its completion, and unwrap the result.
events_suck.wrap(...) gives you a method-shaped client API (client.create(...)) while still routing through events.
# Without wrap: valid, but noisy at every call site
event = bus.emit(CreateUserEvent(name='bob', age=45))
user_id = await event.event_result()
all_values = await event.event_results_list(raise_if_any=False, raise_if_none=False)
# With wrap: looks like normal async function calls
SDKClient = events_suck.wrap('SDKClient', {'create': CreateUserEvent, 'update': UpdateUserEvent})
client = SDKClient(bus=bus)
user_id = await client.create(name='bob', age=45, nickname='bobby')
updated = await client.update(id=user_id, age=46, source='sync')
// Without wrap: valid, but noisy at every call site
const event = bus.emit(CreateUserEvent({ name: 'bob', age: 45 }))
await event.done()
const user_id = event.event_result
const all_values = await event.eventResultsList({ raise_if_any: false, raise_if_none: false })
// With wrap: looks like normal async function calls
const SDKClient = events_suck.wrap('SDKClient', { create: CreateUserEvent, update: UpdateUserEvent })
const client = new SDKClient(bus)
const id = await client.create({ name: 'bob', age: 45 }, { nickname: 'bobby' })
const updated = await client.update({ id: id ?? 'fallback-id', age: 46 }, { source: 'sync' })
Minimal end-to-end wrap(...) wiring
from abxbus import BaseEvent, EventBus, events_suck
class CreateUserEvent(BaseEvent[str]):
    name: str
    age: int
class UpdateUserEvent(BaseEvent[bool]):
    id: str
    age: int | None = None
class UserService:
    def __init__(self) -> None:
        self.users: dict[str, dict[str, int | str]] = {}
    async def on_create(self, event: CreateUserEvent) -> str:
        user_id = f'user-{event.age}'
        self.users[user_id] = {'id': user_id, 'name': event.name, 'age': event.age}
        return user_id
    async def on_update(self, event: UpdateUserEvent) -> bool:
        if event.id not in self.users:
            return False
        if event.age is not None:
            self.users[event.id]['age'] = event.age
        return True
bus = EventBus('SDKBus')
service = UserService()
bus.on(CreateUserEvent, service.on_create)
bus.on(UpdateUserEvent, service.on_update)
SDKClient = events_suck.wrap('SDKClient', {
    'create': CreateUserEvent,
    'update': UpdateUserEvent,
})
client = SDKClient(bus=bus)
user_id = await client.create(name='bob', age=45, nickname='bobby')
updated = await client.update(id=user_id, age=46, source='sync')
import { BaseEvent, EventBus, events_suck } from 'abxbus'
import { z } from 'zod'
const CreateUserEvent = BaseEvent.extend('CreateUserEvent', {
  name: z.string(),
  age: z.number(),
  event_result_type: z.string(),
})
const UpdateUserEvent = BaseEvent.extend('UpdateUserEvent', {
  id: z.string(),
  age: z.number().nullable().optional(),
  event_result_type: z.boolean(),
})
type UserRecord = { id: string; name: string; age: number }
const users = new Map<string, UserRecord>()
const onCreate = async (event: InstanceType<typeof CreateUserEvent>) => {
  const user_id = `user-${event.age}`
  users.set(user_id, { id: user_id, name: event.name, age: event.age })
  return user_id
}
const onUpdate = async (event: InstanceType<typeof UpdateUserEvent>) => {
  const existing = users.get(event.id)
  if (!existing) return false
  if (event.age !== undefined && event.age !== null) existing.age = event.age
  users.set(event.id, existing)
  return true
}
const bus = new EventBus('SDKBus')
bus.on(CreateUserEvent, onCreate)
bus.on(UpdateUserEvent, onUpdate)
const SDKClient = events_suck.wrap('SDKClient', {
  create: CreateUserEvent,
  update: UpdateUserEvent,
})
const client = new SDKClient(bus)
const user_id = await client.create({ name: 'bob', age: 45 }, { nickname: 'bobby' })
const updated = await client.update({ id: user_id ?? 'fallback-id', age: 46 }, { source: 'sync' })
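To make the idea behind wrap(...) less magical, here is a toy sketch of how a method-shaped client can be generated from a mapping of method names to event classes. Everything in it (ToyBus, toy_wrap, the dataclass event) is a hypothetical stand-in written for illustration using only the standard library; it is not how abxbus implements wrap(...), and it ignores extras such as nickname= or source=.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class CreateUser:  # hypothetical stand-in for an event class
    name: str
    age: int


class ToyBus:
    """Hypothetical one-handler-per-event bus, for illustration only."""

    def __init__(self) -> None:
        self._handlers: dict[type, Callable[[Any], Awaitable[Any]]] = {}

    def on(self, event_type: type, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handlers[event_type] = handler

    async def request(self, event: Any) -> Any:
        # emit + await completion + unwrap result, collapsed into one call
        return await self._handlers[type(event)](event)


def toy_wrap(class_name: str, methods: dict[str, type]) -> type:
    """Build a client class whose methods construct an event and route it through the bus."""

    def make_method(event_type: type):
        async def method(self, **fields: Any) -> Any:
            return await self.bus.request(event_type(**fields))
        return method

    def __init__(self, bus: ToyBus) -> None:
        self.bus = bus

    namespace: dict[str, Any] = {"__init__": __init__}
    for method_name, event_type in methods.items():
        namespace[method_name] = make_method(event_type)
    return type(class_name, (), namespace)


async def demo() -> str:
    bus = ToyBus()

    async def on_create(event: CreateUser) -> str:
        return f"{event.name}-{event.age}"

    bus.on(CreateUser, on_create)
    Client = toy_wrap("Client", {"create": CreateUser})
    # The call site looks like a plain async method call.
    return await Client(bus=bus).create(name="bob", age=45)


result = asyncio.run(demo())
```

The point of the sketch is only that the call site stays a plain method call while the work still flows through a bus and a registered handler.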
2) Pain: eventual consistency headaches
If your mental model is “I called something, I need a result now,” pure fire-and-forget event flows can feel stressful.
Two patterns reduce that stress:
- request/response on one bus with direct return values (event_result / first())
- immediate execution for nested calls inside handlers (RPC-style queue-jump)
These patterns feel function-like for in-process flows. If you later move a step across process/network boundaries (bridges), treat that edge as eventually consistent again.
For details, see the Immediate Execution (RPC-style) docs.
class CheckoutEvent(BaseEvent[str]):
    order_id: str
class ChargeCardEvent(BaseEvent[str]):
    order_id: str
async def on_checkout(event: CheckoutEvent) -> str:
    child = event.emit(ChargeCardEvent(order_id=event.order_id))
    await child  # immediate path while inside handler
    receipt_id = await child.event_result()
    return receipt_id
async def on_charge(event: ChargeCardEvent) -> str:
    return f'receipt-{event.order_id}'
# register both handlers on an existing bus, as in the wiring example above
bus.on(CheckoutEvent, on_checkout)
bus.on(ChargeCardEvent, on_charge)
const CheckoutEvent = BaseEvent.extend('CheckoutEvent', {
  order_id: z.string(),
  event_result_type: z.string(),
})
const ChargeCardEvent = BaseEvent.extend('ChargeCardEvent', {
  order_id: z.string(),
  event_result_type: z.string(),
})
bus.on(CheckoutEvent, async (event) => {
  const child = event.emit(ChargeCardEvent({ order_id: event.order_id }))
  await child.done() // immediate path while inside handler
  return child.event_result ?? 'missing-receipt'
})
bus.on(ChargeCardEvent, async (event) => `receipt-${event.order_id}`)
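The queue-jump idea can be hard to picture, so here is a toy model of it. It assumes a serial FIFO bus where a handler can either enqueue follow-up work (emit) or run a nested call right away and use its return value (call). ToyBus and its method names are hypothetical and only illustrate the ordering; they are not the abxbus API or its scheduler.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable

Handler = Callable[["ToyBus", Any], Awaitable[Any]]


class ToyBus:
    """Hypothetical serial FIFO bus; not the abxbus implementation."""

    def __init__(self) -> None:
        self.handlers: dict[str, Handler] = {}
        self.queue: deque[tuple[str, Any]] = deque()
        self.log: list[str] = []  # order in which handlers actually started

    def emit(self, name: str, payload: Any) -> None:
        # Fire-and-forget: the event waits behind everything already queued.
        self.queue.append((name, payload))

    async def call(self, name: str, payload: Any) -> Any:
        # Queue-jump: run the handler right away and hand back its return value.
        self.log.append(name)
        return await self.handlers[name](self, payload)

    async def drain(self) -> None:
        # Process queued events one at a time, in FIFO order.
        while self.queue:
            name, payload = self.queue.popleft()
            await self.call(name, payload)


async def on_checkout(bus: ToyBus, order_id: str) -> str:
    receipt = await bus.call("charge_card", order_id)  # needed now, so jump the queue
    bus.emit("send_email", receipt)  # can happen later, so enqueue
    return receipt


async def on_charge(bus: ToyBus, order_id: str) -> str:
    return f"receipt-{order_id}"


async def on_other(bus: ToyBus, payload: Any) -> None:
    return None


async def demo() -> list[str]:
    bus = ToyBus()
    bus.handlers = {
        "checkout": on_checkout,
        "charge_card": on_charge,
        "audit": on_other,
        "send_email": on_other,
    }
    bus.emit("checkout", "42")
    bus.emit("audit", "42")  # queued before charge_card even exists
    await bus.drain()
    return bus.log


order = asyncio.run(demo())
```

In this model charge_card starts before audit even though audit was queued first, which is the function-call feel the immediate path is meant to give; send_email, by contrast, waits its turn.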
3) Pain: defining signatures multiple times
You can keep one source of truth for payload shapes and reuse it in implementation code.
Python: @validate_call + make_events(...) + make_handler(...)
Use implementation function signatures as the source of truth, then generate event classes from them.
from abxbus import EventBus, events_suck
from pydantic import validate_call
@validate_call
def create_user(id: str | None, name: str, age: int) -> str:
    return f'{name}-{age}'
@validate_call
def update_user(id: str, age: int | None = None, **extra) -> bool:
    return True
events = events_suck.make_events({
    'UserCreateEvent': create_user,
    'UserUpdateEvent': update_user,
})
bus = EventBus('LegacyBus')
bus.on(events.UserCreateEvent, events_suck.make_handler(create_user))
bus.on(events.UserUpdateEvent, events_suck.make_handler(update_user))
UserClient = events_suck.wrap('UserClient', {'create': events.UserCreateEvent, 'update': events.UserUpdateEvent})
client = UserClient(bus=bus)
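As a rough illustration of what "the function signature is the source of truth" means, the sketch below derives a payload class from a function's parameters with the standard library alone. event_class_from and handler_for are hypothetical helpers invented for this example; they are not make_events(...) or make_handler(...), do no validation, and do not handle *args or **kwargs.

```python
import inspect
from dataclasses import asdict, field, fields, make_dataclass
from typing import Any, Callable


def create_user(name: str, age: int, nickname: str = "") -> str:
    return f"{name}-{age}"


def event_class_from(name: str, func: Callable[..., Any]) -> type:
    """Derive a payload dataclass from func's parameters (hypothetical helper)."""
    specs = []
    for param in inspect.signature(func).parameters.values():
        if param.default is inspect.Parameter.empty:
            # Required parameter becomes a required field.
            specs.append((param.name, param.annotation))
        else:
            # Parameter default becomes the field default.
            specs.append((param.name, param.annotation, field(default=param.default)))
    return make_dataclass(name, specs)


def handler_for(func: Callable[..., Any]) -> Callable[[Any], Any]:
    """Adapt func into a handler that unpacks the event's fields as keyword arguments."""

    def handler(event: Any) -> Any:
        return func(**asdict(event))

    return handler


UserCreateEvent = event_class_from("UserCreateEvent", create_user)
field_names = [f.name for f in fields(UserCreateEvent)]

event = UserCreateEvent(name="bob", age=45)
result = handler_for(create_user)(event)
```

Because the event fields are read from the function, renaming or adding a parameter changes the payload shape in one place instead of three.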
TypeScript: zod schema + z.infer shared with implementation
Keep the schema as the source of truth, infer implementation input types from it, and reuse the same shape in BaseEvent.extend(...).
import { BaseEvent, EventBus, events_suck } from 'abxbus'
import { z } from 'zod'
const CreateUserInputSchema = z.object({
  id: z.string().nullable().optional(),
  name: z.string(),
  age: z.number(),
})
type CreateUserInput = z.infer<typeof CreateUserInputSchema>
const UserCreateEvent = BaseEvent.extend('UserCreateEvent', {
  ...CreateUserInputSchema.shape,
  event_result_type: z.string(),
})
const bus = new EventBus('LegacyBus')
const create_user = async (input: CreateUserInput): Promise<string> => `${input.name}-${input.age}`
bus.on(UserCreateEvent, ({ id, name, age }) => create_user({ id, name, age }))
const UserClient = events_suck.wrap('UserClient', {
  create: UserCreateEvent,
})
const client = new UserClient(bus)
const id = await client.create({ id: null, name: 'bob', age: 45 })
Migration playbook
- Start with wrap(...) to clean up call-site boilerplate first.
- Use immediate execution patterns where you need function-call-like request/response behavior.
- Consolidate types with @validate_call (Python) or z.infer (TypeScript) to avoid signature drift.
- Add timeouts/retry policies where needed, instead of forcing eventual-consistency semantics everywhere.
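For the last step, one generic way to bound a request-style call is a per-attempt timeout with a small retry loop built on asyncio.wait_for. The helper below is a minimal sketch using only the standard library; call_with_retry and flaky_create are hypothetical names, and if your bus offers its own timeout or retry settings those are likely the better place to configure this.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    *,
    timeout: float,
    attempts: int,
) -> T:
    """Await make_call() with a per-attempt timeout, retrying only on timeout."""
    for attempt in range(1, attempts + 1):
        try:
            # wait_for cancels the pending call if it exceeds the timeout.
            return await asyncio.wait_for(make_call(), timeout)
        except asyncio.TimeoutError:
            if attempt == attempts:
                raise  # out of attempts: surface the timeout to the caller
    raise ValueError("attempts must be at least 1")


async def demo() -> tuple[str, int]:
    calls = 0

    async def flaky_create() -> str:
        # Stand-in for an awaited request; the first attempt hangs past the timeout.
        nonlocal calls
        calls += 1
        if calls == 1:
            await asyncio.sleep(1)
        return "user-1"

    result = await call_with_retry(flaky_create, timeout=0.05, attempts=3)
    return result, calls


result, calls = asyncio.run(demo())
```

With a wrapped client, make_call would be something like lambda: client.create(name='bob', age=45). Only retry calls whose handlers are safe to run more than once.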
You do not need to choose between clean DX and events. You can keep method-shaped APIs and adopt event internals incrementally.