Skip to content

Queue, outbox & inbox ports

Queue

The queue contract has no short ctx.* accessor — resolve the ports by dep key:

from forze.application.contracts.queue import QueueQueryDepKey, QueueCommandDepKey

w = ctx.deps.resolve_configurable(ctx, QueueCommandDepKey, spec, route=spec.name)
r = ctx.deps.resolve_configurable(ctx, QueueQueryDepKey, spec, route=spec.name)

Command port

Method Signature
enqueue enqueue(queue, payload, *, type=None, key=None, enqueued_at=None, delay=None, not_before=None) → message id
enqueue_many enqueue_many(queue, payloads, *, type=None, key=None, enqueued_at=None, delay=None, not_before=None)

delay (a timedelta) and not_before (a tz-aware datetime) are mutually exclusive — see Scheduled & delayed jobs.

Query port

Method Signature Notes
receive receive(queue, *, limit=None, timeout=None) a batch of QueueMessage
consume consume(queue, *, timeout=None) async generator until timeout
ack ack(queue, ids) acknowledge; returns count
nack nack(queue, ids, *, requeue=True) negative-ack, optionally requeue

Outbox

ctx.outbox.command(spec) stages and flushes; ctx.outbox.query(spec) drives the relay. See Transactional outbox.

Command port

Method Signature Notes
stage stage(event_type, payload, *, event_id=None, occurred_at=None, ordering_key=None) buffer one event; ordering_key partitions delivery on capable transports
stage_many stage_many(events, *, event_ids=None) buffer (event_type, payload) pairs
stage_event stage_event(event) buffer a built IntegrationEvent
flush flush() persist buffered events; returns rows inserted

Query port

Method Signature Notes
claim_pending claim_pending(*, limit=None) claim a batch for relay; skips rows whose available_at is in the future
mark_published mark_published(ids) mark relayed
mark_failed mark_failed(ids, *, error=None) mark terminally failed (operator re-drives)
mark_retry mark_retry(ids, *, attempts, available_at, error=None) reschedule for a future retry with the durable attempt counter
reclaim_stale_processing reclaim_stale_processing(*, older_than) reset stuck rows to pending
requeue_failed requeue_failed(ids) re-drive failed rows; resets attempts to 0

(Most apps use the relay_outbox_to_queue kit rather than these directly.) Delivery is at-least-once and ordering is not guaranteed across failures/retries — a retrying row never stalls later rows of its ordering_key. Staging an ordering_key makes same-key events partition together (SQS FIFO MessageGroupId, stream partition key) and relay in created_at order on the happy path; consumers still dedupe on event_id (the forze_event_id header) and tolerate reordering.

Inbox

ctx.inbox(spec) returns an InboxPort with a single method — the consumer-side exactly-once primitive:

Method Signature Notes
mark_if_unseen mark_if_unseen(inbox, message_id) True if newly recorded (process), False if already seen (skip)

Call it inside the handler's transaction so the dedup mark and the handler's writes commit together. In practice use the process_with_inbox kit, which does exactly that — see Events & sagas.