End-to-end: saga → outbox → inbox
The individual pieces — domain events, the outbox, sagas, the inbox — each solve one problem. This recipe puts them together in one runnable program: a checkout that reserves inventory, confirms an order, and ships it — reliably, across aggregates, with compensation if a step fails.
The full program lives at examples/recipes/order_fulfillment/ and runs in-process against the mock backend, with no Docker required. The Events & sagas chapter explains why each piece works; this is the assembled flow.
The aggregate announces a change
Order extends AggregateRoot, so it can record a domain event the moment its
state transitions — declared next to the data it watches:
class Order(Document, AggregateRoot):
    status: str = "pending"

    @event_emitter(fields={"status"})
    def _on_confirm(before, after: Self, diff: JsonDict) -> DomainEvent | None:  # type: ignore[no-untyped-def]
        if after.status == "confirmed" and before.status != "confirmed":
            return OrderConfirmed(aggregate_id=after.id)
        return None
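The guard in the emitter is what keeps the event tied to a transition rather than to every write. A minimal standalone sketch of that check, using plain dataclasses instead of the framework's Document and AggregateRoot (OrderSnapshot and on_confirm are hypothetical names, not part of the library):

```python
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class OrderSnapshot:  # hypothetical stand-in for the Order document
    id: UUID
    status: str = "pending"


@dataclass(frozen=True)
class OrderConfirmed:  # toy event, mirrors the name used in the recipe
    aggregate_id: UUID


def on_confirm(before: OrderSnapshot, after: OrderSnapshot) -> OrderConfirmed | None:
    # Emit only on the transition INTO "confirmed", not on every write
    # that happens to leave the status at "confirmed".
    if after.status == "confirmed" and before.status != "confirmed":
        return OrderConfirmed(aggregate_id=after.id)
    return None


order_id = uuid4()
pending = OrderSnapshot(id=order_id)
confirmed = OrderSnapshot(id=order_id, status="confirmed")

first = on_confirm(pending, confirmed)     # transition: an event is produced
repeat = on_confirm(confirmed, confirmed)  # no transition: nothing is produced
```

Re-saving an already confirmed order therefore produces no second event, which is why the comparison looks at both before and after.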
A saga orchestrates the two aggregates
Checkout spans Inventory and Order. A saga runs the steps with a pivot:
everything before the pivot is compensatable, the pivot commits the outcome.
Reserve inventory first; confirming the order is the pivot.
def build_checkout_saga() -> SagaDefinition[CheckoutCtx]:
    return SagaDefinition(
        name="checkout",
        steps=(
            SagaStep(
                name="reserve",
                action=_reserve,
                compensation=_release,
                tx_route="mock",
            ),
            SagaStep(
                name="confirm",
                action=_confirm,
                kind=SagaStepKind.PIVOT,
                tx_route="mock",
            ),
        ),
    )
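To make the compensation behaviour concrete, here is a toy runner that is not the framework's saga engine. It is a sketch under simplifying assumptions: steps are synchronous, state is a plain dict, and ToyStep and run_saga are hypothetical names. When a step raises, the completed steps are compensated in reverse order.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional

Action = Callable[[dict], None]


@dataclass(frozen=True)
class ToyStep:  # hypothetical, far simpler than the library's SagaStep
    name: str
    action: Action
    compensation: Optional[Action] = None


def run_saga(steps: list[ToyStep], state: dict) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse."""
    completed: list[ToyStep] = []
    for step in steps:
        try:
            step.action(state)
        except Exception:
            for done in reversed(completed):
                if done.compensation is not None:
                    done.compensation(state)
            return False
        completed.append(step)
    return True


def reserve(state: dict) -> None:
    state["reserved"] = True


def release(state: dict) -> None:
    state["reserved"] = False


def confirm(state: dict) -> None:
    # The pivot: it either commits the outcome or fails before committing.
    if state.get("decline"):
        raise RuntimeError("payment declined")
    state["status"] = "confirmed"


STEPS = [ToyStep("reserve", reserve, release), ToyStep("confirm", confirm)]

ok_state: dict = {}
ok = run_saga(STEPS, ok_state)

failed_state: dict = {"decline": True}
failed = run_saga(STEPS, failed_state)
```

In the failing run the reservation is released and no status is ever written, which is the outcome the pivot rule is meant to guarantee.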
The pivot step: change state and stage the event together
Confirming the order trips its event emitter; the application layer dispatches that event inside the step's transaction, where the outbox bridge stages an integration event. State change and event are one atomic write:
async def _confirm(ctx: ExecutionContext, s: CheckoutCtx) -> CheckoutCtx:
    # Pivot. A pre-commit failure (e.g. payment declined) compensates `reserve`.
    if s.simulate_failure:
        log.error("payment declined: failing the pivot step")
        raise exc.domain("payment declined")
    order = await ctx.document.query(ORDER_SPEC).get(s.order_id)
    # The status transition fires @event_emitter -> OrderConfirmed, dispatched in THIS
    # step's transaction (the command flow), which the outbox bridge stages.
    await ctx.document.command(ORDER_SPEC).update(
        s.order_id, order.rev, OrderUpdate(status="confirmed")
    )
    # Transactional outbox: flush the staged event within the same transaction.
    await ctx.outbox.command(OUTBOX_SPEC).flush()
    log.info(
        "order confirmed: OrderConfirmed staged to outbox", order_id=str(s.order_id)
    )
    return s
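The "one atomic write" property can be illustrated outside the framework with the standard library's sqlite3 module. This is only a sketch of the transactional outbox idea, not how the mock or a real backend is implemented; the table layout and confirm_order are assumptions made for the example.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO orders VALUES ('o-1', 'pending'), ('o-2', 'pending');
    """
)


def confirm_order(order_id: str, *, fail_before_commit: bool = False) -> None:
    # `with conn` wraps both writes in one transaction:
    # commit on success, rollback if anything raises.
    with conn:
        conn.execute("UPDATE orders SET status = 'confirmed' WHERE id = ?", (order_id,))
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("order.confirmed", json.dumps({"order_id": order_id})),
        )
        if fail_before_commit:
            raise RuntimeError("crash before commit")


confirm_order("o-1")
try:
    confirm_order("o-2", fail_before_commit=True)
except RuntimeError:
    pass

statuses = dict(conn.execute("SELECT id, status FROM orders ORDER BY id"))
staged = [json.loads(p)["order_id"] for (p,) in conn.execute("SELECT payload FROM outbox")]
```

The failed call leaves neither a confirmed order nor an outbox row behind, so there is no state in which one exists without the other.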
The bridge that turns the domain event into a staged outbox row is wired once:
def build_context() -> ExecutionContext:
    registry = DomainEventRegistry()
    registry.register(
        OrderConfirmed,
        outbox_event_handler(
            OUTBOX_SPEC,
            "order.confirmed",
            lambda e: OrderConfirmedPayload(order_id=str(e.aggregate_id)),
        ),
    )
    module = MockDepsModule(domain_events=registry)
    return ExecutionContext(deps=DepsRegistry.from_modules(module).freeze().resolve())
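Conceptually, the bridge is a lookup from event type to a handler that maps the domain event to an integration payload and stages it. The following sketch shows that shape with plain Python objects; ToyRegistry and outbox_handler are hypothetical and do not reflect the real DomainEventRegistry or outbox_event_handler internals.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class OrderConfirmed:  # toy domain event
    aggregate_id: str


class ToyRegistry:  # hypothetical stand-in for a domain event registry
    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = {}

    def register(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def dispatch(self, event: Any) -> None:
        # Events with no registered handler are simply ignored.
        for handler in self._handlers.get(type(event), []):
            handler(event)


def outbox_handler(
    staged: list, topic: str, to_payload: Callable[[Any], dict]
) -> Callable[[Any], None]:
    # Returns a handler that maps the domain event to a payload and stages it.
    def handle(event: Any) -> None:
        staged.append((topic, to_payload(event)))

    return handle


staged: list = []
registry = ToyRegistry()
registry.register(
    OrderConfirmed,
    outbox_handler(staged, "order.confirmed", lambda e: {"order_id": e.aggregate_id}),
)
registry.dispatch(OrderConfirmed(aggregate_id="o-1"))
```

Keeping the mapping in one place means the aggregate only knows about its domain event, while the topic name and payload shape stay a wiring concern.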
Relay carries it to the consumer
A relay claims staged rows and delivers them downstream — standing in for a broker plus the outbox relay worker:
async def relay_once(ctx: ExecutionContext) -> list[RelayMessage]:
    query = ctx.outbox.query(OUTBOX_SPEC)
    claims = await query.claim_pending()
    messages = [
        RelayMessage(key=str(c.event_id), order_id=UUID(c.payload["order_id"]))
        for c in claims
    ]
    if claims:
        await query.mark_published([c.id for c in claims])
    log.info("relayed events from outbox", count=len(messages))
    return messages
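A relay that sends first and marks rows as published afterwards can resend a message if it stops between the two steps. The in-memory sketch below illustrates that window; ToyOutbox, its methods, and the crash_before_mark flag are assumptions for the example, and the send-then-mark ordering is the common broker-backed arrangement rather than a description of the in-process recipe above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyOutbox:  # hypothetical in-memory outbox
    rows: dict[int, str] = field(default_factory=dict)  # row id -> payload
    published: set[int] = field(default_factory=set)

    def stage(self, payload: str) -> int:
        row_id = len(self.rows) + 1
        self.rows[row_id] = payload
        return row_id

    def claim_pending(self) -> list[tuple[int, str]]:
        return [(i, p) for i, p in sorted(self.rows.items()) if i not in self.published]

    def mark_published(self, ids: list[int]) -> None:
        self.published.update(ids)


def relay_once(
    outbox: ToyOutbox, send: Callable[[str], None], *, crash_before_mark: bool = False
) -> int:
    claims = outbox.claim_pending()
    for _, payload in claims:
        send(payload)
    if crash_before_mark:
        # Simulates the relay dying after sending but before recording success.
        raise RuntimeError("relay crashed after sending")
    outbox.mark_published([row_id for row_id, _ in claims])
    return len(claims)


outbox = ToyOutbox()
outbox.stage("order-1")
received: list[str] = []

try:
    relay_once(outbox, received.append, crash_before_mark=True)
except RuntimeError:
    pass
second = relay_once(outbox, received.append)  # the row is still pending, so it is resent
third = relay_once(outbox, received.append)   # nothing left to relay
```

The consumer sees "order-1" twice here, which is the duplicate the inbox has to absorb.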
The inbox makes delivery exactly-once
Relays are at-least-once: the same event can arrive twice. The consumer records
each message id in the inbox and skips duplicates, so the Shipment is created
once no matter how many times the event is delivered:
async def deliver(ctx: ExecutionContext, message: RelayMessage) -> bool:
    """Process one relayed message exactly-once. Returns False if it was a duplicate."""
    processed = await process_with_inbox(
        ctx,
        message,
        inbox_spec=INBOX_SPEC,
        handler=lambda m: _fulfill(ctx, m),
        tx_route="mock",
    )
    if not processed:
        log.debug("duplicate message skipped by inbox", key=message.key)
    return processed
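One common way to implement this dedup is a uniqueness constraint on the message key, written in the same transaction as the handler's side effect. The sqlite3 sketch below shows that pattern under stated assumptions: the inbox and shipments tables and this standalone deliver function are hypothetical, and process_with_inbox may work differently internally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE inbox (message_key TEXT PRIMARY KEY);
    CREATE TABLE shipments (order_id TEXT NOT NULL);
    """
)


def deliver(message_key: str, order_id: str) -> bool:
    """Return True if processed, False if the key was already in the inbox."""
    # One transaction covers both the inbox record and the side effect,
    # so a failure in the handler also rolls back the inbox row.
    with conn:
        try:
            conn.execute("INSERT INTO inbox (message_key) VALUES (?)", (message_key,))
        except sqlite3.IntegrityError:
            return False  # duplicate key: this message was handled before
        conn.execute("INSERT INTO shipments (order_id) VALUES (?)", (order_id,))
    return True


results = [
    deliver("evt-1", "o-1"),
    deliver("evt-1", "o-1"),  # redelivery of the same message
    deliver("evt-2", "o-2"),
]
shipment_count = conn.execute("SELECT COUNT(*) FROM shipments").fetchone()[0]
```

Delivery itself is still at-least-once; it is the effect of processing that happens once per key.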
Notes
- Compensation is automatic. If the pivot fails, the saga runs the compensations for completed steps (here: release the reserved inventory) and stages nothing downstream.
- The event commits with the write. Because the dispatch happens in the step's transaction, a published order.confirmed always corresponds to a committed order: never a phantom event, never a silent loss.
- Swap the mock for real backends without touching this logic: a Postgres store + outbox, a real broker via relay_outbox_to_queue, and the same inbox dedup. The orchestration is backend-agnostic.