Transactional outbox
You can't atomically write to your database and publish to a broker — a crash between the two loses or duplicates the event. The outbox makes it one write: stage the event in the same transaction as the business change, then a relay moves staged rows to the broker afterwards. The concept is in Events & sagas; this is the wiring.
The runnable version lives at examples/recipes/outbox/ and runs on the
in-memory mock — no broker needed.
The event and its destination
An OutboxSpec carries the payload codec and names the queue it relays to — the
destination route must equal the QueueSpec.name:
from pydantic import BaseModel

class OrderPlaced(BaseModel):
    order_id: str

# The outbox spec names its destination queue; its route must equal the queue's name.
ORDER_EVENTS = OutboxSpec(
    name="order-events",
    codec=PydanticModelCodec(OrderPlaced),
    destination=OutboxDestination.queue(route="orders", channel="orders"),
)
ORDERS_QUEUE = QueueSpec(name="orders", codec=PydanticModelCodec(OrderPlaced))
Stage it with the write
Inside the business transaction, stage the event and flush — it commits (or rolls back) together with the write, so a published event always corresponds to a committed change:
from uuid import uuid4

async def place_order(ctx: ExecutionContext, order_id: str) -> None:
    # Your business write goes here, in a transaction. Stage the integration
    # event in the same unit of work, then flush: it commits with the write.
    outbox = ctx.outbox.command(ORDER_EVENTS)
    await outbox.stage("order.placed", OrderPlaced(order_id=order_id), event_id=uuid4())
    await outbox.flush()
In a real handler you'd attach outbox_flush_tx_on_success_factory to the
operation so the flush fires automatically on transaction success, rather than
calling flush() by hand.
Relay to the broker
A relay claims staged rows and publishes them to the queue, returning a result
whose published field counts the rows it sent:
async def relay(ctx: ExecutionContext) -> int:
    # In production this runs in the background (outbox_relay_background_lifecycle_step);
    # here we drive one pass. It claims staged rows and publishes them to the queue.
    result = await OutboxRelay(outbox_spec=ORDER_EVENTS).to_queue(ctx, ORDERS_QUEUE)
    return result.published
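What one relay pass does can be sketched in plain Python. This is an illustrative in-memory model, not OutboxRelay's implementation: OutboxRow, relay_once, the "pending"/"published" status values and the batch_size parameter are all assumptions, and publish-error handling is omitted. It claims ripe pending rows oldest first, publishes each, then marks it, which is also why a crash between publish and mark yields a duplicate.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Optional


@dataclass
class OutboxRow:
    # Hypothetical in-memory stand-in for one staged outbox row.
    event_id: uuid.UUID
    payload: dict
    created_at: datetime
    status: str = "pending"                   # assumed statuses: pending / published
    available_at: Optional[datetime] = None   # None means claimable now
    published_at: Optional[datetime] = None


def relay_once(rows: List[OutboxRow], publish: Callable[[OutboxRow], None],
               now: datetime, batch_size: int = 100) -> int:
    # Claim: pending rows that are ripe (no available_at, or already due), oldest first.
    claimed = sorted(
        (r for r in rows
         if r.status == "pending" and (r.available_at is None or r.available_at <= now)),
        key=lambda r: r.created_at,
    )[:batch_size]
    for row in claimed:
        publish(row)  # errors propagate here; the real relay reschedules instead
        # A crash right here (published but not yet marked) is why delivery is at-least-once.
        row.status = "published"
        row.published_at = now
    return len(claimed)
```

A row whose available_at lies in the future is simply skipped until a later pass.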
In production the relay runs continuously as a lifecycle step:
from datetime import timedelta
from forze.application.execution import LifecyclePlan
from forze_kits.integrations.outbox import outbox_relay_background_lifecycle_step
lifecycle = LifecyclePlan.from_steps(
    outbox_relay_background_lifecycle_step(
        outbox_spec=ORDER_EVENTS,
        queue_spec=ORDERS_QUEUE,  # required for the queue transport
        interval=timedelta(seconds=5),
    ),
)
Consuming on the other side
run_consumer is the consumer-side counterpart: it replaces the hand-rolled
consume → dedupe → ack/nack loop with the decisions already made correctly.
Per message it:
- parks handler-poison (opt-in, via max_deliveries);
- runs the handler exactly once through the inbox (process_with_inbox: the
dedup mark and the handler share one transaction, and correlation is rebound
from the envelope headers);
- acks both fresh and duplicate deliveries, since a redelivered,
already-processed message must still leave the queue;
- nacks handler failures back (requeue=True) for redelivery.
One message's failure never kills the consumer.
from datetime import timedelta

from forze_kits.integrations.consumer import run_consumer

result = await run_consumer(
    ctx,
    queue="orders",  # the channel the relay published to
    queue_spec=ORDERS_QUEUE,
    handler=handle_order_event,  # async def (message: QueueMessage[OrderPlaced]) -> None
    inbox_spec=ORDERS_INBOX,
    tx_route="postgres",  # dedup mark + handler commit together here
    timeout=timedelta(seconds=5),  # idle timeout; None = consume forever
)
# result.processed / result.duplicates / result.parked / result.failed
In production it runs continuously as a lifecycle step — one step per queue (no in-process concurrency knob; scale out with more steps or processes):
from forze_kits.integrations.consumer import queue_consumer_background_lifecycle_step
lifecycle = LifecyclePlan.from_steps(
    queue_consumer_background_lifecycle_step(
        queue="orders",
        queue_spec=ORDERS_QUEUE,
        handler=handle_order_event,
        inbox_spec=ORDERS_INBOX,
        tx_route="postgres",
    ),
)
A crash of the consume stream itself (for example, a lost broker connection) is
logged, and consumption restarts after restart_backoff (default 5s); unacked
in-flight messages are redelivered and the inbox dedupes them.
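The restart behaviour can be sketched as a small asyncio supervisor. supervise is a hypothetical helper, not forze's code; it assumes the consume coroutine raises on a broker connection loss and returns when it finishes normally (for example on an idle timeout).

```python
import asyncio
import logging
from typing import Awaitable, Callable

log = logging.getLogger("consumer-sketch")


async def supervise(consume: Callable[[], Awaitable[None]],
                    restart_backoff: float = 5.0) -> int:
    """Run `consume` until it returns normally, restarting it after each crash.

    Returns how many restarts were needed. asyncio.CancelledError is a
    BaseException, so it is not caught here and shutdown still works.
    """
    restarts = 0
    while True:
        try:
            await consume()
            return restarts
        except Exception:
            # e.g. a lost broker connection: log it, wait, then consume again.
            # Unacked in-flight messages are redelivered by the broker meanwhile.
            log.exception("consume stream crashed; restarting in %ss", restart_backoff)
            restarts += 1
            await asyncio.sleep(restart_backoff)
```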
Two kinds of poison, two owners:
- Decode-poison (the payload doesn't fit the codec model) never reaches your
handler: the queue adapters reject it inside consume with nack(requeue=False)
(RabbitMQ DLX, SQS redrive) and keep consuming.
- Handler-poison (decodes fine, but the handler always fails) is parked by the
runner when max_deliveries is set: a message whose delivery_count exceeds it
is nack(requeue=False)-ed without running the handler, so the handler gets at
most max_deliveries attempts.
Parking is opt-in — and needs a delivery count
max_deliveries defaults to None: the broker's own redrive/DLX policy is
the default safety net, and you should configure one. Parking also relies
on the backend reporting QueueMessage.delivery_count (SQS
ApproximateReceiveCount, approximated from x-death on RabbitMQ, exact on the
mock). When it's None, parking never triggers and a poison message keeps
redelivering until the broker's policy catches it.
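The per-message decisions described above can be condensed into one function. This is a hypothetical, synchronous sketch rather than run_consumer's code: Message, Outcome and consume_one are illustrative names, the seen set stands in for the inbox, and checking for parking before deduplication is an assumed order.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional, Set, Tuple


class Outcome(Enum):
    PROCESSED = "processed"
    DUPLICATE = "duplicate"
    PARKED = "parked"
    FAILED = "failed"


@dataclass
class Message:
    event_id: str
    body: dict
    delivery_count: Optional[int]  # None when the backend cannot report it


def consume_one(msg: Message, seen: Set[str], handler: Callable[[Message], None],
                max_deliveries: Optional[int] = None) -> Tuple[Outcome, str]:
    # Parking needs both the opt-in knob and a reported delivery count.
    if (max_deliveries is not None and msg.delivery_count is not None
            and msg.delivery_count > max_deliveries):
        return Outcome.PARKED, "nack(requeue=False)"  # the handler is not run
    # Already processed: ack anyway so the redelivered copy leaves the queue.
    if msg.event_id in seen:
        return Outcome.DUPLICATE, "ack"
    try:
        handler(msg)
    except Exception:
        # Hand it back for redelivery; one bad message never stops the loop.
        return Outcome.FAILED, "nack(requeue=True)"
    # The real runner commits this mark and the handler's writes in one transaction.
    seen.add(msg.event_id)
    return Outcome.PROCESSED, "ack"
```

With max_deliveries=3, deliveries 1 to 3 reach the handler and delivery 4 is parked, matching the "at most max_deliveries attempts" rule; with delivery_count=None the message is never parked.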
Transient blips can also be retried in-process before the message goes back to
the broker: pass retry_policy="my-policy" and the runner wraps the processing
step (dedup mark + handler) in ctx.resilience().run(...) under that named
policy, opening a fresh transaction for each attempt.
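To see why each attempt needs a fresh transaction, here is a minimal sketch with sqlite3 and a plain retry loop standing in for ctx.resilience().run(...). The inbox table (an event_id primary key), the handler's tables and process_with_retry are hypothetical, and the loop retries without any delay. A failed attempt rolls back both the dedup mark and the handler's writes, so the next attempt starts clean instead of mistaking its own half-finished mark for a duplicate.

```python
import sqlite3
from typing import Callable


def process_with_retry(conn: sqlite3.Connection, event_id: str,
                       handler: Callable[[sqlite3.Connection], None],
                       attempts: int = 3) -> str:
    # Assumes: CREATE TABLE inbox (event_id TEXT PRIMARY KEY)
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            # Fresh transaction per attempt: an exception rolls back the dedup
            # mark and the handler's writes together.
            with conn:
                cur = conn.execute(
                    "INSERT OR IGNORE INTO inbox (event_id) VALUES (?)", (event_id,)
                )
                if cur.rowcount == 0:
                    return "duplicate"  # mark already committed by an earlier delivery
                handler(conn)
            return "processed"
        except Exception:
            if attempt == attempts:
                raise  # out of in-process retries: let the broker redeliver
    raise AssertionError("unreachable")
```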
Failures and retries
The relay classifies errors by where they arise:
- Poison: the payload can't be decoded into the codec model. The row can
never publish, so it's marked failed immediately. Fix the cause, then
re-drive it with ctx.outbox.query(spec).requeue_failed([id]) (this resets
the retry counter).
- Transient: the broker publish call raised. The row is rescheduled with
exponential backoff plus jitter (retry_base_delay * 2**attempts, capped at
retry_max_backoff) and stays invisible to claims until its available_at.
After max_attempts publish attempts it's marked failed (terminal).
Defaults: max_attempts=5, retry_base_delay=1s, retry_max_backoff=5min, all
keyword-only on every relay function and on the lifecycle step. One row's
failure never blocks the rest of the batch.
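The transient-failure rescheduling can be written out as a small function. It is a sketch under stated assumptions: on_publish_failure is hypothetical, attempts counts publish attempts including the one that just failed, and the jitter is modelled as up to 10% added on top of the capped delay; forze's exact jitter formula is not documented here.

```python
import random
from datetime import datetime, timedelta
from typing import Optional, Tuple


def on_publish_failure(
    attempts: int,
    now: datetime,
    *,
    max_attempts: int = 5,
    retry_base_delay: timedelta = timedelta(seconds=1),
    retry_max_backoff: timedelta = timedelta(minutes=5),
    jitter_ratio: float = 0.1,
    rng: Optional[random.Random] = None,
) -> Tuple[int, str, Optional[datetime]]:
    """Return (attempts, status, available_at) after a transient publish failure."""
    attempts += 1  # assumption: the counter includes the attempt that just failed
    if attempts >= max_attempts:
        return attempts, "failed", None  # terminal until re-driven
    delay = min(retry_base_delay * (2 ** attempts), retry_max_backoff)
    # Assumed jitter: up to jitter_ratio of the capped delay, added on top.
    jitter = delay * (rng or random.Random()).uniform(0.0, jitter_ratio)
    return attempts, "pending", now + delay + jitter
```

With the defaults and no jitter, the row is retried 2s, 4s, 8s and 16s after successive failures and is marked failed on the fifth.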
Per-aggregate ordering
Stage with an ordering_key (typically the aggregate id) and the relay
publishes it as the transport key instead of the event id:
await ctx.outbox.command(ORDER_EVENTS).stage(
"order.shipped", payload, ordering_key=str(order_id),
)
On transports that honor key for partitioning, such as SQS FIFO
(MessageGroupId) or stream partition keys, same-key events are delivered in
staged (created_at) order on the happy path. Events staged without an
ordering_key fall back to key=str(event_id). Either way, the event id rides in
the forze_event_id header, which is what consumers dedupe on.
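The key selection and per-key ordering can be modelled in a few lines. Staged, transport_key, headers and partitions are hypothetical names, and integer created_at values stand in for timestamps; only the forze_event_id header name comes from the text above. Grouping by transport key in created_at order mimics what a FIFO transport preserves on the happy path.

```python
import uuid
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Staged:
    event_id: uuid.UUID
    event_type: str
    created_at: int                     # stand-in for the created_at timestamp
    ordering_key: Optional[str] = None


def transport_key(row: Staged) -> str:
    # A staged ordering key replaces the event id as the transport key.
    return row.ordering_key if row.ordering_key is not None else str(row.event_id)


def headers(row: Staged) -> Dict[str, str]:
    # The event id always travels in a header, whatever the key; consumers dedupe on it.
    return {"forze_event_id": str(row.event_id)}


def partitions(rows: List[Staged]) -> Dict[str, List[str]]:
    # Within one key, a FIFO transport preserves staged (created_at) order.
    out: Dict[str, List[str]] = defaultdict(list)
    for row in sorted(rows, key=lambda r: r.created_at):
        out[transport_key(row)].append(row.event_type)
    return dict(out)
```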
Ordering is expressible, not guaranteed
Delivery is at-least-once and ordering is not guaranteed across
failures/retries: a row rescheduled for retry (or parked as failed) does
not stall later rows of the same ordering_key — deliberately, so one
poison event never head-of-line blocks its aggregate. Consumers must
dedupe on event_id (the forze_event_id header) and tolerate
reordering as well as redelivery (dedupe with the
inbox).
Table schema
The outbox table is application-owned. For Postgres
(PostgresOutboxConfig(relation=("app", "outbox"))):
CREATE TABLE app.outbox (
id UUID PRIMARY KEY,
outbox_route TEXT NOT NULL,
event_id UUID NOT NULL,
event_type TEXT NOT NULL,
tenant_id UUID,
execution_id UUID,
correlation_id UUID,
causation_id UUID,
occurred_at TIMESTAMPTZ NOT NULL,
payload JSONB NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
published_at TIMESTAMPTZ,
processing_at TIMESTAMPTZ,
last_error TEXT,
attempts INT NOT NULL DEFAULT 0,
available_at TIMESTAMPTZ,
ordering_key TEXT,
UNIQUE (outbox_route, event_id)
);
-- covers the claim predicate (route + pending + ripe, oldest first)
CREATE INDEX outbox_claim_idx
ON app.outbox (outbox_route, status, available_at, created_at);
attempts is the durable retry counter; available_at schedules the next
retry (NULL = claimable now); ordering_key is the optional delivery
partition key (NULL = no partitioning, key falls back to the event id).
Existing tables migrate with:
ALTER TABLE app.outbox
ADD COLUMN attempts INT NOT NULL DEFAULT 0,
ADD COLUMN available_at TIMESTAMPTZ,
ADD COLUMN ordering_key TEXT;
Causal ordering (Hybrid Logical Clock)
By default claims are ordered by created_at — assigned per flush batch, so
rows staged together share a timestamp and tie arbitrarily, and clocks on
different replicas can disagree. Set PostgresOutboxConfig(hlc_ordering=True)
to claim in causal order instead: every event carries a Hybrid Logical
Clock stamp that stays close to wall time yet always exceeds any timestamp the
process has observed (including one merged from a consumed event's forze_hlc
header), so a reaction sorts after its cause across replicas, and the
time-ordered id breaks any remaining tie. Add the column first, then opt in
(legacy NULL-hlc rows fall back to created_at):
ALTER TABLE app.outbox ADD COLUMN hlc BIGINT;
-- claim order becomes (hlc NULLS LAST, created_at, id); index to match
CREATE INDEX outbox_claim_hlc_idx
ON app.outbox (outbox_route, status, available_at, hlc, created_at, id);
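For reference, here is a sketch of the standard Hybrid Logical Clock algorithm (Kulkarni et al.) with a packed integer stamp. The class and helper names, the millisecond wall clock, and the packing layout (milliseconds shifted left by 16 bits, logical counter in the low 16 bits) are assumptions for illustration; forze's actual encoding of the hlc column is not shown on this page. now() stamps a local event, and observe() merges a stamp from a consumed event so that the next stamp sorts after it, even on a replica whose clock lags.

```python
import time
from typing import Callable, Optional, Tuple

COUNTER_BITS = 16  # assumed layout: wall-clock ms in the high bits, counter in the low 16


def pack(wall_ms: int, counter: int) -> int:
    return (wall_ms << COUNTER_BITS) | counter


def unpack(stamp: int) -> Tuple[int, int]:
    return stamp >> COUNTER_BITS, stamp & ((1 << COUNTER_BITS) - 1)


class HybridLogicalClock:
    def __init__(self, wall_ms: Optional[Callable[[], int]] = None) -> None:
        self._wall_ms = wall_ms or (lambda: time.time_ns() // 1_000_000)
        self._l = 0  # highest wall-clock reading seen, local or remote
        self._c = 0  # logical counter ordering events that share the same _l

    def now(self) -> int:
        """Stamp a local event, e.g. staging an outbox row."""
        pt = self._wall_ms()
        if pt > self._l:
            self._l, self._c = pt, 0
        else:
            self._c += 1  # wall clock hasn't advanced: bump the counter instead
        return pack(self._l, self._c)

    def observe(self, stamp: int) -> int:
        """Merge a remote stamp (e.g. a consumed event's header); the result sorts after it."""
        ml, mc = unpack(stamp)
        pt = self._wall_ms()
        new_l = max(self._l, ml, pt)
        if new_l == self._l and new_l == ml:
            self._c = max(self._c, mc) + 1
        elif new_l == self._l:
            self._c += 1
        elif new_l == ml:
            self._c = mc + 1
        else:
            self._c = 0
        self._l = new_l
        # Counter overflow past 16 bits is not handled in this sketch.
        return pack(self._l, self._c)
```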
For Mongo (MongoOutboxConfig), documents mirror these fields; recommended
indexes:
db.outbox.createIndex({ outbox_route: 1, event_id: 1 }, { unique: true })
db.outbox.createIndex({ outbox_route: 1, status: 1, available_at: 1, created_at: 1 })
db.outbox.createIndex({ outbox_route: 1, status: 1, processing_at: 1 })
// the relay reads each claimed batch back by its claim token
db.outbox.createIndex({ claim_token: 1 }, { sparse: true })
hlc_ordering=True works the same on Mongo: the packed HLC is stored on each
document and the claim sorts [(hlc, 1), (created_at, 1), (id, 1)]. No schema
migration is needed (documents are schemaless), but add an index to match.
Note that Mongo sorts missing-hlc documents first, so during migration the
legacy rows drain before HLC-stamped ones; that is the inverse of Postgres
NULLS LAST (both are best-effort):
db.outbox.createIndex({ outbox_route: 1, status: 1, available_at: 1, hlc: 1, created_at: 1, id: 1 })
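The difference in how legacy rows drain can be reproduced with Python's sorted(). This models the two sort orders documented above rather than querying either database; Row and the two helpers are hypothetical, and the trailing created_at and id fields are the documented tie-breakers.

```python
from typing import List, NamedTuple, Optional


class Row(NamedTuple):
    id: str
    created_at: int
    hlc: Optional[int]  # None for legacy rows staged before hlc_ordering


def postgres_claim_order(rows: List[Row]) -> List[Row]:
    # ORDER BY hlc NULLS LAST, created_at, id
    return sorted(rows, key=lambda r: (r.hlc is None, r.hlc or 0, r.created_at, r.id))


def mongo_claim_order(rows: List[Row]) -> List[Row]:
    # sort [(hlc, 1), (created_at, 1), (id, 1)]: a missing hlc sorts before any number
    return sorted(rows, key=lambda r: (r.hlc is not None, r.hlc or 0, r.created_at, r.id))
```

For a legacy row a plus stamped rows b (hlc=200) and c (hlc=100), Postgres claims c, b, a while Mongo claims a, c, b.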
Notes
- Store the outbox where you store the data so the stage shares the
transaction: PostgresOutboxConfig(relation=("app", "outbox")) (from
forze_postgres.execution.deps.configs) or MongoOutboxConfig.
- At-least-once. The relay can publish a row twice (claim, publish, crash
before marking). Consumers dedupe with the inbox.
- The background lifecycle step drains the whole backlog each tick (batches
until a short claim, capped at max_batches_per_tick=100), then sleeps for
interval.
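The drain-then-sleep loop from the last note can be sketched with asyncio. drain_tick, relay_forever and the claim_and_publish callback are hypothetical, as is the batch_size parameter; the callback is assumed to return how many rows it claimed, so a result smaller than batch_size is the short claim that ends the tick.

```python
import asyncio
from datetime import timedelta
from typing import Awaitable, Callable

ClaimAndPublish = Callable[[int], Awaitable[int]]  # hypothetical: batch limit -> rows claimed


async def drain_tick(claim_and_publish: ClaimAndPublish, batch_size: int = 100,
                     max_batches_per_tick: int = 100) -> int:
    total = 0
    for _ in range(max_batches_per_tick):
        n = await claim_and_publish(batch_size)
        total += n
        if n < batch_size:
            break  # short claim: the backlog is drained for now
    return total


async def relay_forever(claim_and_publish: ClaimAndPublish, interval: timedelta) -> None:
    while True:
        await drain_tick(claim_and_publish)
        await asyncio.sleep(interval.total_seconds())
```

The cap keeps one tick bounded when the backlog outpaces the relay; whatever remains is picked up on the next tick.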