Documentation Index
Fetch the complete documentation index at: https://abxbus.archivebox.io/llms.txt
Use this file to discover all available pages before exploring further.
BaseEvent is the typed payload + runtime state object that flows through the bus.
Use subclassing (Python) or BaseEvent.extend(...) (TypeScript) to define event payload fields.
Defining events
Python
TypeScript
Go
Rust
from abxbus import BaseEvent
class FooCreateEvent(BaseEvent[str]):
id: str | None = None
name: str
age: int
import { BaseEvent } from 'abxbus'
import { z } from 'zod'
const FooCreateEvent = BaseEvent.extend('FooCreateEvent', {
id: z.string().nullable().optional(),
name: z.string(),
age: z.number(),
event_result_type: z.string(),
})
type FooPayload struct {
ID *string `json:"id,omitempty"`
Name string `json:"name"`
Age int `json:"age"`
}
type FooResult string
event, err := abxbus.NewBaseEventWithResult[FooPayload, FooResult]("FooCreateEvent", FooPayload{
Name: "Ada",
Age: 37,
})
use abxbus_rust::event;
event! {
struct FooCreateEvent {
id: Option<String>,
name: String,
age: i64,
event_result_type: String,
}
}
let event = bus.emit(FooCreateEvent {
id: None,
name: "Ada".to_string(),
age: 37,
..Default::default()
});
Queue semantics are split in each runtime:
- Immediate path:
await event.now() (Python/TypeScript), event.Now() (Go), event.now().await (Rust)
- Queue-order path:
await event.wait() (Python/TypeScript), event.Wait() (Go), event.wait().await (Rust)
now() and wait() accept the wait-shaping options (first_result and timeout). Result filtering and error policy live on event_result() / event_results_list().
Python also supports await event as a Python-only shortcut for await event.now(). Other runtimes require the explicit now() / wait() methods.
In Go the same split is exposed as event.Now() for immediate queue-jump execution and event.Wait() for normal queue-order waiting.
In Rust the same split is exposed as event.now().await for immediate queue-jump execution and event.wait().await for normal queue-order waiting.
Unset event option fields are not hydrated onto the event at emit time; the bus that actually processes the event resolves its defaults at execution time.
Common event metadata fields available in all runtimes:
event_id: unique UUIDv7
event_type: event name/type key
event_version: payload version marker
event_result_type: expected handler return schema/type
event_timeout: per-event timeout override (None/null means use the current processing bus default at execution time; 0 disables)
event_handler_timeout: per-handler timeout cap override
event_handler_slow_timeout: per-handler slow warning threshold (None/null means use the current processing bus default; 0 disables)
event_slow_timeout: per-event slow warning threshold (None/null means use the current processing bus default; 0 disables)
event_concurrency: event scheduling mode override (None/null/zero-value mode means use the current processing bus default)
event_handler_concurrency: handler scheduling mode override (None/null/zero-value mode means use the current processing bus default)
event_handler_completion: handler completion strategy override (None/null/zero-value mode means use the current processing bus default)
Runtime fields
event_status: pending/started/completed
event_created_at, event_started_at, event_completed_at
event_started_at / event_completed_at are None (Python) / null (TypeScript) until set
event_parent_id and event_emitted_by_handler_id are None / null when unset
event_blocks_parent_completion: True / true only for linked children awaited from the handler that emitted them
event_path: buses traversed
event_results: per-handler result entries
- Child-event tracking (
event_children/descendants)
Completion model
Events are returned in pending state from emit(), then complete asynchronously.
Python
TypeScript
Go
Rust
pending = bus.emit(MyEvent())
completed = await pending.now()
completed_without_raise = await pending.now()
completed_in_queue_order = await pending.wait()
value = await completed.event_result()
const pending = bus.emit(MyEvent({}))
const completed = await pending.now()
const completedWithoutRaise = await pending.now()
const completed_in_queue_order = await pending.wait()
const value = await completed.eventResult()
type MyEvent struct{}
pending := bus.Emit(abxbus.MustNewTypedEvent[MyEvent]("MyEvent", MyEvent{}))
completed, err := pending.Now()
if err != nil {
return err
}
completedInQueueOrder, err := pending.Wait()
if err != nil {
return err
}
value, err := completed.EventResult()
_ = completedInQueueOrder
use futures::executor::block_on;
let pending = bus.emit(MyEvent {
..Default::default()
});
block_on(pending.now())?;
block_on(pending.wait())?;
let value = block_on(pending.event_result())?;
Result access helpers
Result helpers share the same defaults in all runtimes:
raise_if_any=true: raise when any handler result is an error.
raise_if_none=false: return None / undefined / nil / [] when no result matches the filter.
- If every handler errors, only
raise_if_any=false plus raise_if_none=false suppresses the error. Any other combination raises.
- A single handler error is raised as that error; multiple handler errors are raised as an aggregate/exception-group shape.
First Result
Python
TypeScript
Go
Rust
completed = await event.now(first_result=True)
value = await completed.event_result()
const value = await event.now({ first_result: true }).eventResult()
completed, err := event.Now(&abxbus.EventWaitOptions{FirstResult: true})
if err != nil {
return err
}
value, err := completed.EventResult()
let completed = block_on(event.now_with_options(EventWaitOptions {
first_result: true,
..Default::default()
}))?;
let value = block_on(completed.event_result())?;
All results
Python
TypeScript
Go
Rust
items = await event.event_results_list()
by_handler = {handler_id: result.result for handler_id, result in event.event_results.items()}
const items = await event.eventResultsList()
const filtered = await event.eventResultsList({
include: (result) => typeof result === 'string',
raise_if_any: false,
raise_if_none: true,
})
const first = await event.eventResult()
const errors = event.event_errors
items, err := event.EventResultsList()
filtered, err := event.EventResultsList(
&abxbus.EventResultOptions{
Include: func(result any, eventResult *abxbus.EventResult) bool {
_, ok := result.(string)
return ok
},
RaiseIfAny: false,
RaiseIfNone: true,
},
)
let items = block_on(event.event_results_list_with_options(EventResultOptions {
raise_if_any: false,
raise_if_none: false,
..EventResultOptions::default()
}))?;
let by_handler = event.event_results.read();
Per-handler result entries
You can create/update a specific EventResult entry for a handler (useful for controlled seeding/rehydration flows).
Python
TypeScript
Go
Rust
pending = event.event_result_update(handler=handler_entry, eventbus=bus, status='pending')
pending.update(status='completed', result='seeded')
const pending = event.eventResultUpdate(handler_entry, { eventbus: bus, status: 'pending' })
pending.update({ status: 'completed', result: 'seeded' })
pending := event.EventResultUpdate(handlerEntry, &abxbus.BaseEventResultUpdateOptions{
EventBus: bus,
EventResultUpdateOptions: abxbus.EventResultUpdateOptions{
Status: abxbus.EventResultPending,
},
})
pending.Update(&abxbus.EventResultUpdateOptions{
Status: abxbus.EventResultCompleted,
Result: "seeded",
})
use abxbus_rust::event_result::EventResultStatus;
use serde_json::json;
let pending = event.event_result_update(
&handler_entry,
Some(EventResultStatus::Pending),
None,
None,
None,
);
event.event_result_update(
&handler_entry,
Some(EventResultStatus::Completed),
Some(Some(json!("seeded"))),
None,
None,
);
Resetting an event
You can create a fresh pending copy for re-emit.
Python
TypeScript
Go
Rust
fresh = event.event_reset()
const fresh = event.eventReset()
fresh, err := event.EventReset()
let fresh = event.event_reset();
Serialization
Events are JSON-serializable in all runtimes for bridge and cross-runtime workflows.
Python
TypeScript
Go
Rust
payload = event.model_dump(mode='json')
print(payload)
# {
# "event_id": "0190...",
# "event_type": "CreateUserEvent",
# "event_status": "pending",
# "event_result_type": {"type": "object", "...": "..."},
# "email": "[email protected]",
# "...": "..."
# }
restored = type(event).model_validate(payload)
const payload = event.toJSON()
console.log(payload)
// {
// event_id: '0190...',
// event_type: 'CreateUserEvent',
// event_status: 'pending',
// event_result_type: { type: 'object', ... },
// email: '[email protected]',
// ...
// }
const restored = BaseEvent.fromJSON(payload)
payload, err := event.ToJSON()
fmt.Println(string(payload))
// {
// "event_id": "0190...",
// "event_type": "CreateUserEvent",
// "event_status": "pending",
// "event_result_type": {"type": "object", "...": "..."},
// "email": "[email protected]",
// "...": "..."
// }
restored, err := abxbus.BaseEventFromJSON(payload)
let payload = event.to_json_value();
println!("{}", serde_json::to_string_pretty(&payload)?);
// {
// "event_id": "0190...",
// "event_type": "CreateUserEvent",
// "event_status": "pending",
// "event_result_type": {"type": "object", "...": "..."},
// "email": "[email protected]",
// "...": "..."
// }
let restored: CreateUserEvent = serde_json::from_value(payload)?;
Notes
- Reserved names are validated in all runtimes:
- runtime API names such as
bus, emit, now, and wait cannot be provided as payload fields where they would collide with event methods/properties.
- Unknown
event_* fields are rejected.
- Known built-in
event_* fields (for example event_timeout) can still be intentionally overridden in event definitions.
model_* is also reserved:
- Python: unknown
model_* fields are rejected, but valid Pydantic namespace overrides (for example model_config) are allowed.
- TypeScript: any
model_* field is rejected.
event_result_type drives handler return validation in all runtimes.
- Parent-child tracking is automatic for
event.emit(...) inside handlers. bus.emit(...) creates a top-level event with no parent link.
Reserved Fields
Python
TypeScript
Go
Rust
from pydantic import ConfigDict
from abxbus import BaseEvent
class AllowedEvent(BaseEvent[None]):
event_timeout: float | None = 30 # allowed built-in event_* override
model_config = ConfigDict(extra='allow') # allowed Pydantic model_* override
# rejected: unknown reserved prefixes
class InvalidEvent(BaseEvent[None]):
event_some_field_we_dont_recognize: int = 1 # raises
model_something_random: int = 2 # raises
import { BaseEvent } from 'abxbus'
import { z } from 'zod'
const AllowedEvent = BaseEvent.extend('AllowedEvent', {
event_timeout: 30, // allowed built-in event_* override
payload: z.string(),
})
// rejected: unknown event_* and all model_*
BaseEvent.extend('InvalidEvent', {
event_some_field_we_dont_recognize: 1, // throws
model_something_random: 2, // throws
})
timeout := 30.0
type AllowedEvent struct {
Payload string `json:"payload"`
EventTimeout *float64 `json:"event_timeout,omitempty"`
}
event, err := abxbus.Event(AllowedEvent{
Payload: "ok",
EventTimeout: &timeout, // allowed built-in event_* override
})
if err != nil {
return err
}
// Go payloads are passed as maps or typed structs, so unknown event_* / model_* keys
// should not be placed in the flattened payload. Use explicit BaseEvent fields instead.
use abxbus_rust::event;
event! {
struct AllowedEvent {
payload: String,
event_result_type: (),
}
}
let event = bus.emit(AllowedEvent {
payload: "ok".to_string(),
event_timeout: Some(30.0),
..Default::default()
});
// Rust validates unknown event_* / model_* keys during typed event construction
// and uses explicit BaseEventData fields for built-in runtime overrides.