Events & sagas
A confirmed order should reliably create a shipment — even though the order and the shipment live in different systems, even if the message is delivered twice, and even if a step fails partway. Forze builds that reliability from three pieces, all riding the after-commit deferral from Transactions: domain events, the transactional outbox, and sagas.
We'll trace one worked example throughout — the end-to-end recipe, a complete program you can run.
A state change worth announcing
An aggregate that extends AggregateRoot can emit a DomainEvent when its state
transitions. The Order records OrderConfirmed the moment its status becomes
confirmed — declared right next to the data it watches:
```python
class Order(Document, AggregateRoot):
    status: str = "pending"

    @event_emitter(fields={"status"})
    def _on_confirm(before, after: Self, diff: JsonDict) -> DomainEvent | None:  # type: ignore[no-untyped-def]
        if after.status == "confirmed" and before.status != "confirmed":
            return OrderConfirmed(aggregate_id=after.id)
        return None
```
The event is recorded on the instance during update(); the application layer
drains and dispatches it when the operation persists. The aggregate doesn't know
what happens to the event next — that's wired separately.
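To make the record-then-drain flow concrete, here is a minimal, framework-free sketch. It does not use Forze: Order, update, drain_events, and the EMITTERS list are hypothetical stand-ins. It assumes an emitter is a plain function of (before, after) that runs only when one of its watched fields actually changes; the real _on_confirm above also receives a diff, which the sketch leaves out.

```python
from dataclasses import dataclass, field, replace
from typing import ClassVar, Optional


@dataclass(frozen=True)
class OrderConfirmed:
    """Hypothetical domain event: the order with this id was confirmed."""

    aggregate_id: str


def on_confirm(before: "Order", after: "Order") -> Optional[OrderConfirmed]:
    # Emit only on the transition into "confirmed", not on every update.
    if after.status == "confirmed" and before.status != "confirmed":
        return OrderConfirmed(aggregate_id=after.id)
    return None


@dataclass
class Order:
    """Hypothetical aggregate (not Forze's Document/AggregateRoot)."""

    id: str
    status: str = "pending"
    # Events recorded by update() until the application layer drains them.
    pending_events: list = field(default_factory=list)

    # (watched fields, emitter) pairs, declared next to the data they watch.
    EMITTERS: ClassVar[list] = [({"status"}, on_confirm)]

    def update(self, **changes: object) -> "Order":
        # Build the new state; the aggregate only records events, it never sends them.
        after = replace(self, pending_events=list(self.pending_events), **changes)
        changed = {name for name, value in changes.items() if getattr(self, name) != value}
        for watched, emitter in self.EMITTERS:
            # Run an emitter only if one of its watched fields really changed.
            if watched & changed:
                event = emitter(self, after)
                if event is not None:
                    after.pending_events.append(event)
        return after

    def drain_events(self) -> list:
        # Hand the recorded events to the caller exactly once.
        events, self.pending_events = self.pending_events, []
        return events
```

Confirming a pending order records one OrderConfirmed; draining a second time returns nothing, and re-applying status="confirmed" records nothing because the watched field did not change.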
From event to outbox
A domain event is only reliable if it can neither be lost when the transaction commits nor sent when it rolls back. The bridge is a handler that turns the event into a staged outbox row:
```python
def build_context() -> ExecutionContext:
    registry = DomainEventRegistry()
    registry.register(
        OrderConfirmed,
        outbox_event_handler(
            OUTBOX_SPEC,
            "order.confirmed",
            lambda e: OrderConfirmedPayload(order_id=str(e.aggregate_id)),
        ),
    )
    module = MockDepsModule(domain_events=registry)
    return ExecutionContext(deps=DepsRegistry.from_modules(module).freeze().resolve())
```
outbox_event_handler maps OrderConfirmed onto an order.confirmed integration
event with a typed payload. From now on, whenever an OrderConfirmed is
dispatched, it's staged to the outbox.
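Conceptually, the registration above is a lookup from an event type to a topic and a payload builder. The sketch below is hypothetical: EventToOutbox, OutboxRow, and dispatch are illustrative names, not Forze's API, and staged rows sit in a plain list rather than in a transaction-scoped buffer.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable
from uuid import UUID, uuid4


@dataclass(frozen=True)
class OrderConfirmed:
    aggregate_id: UUID


@dataclass(frozen=True)
class OutboxRow:
    event_id: UUID
    topic: str
    payload: dict[str, Any]


class EventToOutbox:
    """Hypothetical bridge: maps a domain-event type to (topic, payload builder)."""

    def __init__(self) -> None:
        self._routes: dict[type, tuple[str, Callable[[Any], dict[str, Any]]]] = {}
        # Rows staged here would be written to the outbox table by a later flush.
        self.staged: list[OutboxRow] = []

    def register(
        self, event_type: type, topic: str, to_payload: Callable[[Any], dict[str, Any]]
    ) -> None:
        self._routes[event_type] = (topic, to_payload)

    def dispatch(self, event: Any) -> None:
        route = self._routes.get(type(event))
        if route is None:
            return  # this domain event has no integration-event mapping
        topic, to_payload = route
        # Each staged row gets its own id, which a consumer can later dedupe on.
        self.staged.append(OutboxRow(event_id=uuid4(), topic=topic, payload=to_payload(event)))
```

Keeping the mapping outside the aggregate mirrors the point above: the Order never learns that its event becomes an "order.confirmed" message.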
The transactional outbox
The point of the outbox is that the event row and the state change commit together. The confirm step updates the order and flushes the staged event inside the same transaction:
```python
async def _confirm(ctx: ExecutionContext, s: CheckoutCtx) -> CheckoutCtx:
    # Pivot. A pre-commit failure (e.g. payment declined) compensates `reserve`.
    if s.simulate_failure:
        log.error("payment declined — failing the pivot step")
        raise exc.domain("payment declined")

    order = await ctx.document.query(ORDER_SPEC).get(s.order_id)
    # The status transition fires @event_emitter -> OrderConfirmed, dispatched in THIS
    # step's transaction (the command flow), which the outbox bridge stages.
    await ctx.document.command(ORDER_SPEC).update(
        s.order_id, order.rev, OrderUpdate(status="confirmed")
    )
    # Transactional outbox: flush the staged event within the same transaction.
    await ctx.outbox.command(OUTBOX_SPEC).flush()
    log.info(
        "order confirmed — OrderConfirmed staged to outbox", order_id=str(s.order_id)
    )
    return s
```
Because both writes land in one transaction, you never get a confirmed order without its event, or an event for an order that rolled back. That is the dual-write problem (updating a database and notifying another system with no shared transaction between them), solved not with a distributed transaction but with a local one plus a relay.
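The same guarantee can be demonstrated with nothing but Python's standard sqlite3 module: one transaction updates the order and inserts the outbox row, so a failure before commit undoes both. The table layout and confirm_order are assumptions for illustration, not Forze's schema or API.

```python
import json
import sqlite3
import uuid


def create_schema(conn: sqlite3.Connection) -> None:
    conn.executescript(
        """
        CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL);
        CREATE TABLE outbox (
            event_id  TEXT PRIMARY KEY,
            topic     TEXT NOT NULL,
            payload   TEXT NOT NULL,
            published INTEGER NOT NULL DEFAULT 0
        );
        """
    )


def confirm_order(conn: sqlite3.Connection, order_id: str, fail_before_commit: bool = False) -> None:
    # Under sqlite3's default transaction handling the UPDATE opens a transaction;
    # `with conn` commits it on success or rolls it back if the block raises.
    with conn:
        conn.execute("UPDATE orders SET status = 'confirmed' WHERE id = ?", (order_id,))
        conn.execute(
            "INSERT INTO outbox (event_id, topic, payload) VALUES (?, ?, ?)",
            (str(uuid.uuid4()), "order.confirmed", json.dumps({"order_id": order_id})),
        )
        if fail_before_commit:
            # Simulates a crash after both writes but before commit.
            raise RuntimeError("simulated failure before commit")
```

After a simulated failure the order is still pending and the outbox is empty; after a successful call the order is confirmed and exactly one row waits for the relay.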
A relay then moves staged rows onward. In production that's a worker handing off to a broker; here it claims the pending rows and passes them along in-process:
```python
async def relay_once(ctx: ExecutionContext) -> list[RelayMessage]:
    query = ctx.outbox.query(OUTBOX_SPEC)
    claims = await query.claim_pending()
    messages = [
        RelayMessage(key=str(c.event_id), order_id=UUID(c.payload["order_id"]))
        for c in claims
    ]
    if claims:
        await query.mark_published([c.id for c in claims])
        log.info("relayed events from outbox", count=len(messages))
    return messages
```
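Stripped to its core, a relay pass reads pending rows, publishes them, then marks them published. This sqlite3 sketch uses a hypothetical schema and function names. It publishes before marking, so a crash in between leads to a redelivery rather than a lost event, which is why the consumer still needs deduplication. It also assumes a single relay: nothing here locks rows against a second concurrent relay, which a real claim step has to handle.

```python
import json
import sqlite3
from typing import Callable


def create_outbox(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE outbox (event_id TEXT PRIMARY KEY, topic TEXT NOT NULL, "
        "payload TEXT NOT NULL, published INTEGER NOT NULL DEFAULT 0)"
    )


def relay_once(
    conn: sqlite3.Connection,
    publish: Callable[[str, str, dict], None],
    batch_size: int = 100,
) -> int:
    """Publish one batch of pending rows; return how many were relayed."""
    rows = conn.execute(
        "SELECT event_id, topic, payload FROM outbox "
        "WHERE published = 0 ORDER BY rowid LIMIT ?",
        (batch_size,),
    ).fetchall()
    for event_id, topic, payload in rows:
        # If publish raises, no row in this batch is marked, so the next pass
        # re-sends all of them: delivery is at-least-once.
        publish(event_id, topic, json.loads(payload))
    if rows:
        with conn:
            conn.executemany(
                "UPDATE outbox SET published = 1 WHERE event_id = ?",
                [(event_id,) for event_id, _, _ in rows],
            )
    return len(rows)
```

In production, publish would hand each message to a broker; the function shape stays the same.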
Exactly-once on the way in: the inbox
Brokers redeliver. The consumer dedupes by event id through the inbox, so handling the same message twice creates one shipment, not two:
```python
async def deliver(ctx: ExecutionContext, message: RelayMessage) -> bool:
    """Process one relayed message exactly-once. Returns False if it was a duplicate."""
    processed = await process_with_inbox(
        ctx,
        message,
        inbox_spec=INBOX_SPEC,
        handler=lambda m: _fulfill(ctx, m),
        tx_route="mock",
    )
    if not processed:
        log.debug("duplicate message skipped by inbox", key=message.key)
    return processed
```
process_with_inbox runs the handler only if that event id hasn't been seen
before; a redelivery returns False and does nothing. Combined with the outbox,
that gives exactly-once effects across a boundary whose delivery is only
at-least-once.
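The inbox idea fits in a few lines of sqlite3. In this hypothetical sketch (table and function names are illustrative, not Forze's), the processed event id is the primary key of an inbox table, and recording it shares a transaction with the handler's write. A duplicate fails the inbox insert before any shipment is written, and a handler failure rolls back the inbox record too, so the message can be retried later.

```python
import sqlite3


def create_tables(conn: sqlite3.Connection) -> None:
    conn.executescript(
        """
        CREATE TABLE inbox (event_id TEXT PRIMARY KEY);
        CREATE TABLE shipments (id INTEGER PRIMARY KEY, order_id TEXT NOT NULL);
        """
    )


def fulfill_once(conn: sqlite3.Connection, event_id: str, order_id: str) -> bool:
    """Create a shipment for this event unless its id was already processed."""
    try:
        with conn:
            # Both inserts share one transaction: the id is recorded as seen
            # if and only if the shipment is created.
            conn.execute("INSERT INTO inbox (event_id) VALUES (?)", (event_id,))
            conn.execute("INSERT INTO shipments (order_id) VALUES (?)", (order_id,))
    except sqlite3.IntegrityError:
        # Primary-key conflict on inbox: this is a redelivery, so do nothing.
        return False
    return True
```

Delivering the same event id twice yields True and then False, with a single shipment row.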
Sagas: many steps, one outcome
The confirm step above doesn't run alone — it's the final step of a checkout
saga. Reserving inventory and confirming the order can't share a single
database transaction; they're distinct steps that may each fail. A saga
sequences them with compensation: every step carries an undo, and a failure
rolls the completed steps back in reverse.
```python
def build_checkout_saga() -> SagaDefinition[CheckoutCtx]:
    return SagaDefinition(
        name="checkout",
        steps=(
            SagaStep(
                name="reserve",
                action=_reserve,
                compensation=_release,
                tx_route="mock",
            ),
            SagaStep(
                name="confirm",
                action=_confirm,
                kind=SagaStepKind.PIVOT,
                tx_route="mock",
            ),
        ),
    )
```
reserve is compensatable: if a later step fails, _release runs to undo the reservation. confirm is the pivot (SagaStepKind.PIVOT), the point of no return. If the pivot fails before it commits, every earlier step is compensated; once it commits, the saga is durable.
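The compensation and pivot rules can be sketched as a small synchronous runner. Step, StepKind, and run_saga are hypothetical names, not Forze's SagaDefinition API, and the sketch ignores transactions entirely. It assumes steps after the pivot must be retried rather than compensated, so it re-raises their failures instead of undoing anything.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


class StepKind(Enum):
    COMPENSATABLE = "compensatable"
    PIVOT = "pivot"


@dataclass(frozen=True)
class Step:
    name: str
    action: Callable[[dict], None]
    compensation: Optional[Callable[[dict], None]] = None
    kind: StepKind = StepKind.COMPENSATABLE


def run_saga(steps: list[Step], state: dict) -> bool:
    """Run steps in order; if a step up to and including the pivot fails, undo in reverse."""
    completed: list[Step] = []
    past_pivot = False
    for step in steps:
        try:
            step.action(state)
        except Exception:
            if past_pivot:
                # Point of no return already passed: retry later, don't compensate.
                raise
            # Undo every step that succeeded, most recent first.
            for done in reversed(completed):
                if done.compensation is not None:
                    done.compensation(state)
            return False
        completed.append(step)
        if step.kind is StepKind.PIVOT:
            past_pivot = True
    return True
```

With a reserve step that sets state["reserved"] and a pivot confirm step that raises on a declined payment, run_saga returns False and the release compensation leaves state["reserved"] False, while the order status is never changed.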
Three behaviours
The same flow handles the cases that make event-driven systems hard:
| Scenario | Outcome |
|---|---|
| Happy path | Order confirmed, event relayed, exactly one shipment |
| Redelivery | Duplicate message skipped by the inbox — still one shipment |
| Pivot fails (payment declined) | Inventory released, order stays pending, nothing staged, relayed, or shipped |