Contracts are the protocol interfaces that define what the application needs from infrastructure. Each contract is a Python Protocol class — adapters implement them, and usecases consume them through ExecutionContext. For the architectural rationale, see Contracts and Adapters. This page is the complete API reference.
Structure of a contract
Every infrastructure concern follows the same pattern:
| Component | Role | Example |
|---|---|---|
| Port | Protocol interface defining operations | DocumentReadPort[R] |
| Spec | Declarative configuration for the concern | DocumentSpec[R, D, C, U] |
| DepKey | Typed key for dependency registration | DocumentReadDepKey |
| DepPort | Factory protocol that builds a port from context | DocumentReadDepPort |
| DepRouter | Router that selects a provider by spec | DocumentReadDepRouter |
Ports are resolved at runtime via ExecutionContext, never imported directly from adapter packages.
Dependencies and keys
DepKey
A typed key that identifies a dependency in the container:
from forze.application.contracts.deps import DepKey
MyServiceKey = DepKey[MyService]("my_service")
The name parameter is used in error messages and diagnostics. The type parameter T carries static type information for safe resolution.
DepsPort
Protocol for a dependency container:
| Method | Purpose |
|---|---|
provide(key) |
Return the dependency registered under key |
exists(key) |
Check if a dependency is registered |
merge(*deps) |
Combine multiple containers (raises on key conflicts) |
without(key) |
Return a new container without the given key |
empty() |
Check if the container has no dependencies |
DepRouter
Generic router that selects a dependency provider based on a spec. Used by integration modules to map multiple adapters (e.g. Postgres for some specs, Mongo for others) under a single dep key.
Document storage
Documents are the primary data abstraction. Ports are split into read and write for CQRS flexibility.
DocumentReadPort[R]
Read-only operations for document aggregates:
| Method | Signature | Returns |
|---|---|---|
get |
(pk, *, for_update?, return_fields?) |
R or JsonDict |
get_many |
(pks, *, return_fields?) |
Sequence[R] or Sequence[JsonDict] |
find |
(filters, *, for_update?, return_fields?) |
R \| None or JsonDict \| None |
find_many |
(filters?, limit?, offset?, sorts?, *, return_fields?) |
(list[R], int) or (list[JsonDict], int) |
count |
(filters?) |
int |
When return_fields is provided, methods return JsonDict projections instead of typed models. for_update locks the row when the backend supports it.
DocumentWritePort[R, D, C, U]
Mutation operations for document aggregates:
| Method | Signature | Returns |
|---|---|---|
create |
(dto) |
R |
create_many |
(dtos) |
Sequence[R] |
update |
(pk, dto, *, rev?) |
R |
update_many |
(pks, dtos, *, revs?) |
Sequence[R] |
touch / touch_many |
(pk) / (pks) |
R / Sequence[R] |
kill / kill_many |
(pk) / (pks) |
None |
delete / delete_many |
(pk, *, rev?) / (pks, *, revs?) |
R / Sequence[R] |
restore / restore_many |
(pk, *, rev?) / (pks, *, revs?) |
R / Sequence[R] |
The optional rev parameter enables optimistic concurrency control. When provided, the adapter checks that the current revision matches before applying the change.
DocumentSpec
Declarative specification that binds a document aggregate to its storage, cache, and history:
from forze.application.contracts.document import DocumentSpec
spec = DocumentSpec(
namespace="projects",
read={"source": "public.projects", "model": ProjectRead},
write={
"source": "public.projects",
"models": {
"domain": Project,
"create_cmd": CreateProjectCmd,
"update_cmd": UpdateProjectCmd,
},
},
history={"source": "public.projects_history"},
cache={"enabled": True},
)
| Field | Type | Required | Purpose |
|---|---|---|---|
namespace |
str |
Yes | Logical name and cache key prefix |
read |
DocumentReadSpec[R] |
Yes | Source relation and read model type |
write |
DocumentWriteSpec[D, C, U] |
No | Source relation and write model types |
history |
DocumentHistorySpec |
No | Source relation for revision audit trail |
cache |
DocumentCacheSpec |
No | Cache configuration (enable flag, TTL) |
Helper methods:
supports_soft_delete()—Truewhen the domain model inherits fromSoftDeletionMixinsupports_update()—Truewhen the update command has writable fields
Dependency keys
| Key | Type | Resolved via |
|---|---|---|
DocumentReadDepKey |
DepKey[DocumentReadDepPort] |
ctx.doc_read(spec) |
DocumentWriteDepKey |
DepKey[DocumentWriteDepPort] |
ctx.doc_write(spec) |
Transaction management
TxManagerPort
Manages transaction boundaries:
| Method | Purpose |
|---|---|
transaction() |
Return an async context manager for a transaction scope |
scope_key() |
Return the TxScopeKey identifying this tx manager kind |
TxScopedPort
Marker protocol for ports that are bound to a specific transaction scope. The execution context validates that scoped ports match the active transaction.
TxHandle
Value object holding the active transaction's scope key. Used internally to detect scope mismatches.
Dependency keys
| Key | Resolved via |
|---|---|
TxManagerDepKey |
ctx.txmanager() |
Cache
CacheSpec
from forze.application.contracts.cache import CacheSpec
cache_spec = CacheSpec(namespace="projects", ttl=timedelta(minutes=10))
| Field | Type | Purpose |
|---|---|---|
namespace |
str |
Cache key namespace |
ttl |
timedelta |
Default time-to-live for entries |
CachePort
Combines read and write operations:
| Method | Purpose |
|---|---|
get(pk) |
Retrieve a cached document |
get_many(pks) |
Retrieve multiple cached documents |
set(pk, data) |
Store a document in cache |
invalidate(pk) |
Remove a document from cache |
invalidate_many(pks) |
Remove multiple documents |
Dependency keys
| Key | Resolved via |
|---|---|
CacheDepKey |
ctx.cache(spec) |
Counter
CounterPort
Namespace-scoped atomic counters:
| Method | Signature | Purpose |
|---|---|---|
incr |
(suffix?, by?) |
Increment and return new value |
incr_batch |
(count, suffix?) |
Increment by count, return final value |
decr |
(suffix?, by?) |
Decrement and return new value |
reset |
(suffix?, value?) |
Reset to a specific value |
Dependency keys
| Key | Resolved via |
|---|---|
CounterDepKey |
ctx.counter(namespace) |
Search
SearchSpec
from forze.application.contracts.search import SearchSpec
search_spec = SearchSpec(
namespace="projects",
model=ProjectRead,
indexes={
"idx_title": {
"source": "public.projects",
"fields": [{"path": "title"}],
},
},
default_index="idx_title",
)
| Field | Type | Required | Purpose |
|---|---|---|---|
namespace |
str |
Yes | Logical search domain name |
model |
type[BaseModel] |
Yes | Result model for typed search |
indexes |
dict[str, SearchIndexSpec] |
Yes | Index name to configuration |
default_index |
str |
No | Default index when not specified |
SearchIndexSpec
| Field | Type | Required | Purpose |
|---|---|---|---|
fields |
list[SearchFieldSpec] |
Yes | Fields included in the index |
source |
str |
No | Source relation |
groups |
list[SearchGroupSpec] |
No | Weight groups for ranking |
default_group |
str |
No | Default weight group |
fuzzy |
SearchFuzzySpec |
No | Fuzzy search parameters |
SearchReadPort[R]
| Method | Purpose |
|---|---|
search(query, filters?, limit?, offset?, sorts?, *, options?, return_model?, return_fields?) |
Full-text search with optional filters and pagination |
Dependency keys
| Key | Resolved via |
|---|---|
SearchReadDepKey |
ctx.search(spec) |
Object storage
StoragePort
S3-style blob storage:
| Method | Signature | Returns |
|---|---|---|
upload |
(filename, data, description?, *, prefix?) |
StoredObject |
download |
(key) |
DownloadedObject |
delete |
(key) |
None |
list |
(limit, offset, *, prefix?) |
(list[ObjectMetadata], int) |
Storage types
| Type | Fields |
|---|---|
StoredObject |
key, filename, content_type, size |
ObjectMetadata |
key, filename, content_type, size, last_modified |
DownloadedObject |
key, filename, content_type, size, data |
Dependency keys
| Key | Resolved via |
|---|---|
StorageDepKey |
ctx.storage(bucket) |
Queue
QueueSpec
from forze.application.contracts.queue import QueueSpec
order_queue = QueueSpec(namespace="orders", model=OrderPayload)
QueueReadPort[M]
| Method | Purpose |
|---|---|
receive(queue, *, limit?, timeout?) |
Receive a batch of messages |
consume(queue, *, timeout?) |
Async iterator over messages |
ack(queue, ids) |
Acknowledge processed messages |
nack(queue, ids, *, requeue?) |
Reject messages, optionally re-queue |
QueueWritePort[M]
| Method | Purpose |
|---|---|
enqueue(queue, payload, *, type?, key?, enqueued_at?) |
Send a single message |
enqueue_many(queue, payloads, *, type?, key?, enqueued_at?) |
Send a batch |
QueueMessage[M]
| Field | Type | Purpose |
|---|---|---|
id |
str |
Message identifier |
payload |
M |
Deserialized message body |
type |
str \| None |
Optional message type |
key |
str \| None |
Optional routing key |
enqueued_at |
datetime \| None |
Enqueue timestamp |
Dependency keys
| Key | Purpose |
|---|---|
QueueReadDepKey |
Read port |
QueueWriteDepKey |
Write port |
Pub/Sub
PubSubSpec
from forze.application.contracts.pubsub import PubSubSpec
events_spec = PubSubSpec(namespace="events", model=EventPayload)
PubSubPublishPort[M]
| Method | Purpose |
|---|---|
publish(topic, payload, *, type?, key?, published_at?) |
Publish a message |
PubSubSubscribePort[M]
| Method | Purpose |
|---|---|
subscribe(topics, *, timeout?) |
Async iterator over messages |
PubSubMessage[M]
| Field | Type | Purpose |
|---|---|---|
id |
str |
Message identifier |
topic |
str |
Topic the message was published to |
payload |
M |
Deserialized message body |
type |
str \| None |
Optional message type |
key |
str \| None |
Optional routing key |
published_at |
datetime \| None |
Publish timestamp |
Dependency keys
| Key | Purpose |
|---|---|
PubSubPublishDepKey |
Publish port |
PubSubSubscribeDepKey |
Subscribe port |
Stream
StreamSpec
from forze.application.contracts.stream import StreamSpec
audit_stream = StreamSpec(namespace="audit", model=AuditEntry)
StreamReadPort[M]
| Method | Purpose |
|---|---|
read(stream_mapping, *, limit?, timeout?) |
Read entries from streams |
tail(stream_mapping, *, timeout?) |
Async iterator following new entries |
StreamGroupPort[M]
| Method | Purpose |
|---|---|
read(group, consumer, stream_mapping, *, limit?, timeout?) |
Consumer group read |
tail(group, consumer, stream_mapping, *, timeout?) |
Consumer group tail |
ack(group, stream, ids) |
Acknowledge entries |
StreamWritePort[M]
| Method | Purpose |
|---|---|
append(stream, payload, *, type?, key?, timestamp?) |
Append an entry |
StreamMessage[M]
| Field | Type | Purpose |
|---|---|---|
id |
str |
Entry identifier |
stream |
str |
Stream name |
payload |
M |
Deserialized entry body |
type |
str \| None |
Optional entry type |
key |
str \| None |
Optional routing key |
timestamp |
datetime \| None |
Entry timestamp |
Dependency keys
| Key | Purpose |
|---|---|
StreamReadDepKey |
Read port |
StreamWriteDepKey |
Write port |
StreamGroupDepKey |
Group port |
Idempotency
IdempotencyPort
Deduplicate operations by caching responses keyed by operation name, idempotency key, and payload hash:
| Method | Signature | Purpose |
|---|---|---|
begin |
(op, key, payload_hash) |
Check for a cached response; returns IdempotencySnapshot \| None |
commit |
(op, key, payload_hash, snapshot) |
Store the response for future dedup |
IdempotencySnapshot
| Field | Type | Purpose |
|---|---|---|
status_code |
int |
HTTP status code |
body |
bytes |
Serialized response body |
headers |
dict[str, str] |
Response headers |
Dependency keys
| Key | Purpose |
|---|---|
IdempotencyDepKey |
Idempotency port |
Workflow
WorkflowPort
Orchestrate long-running processes:
| Method | Signature | Purpose |
|---|---|---|
start |
(name, id, args, queue?) |
Start a workflow instance |
signal |
(id, signal, data) |
Send a signal to a running workflow |
Context ports
ActorContextPort
Ambient actor identity for audit trails:
| Method | Purpose |
|---|---|
get() |
Return current actor UUID |
set(actor_id) |
Bind actor for the current context |
TenantContextPort
Ambient tenant identity for multi-tenant routing:
| Method | Purpose |
|---|---|
get() |
Return current tenant UUID |
set(tenant_id) |
Bind tenant for the current context |
Dependency keys
| Key | Purpose |
|---|---|
TenantContextDepKey |
Tenant context port |
Resolving ports
All ports are resolved through ExecutionContext. Contracts with convenience methods:
doc_read = ctx.doc_read(project_spec)
doc_write = ctx.doc_write(project_spec)
cache = ctx.cache(cache_spec)
counter = ctx.counter("tickets")
storage = ctx.storage("attachments")
search = ctx.search(search_spec)
tx = ctx.txmanager()
For contracts without a convenience method, use dep() with the dep key:
from forze.application.contracts.pubsub import PubSubPublishDepKey
publisher = ctx.dep(PubSubPublishDepKey)(ctx, events_spec)
await publisher.publish("events.created", payload)