
Events & sagas

A confirmed order should reliably create a shipment — even though the order and the shipment live in different systems, even if the message is delivered twice, and even if a step fails partway. Forze builds that reliability from three pieces, all riding the after-commit deferral from Transactions: domain events, the transactional outbox, and sagas.

We'll trace one worked example throughout — the end-to-end recipe, a complete program you can run.

A state change and its event commit in one transaction, then relay and inbox carry it exactly-once to a shipment

A state change worth announcing

An aggregate that extends AggregateRoot can emit a DomainEvent when its state transitions. The Order records OrderConfirmed the moment its status becomes confirmed — declared right next to the data it watches:

class Order(Document, AggregateRoot):
    status: str = "pending"

    @event_emitter(fields={"status"})
    def _on_confirm(before, after: Self, diff: JsonDict) -> DomainEvent | None:  # type: ignore[no-untyped-def]
        if after.status == "confirmed" and before.status != "confirmed":
            return OrderConfirmed(aggregate_id=after.id)

        return None

The event is recorded on the instance during update(); the application layer drains and dispatches it when the operation persists. The aggregate doesn't know what happens to the event next — that's wired separately.
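
To make the record-then-drain idea concrete, here is a minimal plain-Python sketch. It does not use Forze: `OrderSketch`, `OrderConfirmedSketch`, and `drain_events` are hypothetical names invented for illustration, and the real `AggregateRoot` machinery may work differently.

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass(frozen=True)
class OrderConfirmedSketch:
    """Hypothetical stand-in for a domain event."""

    aggregate_id: UUID


@dataclass
class OrderSketch:
    """Hypothetical aggregate: it records events but never dispatches them."""

    id: UUID = field(default_factory=uuid4)
    status: str = "pending"
    _pending: list = field(default_factory=list, repr=False)

    def update(self, *, status: str) -> None:
        before = self.status
        self.status = status
        # Emit only on the transition into "confirmed", not on every write.
        if status == "confirmed" and before != "confirmed":
            self._pending.append(OrderConfirmedSketch(aggregate_id=self.id))

    def drain_events(self) -> list:
        """Hand the recorded events to the caller and clear the buffer."""
        events, self._pending = self._pending, []
        return events


order = OrderSketch()
order.update(status="confirmed")
order.update(status="confirmed")  # no transition, so no second event
events = order.drain_events()
```

The aggregate only buffers the event; whoever persists the change decides when to drain and dispatch it.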

From event to outbox

A domain event is reliable only if it can't be lost once the transaction commits and can't be sent if the transaction rolls back. The bridge is a handler that turns the event into a staged outbox row:

def build_context() -> ExecutionContext:
    registry = DomainEventRegistry()
    registry.register(
        OrderConfirmed,
        outbox_event_handler(
            OUTBOX_SPEC,
            "order.confirmed",
            lambda e: OrderConfirmedPayload(order_id=str(e.aggregate_id)),
        ),
    )
    module = MockDepsModule(domain_events=registry)
    return ExecutionContext(deps=DepsRegistry.from_modules(module).freeze().resolve())

outbox_event_handler maps OrderConfirmed onto an order.confirmed integration event with a typed payload. From now on, whenever an OrderConfirmed is dispatched, it's staged to the outbox.
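
The shape of that bridge can be sketched without Forze as a registry keyed by event type, plus a handler factory that stages a row. Everything here (`RegistrySketch`, `outbox_handler_sketch`, the in-memory `staged` list) is an assumption made for illustration, not the library's implementation.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class OrderConfirmedSketch:
    """Hypothetical domain event."""

    aggregate_id: str


class RegistrySketch:
    """Hypothetical registry: maps an event type to its handlers."""

    def __init__(self) -> None:
        self._handlers: dict = defaultdict(list)

    def register(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    def dispatch(self, event: Any) -> None:
        for handler in self._handlers[type(event)]:
            handler(event)


# Stand-in for the outbox staging area; a real outbox is a database table.
staged: list = []


def outbox_handler_sketch(topic: str, to_payload: Callable[[Any], dict]):
    """Build a handler that converts a domain event into a staged outbox row."""

    def handle(event: Any) -> None:
        staged.append({"topic": topic, "payload": to_payload(event)})

    return handle


registry = RegistrySketch()
registry.register(
    OrderConfirmedSketch,
    outbox_handler_sketch("order.confirmed", lambda e: {"order_id": e.aggregate_id}),
)
registry.dispatch(OrderConfirmedSketch(aggregate_id="o-1"))
```

Keeping the mapping in a registry means the aggregate stays unaware of topics and payload shapes.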

The transactional outbox

The point of the outbox is that the event row and the state change commit together. The confirm step updates the order and flushes the staged event inside the same transaction:

async def _confirm(ctx: ExecutionContext, s: CheckoutCtx) -> CheckoutCtx:
    # Pivot. A pre-commit failure (e.g. payment declined) compensates `reserve`.
    if s.simulate_failure:
        log.error("payment declined — failing the pivot step")
        raise exc.domain("payment declined")

    order = await ctx.document.query(ORDER_SPEC).get(s.order_id)
    # The status transition fires @event_emitter -> OrderConfirmed, dispatched in THIS
    # step's transaction (the command flow), which the outbox bridge stages.
    await ctx.document.command(ORDER_SPEC).update(
        s.order_id, order.rev, OrderUpdate(status="confirmed")
    )
    # Transactional outbox: flush the staged event within the same transaction.
    await ctx.outbox.command(OUTBOX_SPEC).flush()

    log.info(
        "order confirmed — OrderConfirmed staged to outbox", order_id=str(s.order_id)
    )
    return s

Because both land in one transaction, you never get a confirmed order without its event, or an event for an order that rolled back. That's the dual-write problem — solved not with a distributed transaction, but with a local one plus a relay.
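
The guarantee itself is independent of Forze and can be reproduced with the standard library. The sketch below assumes a hypothetical two-table SQLite schema (`orders`, `outbox`) and a hypothetical `confirm` function; it shows that a failure before commit discards the state change and the event row together.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        event_id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO orders VALUES ('o-1', 'pending'), ('o-2', 'pending');
    """
)


def confirm(order_id: str, *, fail: bool = False) -> None:
    # `with conn` commits on success and rolls back if the body raises.
    with conn:
        conn.execute("UPDATE orders SET status = 'confirmed' WHERE id = ?", (order_id,))
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("order.confirmed", json.dumps({"order_id": order_id})),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")


confirm("o-1")
try:
    confirm("o-2", fail=True)
except RuntimeError:
    pass

statuses = dict(conn.execute("SELECT id, status FROM orders"))
staged = [json.loads(p)["order_id"] for (p,) in conn.execute("SELECT payload FROM outbox")]
```

After both calls, only `o-1` is confirmed and only `o-1` has an outbox row; the failed attempt for `o-2` leaves no trace in either table.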

A relay then moves staged rows onward. In production that's a worker handing off to a broker; here it claims the pending rows and passes them along in-process:

async def relay_once(ctx: ExecutionContext) -> list[RelayMessage]:
    query = ctx.outbox.query(OUTBOX_SPEC)
    claims = await query.claim_pending()

    messages = [
        RelayMessage(key=str(c.event_id), order_id=UUID(c.payload["order_id"]))
        for c in claims
    ]

    if claims:
        await query.mark_published([c.id for c in claims])

    log.info("relayed events from outbox", count=len(messages))
    return messages
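
Why the consumer still needs deduplication can be seen in a small in-memory sketch. It assumes a relay that marks a row as published only after the hand-off succeeds; `OutboxRow`, `relay_rows`, and `flaky_publish` are hypothetical names, not Forze APIs. If the acknowledgement is lost after the broker has accepted the message, the next pass sends it again.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class OutboxRow:
    """Hypothetical staged outbox row."""

    event_id: int
    payload: dict
    published: bool = False


def relay_rows(rows: list, publish: Callable[[OutboxRow], None]) -> int:
    """Publish every pending row; mark it only after publish() returns."""
    sent = 0
    for row in rows:
        if row.published:
            continue
        publish(row)  # may raise; the row then stays pending
        row.published = True
        sent += 1
    return sent


delivered: list = []
crash_once = {"armed": True}


def flaky_publish(row: OutboxRow) -> None:
    delivered.append(row.event_id)  # the broker did receive the message
    if crash_once["armed"]:
        crash_once["armed"] = False
        raise ConnectionError("ack lost after the broker accepted the message")


rows = [OutboxRow(event_id=1, payload={"order_id": "o-1"})]
try:
    relay_rows(rows, flaky_publish)
except ConnectionError:
    pass  # the row is still pending, so the next pass retries it
relay_rows(rows, flaky_publish)
```

The same event reaches the consumer twice, which is the at-least-once behaviour the next section has to absorb.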

Exactly-once on the way in: the inbox

Brokers redeliver. The consumer dedupes by event id through the inbox, so handling the same message twice creates one shipment, not two:

async def deliver(ctx: ExecutionContext, message: RelayMessage) -> bool:
    """Process one relayed message exactly-once. Returns False if it was a duplicate."""

    processed = await process_with_inbox(
        ctx,
        message,
        inbox_spec=INBOX_SPEC,
        handler=lambda m: _fulfill(ctx, m),
        tx_route="mock",
    )

    if not processed:
        log.debug("duplicate message skipped by inbox", key=message.key)

    return processed

process_with_inbox runs the handler only if that event id hasn't been seen before; a redelivery returns False and does nothing. Combined with the outbox, that gives exactly-once effects across a boundary that only offers at-least-once delivery: a message may arrive twice, but its handler runs once.
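
A common way to get this behaviour, sketched here with SQLite rather than Forze, is to insert the event id into an inbox table with a primary-key constraint in the same transaction as the side effect. The schema and the `process_once` function are assumptions for illustration; how `process_with_inbox` does it internally is not shown in this guide.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE inbox (event_id TEXT PRIMARY KEY);
    CREATE TABLE shipments (order_id TEXT NOT NULL);
    """
)


def process_once(event_id: str, order_id: str) -> bool:
    """Create a shipment unless this event id was already processed."""
    try:
        # One transaction: the dedupe marker and the side effect commit together.
        with conn:
            conn.execute("INSERT INTO inbox (event_id) VALUES (?)", (event_id,))
            conn.execute("INSERT INTO shipments (order_id) VALUES (?)", (order_id,))
    except sqlite3.IntegrityError:
        # Primary-key conflict: the event was seen before, so skip it.
        # (A real handler would distinguish this from its own constraint errors.)
        return False
    return True


first = process_once("evt-1", "o-1")
second = process_once("evt-1", "o-1")  # simulated redelivery
shipment_count = conn.execute("SELECT COUNT(*) FROM shipments").fetchone()[0]
```

Because the marker and the shipment share a transaction, a crash between them cannot leave an event marked as seen without its effect.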

Sagas: many steps, one outcome

The confirm step above doesn't run alone — it's the final step of a checkout saga. Reserving inventory and confirming the order can't share a single database transaction; they're distinct steps that may each fail. A saga sequences them with compensation: every step carries an undo, and a failure rolls the completed steps back in reverse.

def build_checkout_saga() -> SagaDefinition[CheckoutCtx]:
    return SagaDefinition(
        name="checkout",
        steps=(
            SagaStep(
                name="reserve",
                action=_reserve,
                compensation=_release,
                tx_route="mock",
            ),
            SagaStep(
                name="confirm",
                action=_confirm,
                kind=SagaStepKind.PIVOT,
                tx_route="mock",
            ),
        ),
    )
  • reserve is compensatable — if a later step fails, _release runs to undo the reservation.
  • confirm is the pivot (SagaStepKind.PIVOT) — the point of no return. A failure before it compensates everything prior; once it commits, the saga is durable.
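
The compensation rule can be sketched as a tiny runner in plain Python. This is an illustration under stated assumptions, not Forze's saga engine: `Step`, `run_saga`, and the dictionary-based state are hypothetical, and the sketch omits persistence and retries.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Step:
    """Hypothetical saga step with an optional undo."""

    name: str
    action: Callable[[dict], None]
    compensation: Optional[Callable[[dict], None]] = None


def run_saga(steps: list, state: dict) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    done: list = []
    for step in steps:
        try:
            step.action(state)
        except Exception:
            for prior in reversed(done):
                if prior.compensation is not None:
                    prior.compensation(state)
            return False
        done.append(step)
    return True


log: list = []


def reserve(s: dict) -> None:
    s["reserved"] = True
    log.append("reserve")


def release(s: dict) -> None:
    s["reserved"] = False
    log.append("release")


def confirm(s: dict) -> None:
    if s.get("decline"):
        raise RuntimeError("payment declined")
    s["status"] = "confirmed"
    log.append("confirm")


steps = [Step("reserve", reserve, release), Step("confirm", confirm)]

ok_state = {"status": "pending"}
ok = run_saga(steps, ok_state)

failed_state = {"status": "pending", "decline": True}
failed = run_saga(steps, failed_state)
```

In the failing run, `confirm` raises before changing anything, so the runner only has to release the reservation and the order stays pending.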

Three behaviours

The same flow handles the cases that make event-driven systems hard:

| Scenario | Outcome |
| --- | --- |
| Happy path | Order confirmed, event relayed, exactly one shipment |
| Redelivery | Duplicate message skipped by the inbox; still one shipment |
| Pivot fails (payment declined) | Inventory released, order stays pending, nothing staged, relayed, or shipped |