Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
Install abxbus, define one typed event, register a handler, and emit the event.
Repository example files:
Install
First event
import asyncio
from abxbus import BaseEvent, EventBus
class CreateUserEvent(BaseEvent[dict]):
email: str
async def on_create_user(event: CreateUserEvent) -> dict:
user = await your_create_user_logic(event.email)
return {'user_id': user['id']}
async def main() -> None:
bus = EventBus('MyAuthEventBus')
bus.on(CreateUserEvent, on_create_user)
result = await bus.emit(CreateUserEvent(email='[email protected]')).event_result()
print(result)
# {'user_id': 'some-user-uuid'}
asyncio.run(main())
import { BaseEvent, EventBus } from 'abxbus'
import { z } from 'zod'
const CreateUserEvent = BaseEvent.extend('CreateUserEvent', {
email: z.string(),
event_result_type: z.object({ user_id: z.string() }),
})
const bus = new EventBus('MyAuthEventBus')
bus.on(CreateUserEvent, async (event) => {
const user = await yourCreateUserLogic(event.email)
return { user_id: user.id }
})
const event = bus.emit(CreateUserEvent({ email: '[email protected]' }))
await event.done()
console.log(event.event_result) // { user_id: 'some-user-uuid' }
Next steps