Contracts are the protocol interfaces that define what the application needs from infrastructure. Each contract is a Python Protocol class — adapters implement them, and usecases consume them through ExecutionContext. For the architectural rationale, see Contracts and Adapters. This page is the complete API reference.
Structure of a contract
Every infrastructure concern follows the same pattern:
| Component | Role | Example |
|---|---|---|
| Port | Protocol interface defining operations | DocumentQueryPort[R] |
| Spec | Declarative configuration for the concern | DocumentSpec[R, D, C, U] |
| DepKey | Typed key for dependency registration | DocumentQueryDepKey |
| DepPort | Factory protocol that builds a port from context | DocumentQueryDepPort |
| Routed deps | Same dep key, multiple providers keyed by spec.name |
Deps.routed({...}) |
Ports are resolved at runtime via ExecutionContext, never imported directly from adapter packages.
Dependencies and keys
DepKey
A typed key that identifies a dependency in the container:
from forze.application.contracts.base import DepKey
MyServiceKey = DepKey[MyService]("my_service")
The name parameter is used in error messages and diagnostics. The type parameter T carries static type information for safe resolution.
DepsPort
Protocol for a dependency container:
| Method | Purpose |
|---|---|
provide(key) |
Return the dependency registered under key |
exists(key) |
Check if a dependency is registered |
merge(*deps) |
Combine multiple containers (raises on key conflicts) |
without(key) |
Return a new container without the given key |
empty() |
Check if the container has no dependencies |
Routed dependency selection is built into Deps itself and resolved via provide(key, route=...).
Document storage
Documents are the primary data abstraction. Ports are split into read and write for CQRS flexibility.
DocumentQueryPort[R]
Read-only operations for document aggregates:
| Method | Signature | Returns |
|---|---|---|
get |
(pk, *, for_update?, return_fields?) |
R or JsonDict |
get_many |
(pks, *, return_fields?) |
Sequence[R] or Sequence[JsonDict] |
find |
(filters, *, for_update?, return_fields?) |
R \| None or JsonDict \| None |
find_many |
(filters?, limit?, offset?, sorts?, *, return_fields?) |
(list[R], int) or (list[JsonDict], int) |
count |
(filters?) |
int |
When return_fields is provided, methods return JsonDict projections instead of typed models. for_update locks the row when the backend supports it.
DocumentCommandPort[R, D, C, U]
Mutation operations for document aggregates:
| Method | Signature | Returns |
|---|---|---|
create |
(dto) |
R |
create_many |
(dtos) |
Sequence[R] |
update |
(pk, dto, *, rev?) |
R |
update_many |
(pks, dtos, *, revs?) |
Sequence[R] |
touch / touch_many |
(pk) / (pks) |
R / Sequence[R] |
kill / kill_many |
(pk) / (pks) |
None |
delete / delete_many |
(pk, *, rev?) / (pks, *, revs?) |
R / Sequence[R] |
restore / restore_many |
(pk, *, rev?) / (pks, *, revs?) |
R / Sequence[R] |
The optional rev parameter enables optimistic concurrency control. When provided, the adapter checks that the current revision matches before applying the change.
DocumentSpec
Kernel specification: model types, logical name, optional history_enabled, optional CacheSpec. Physical tables and collections are configured in PostgresDepsModule / MongoDepsModule (see Specs and infrastructure wiring).
from datetime import timedelta
from forze.application.contracts.cache import CacheSpec
from forze.application.contracts.document import DocumentSpec
spec = DocumentSpec(
name="projects",
read=ProjectRead,
write={
"domain": Project,
"create_cmd": CreateProjectCmd,
"update_cmd": UpdateProjectCmd,
},
history_enabled=True,
cache=CacheSpec(name="projects", ttl=timedelta(minutes=5)),
)
| Field | Type | Purpose |
|---|---|---|
name |
str |
Logical route — matches infra config keys |
read |
type[R] |
Read model (ReadDocument) |
write |
DocumentWriteTypes \| None |
Domain + commands, or None for read-only |
history_enabled |
bool |
Whether history is active when infra provides it |
cache |
CacheSpec \| None |
Enables read-through cache on query/command ports |
Helper methods:
supports_soft_delete()—Truewhen the domain model inherits fromSoftDeletionMixinsupports_update()—Truewhen the update command has writable fields
Dependency keys
| Key | Resolved via |
|---|---|
DocumentQueryDepKey |
ctx.doc_query(spec) |
DocumentCommandDepKey |
ctx.doc_command(spec) |
Transaction management
TxManagerPort
Manages transaction boundaries:
| Method | Purpose |
|---|---|
transaction() |
Return an async context manager for a transaction scope |
scope_key() |
Return the TxScopeKey identifying this tx manager kind |
TxScopedPort
Marker protocol for ports that are bound to a specific transaction scope. The execution context validates that scoped ports match the active transaction.
TxHandle
Value object holding the active transaction's scope key. Used internally to detect scope mismatches.
Dependency keys
| Key | Resolved via |
|---|---|
TxManagerDepKey |
ctx.txmanager() |
Cache
CacheSpec
from forze.application.contracts.cache import CacheSpec
cache_spec = CacheSpec(name="projects", ttl=timedelta(minutes=10))
| Field | Type | Purpose |
|---|---|---|
name |
str |
Route to RedisDepsModule.caches[name] (or other cache backend) |
ttl |
timedelta |
Default time-to-live for entries |
ttl_pointer |
timedelta |
TTL for version pointer keys when using versioned cache |
CachePort
Combines read and write operations:
| Method | Purpose |
|---|---|
get(pk) |
Retrieve a cached document |
get_many(pks) |
Retrieve multiple cached documents |
set(pk, data) |
Store a document in cache |
invalidate(pk) |
Remove a document from cache |
invalidate_many(pks) |
Remove multiple documents |
Dependency keys
| Key | Resolved via |
|---|---|
CacheDepKey |
ctx.cache(spec) |
Counter
CounterPort
Namespace-scoped atomic counters:
| Method | Signature | Purpose |
|---|---|---|
incr |
(suffix?, by?) |
Increment and return new value |
incr_batch |
(count, suffix?) |
Increment by count, return final value |
decr |
(suffix?, by?) |
Decrement and return new value |
reset |
(suffix?, value?) |
Reset to a specific value |
Dependency keys
| Key | Resolved via |
|---|---|
CounterDepKey |
ctx.counter(CounterSpec(...)) |
Search
SearchSpec
from forze.application.contracts.search import SearchSpec
search_spec = SearchSpec(
name="projects",
model_type=ProjectReadModel,
fields=("title", "description"),
default_weights={"title": 0.6, "description": 0.4},
)
Postgres index and heap names belong in PostgresDepsModule.searches[name] (PostgresSearchConfig), not on the kernel spec.
| Field | Purpose |
|---|---|
name |
Logical route — matches PostgresSearchConfig registration |
model_type |
Pydantic model for typed hits |
fields |
Indexed field names (unique) |
default_weights |
Optional per-field weights (must cover all fields if set) |
fuzzy |
Optional SearchFuzzySpec |
SearchQueryPort[R]
| Method | Purpose |
|---|---|
search(query, filters?, limit?, offset?, sorts?, *, options?, return_model?, return_fields?) |
Full-text search with optional filters and pagination |
Dependency keys
| Key | Resolved via |
|---|---|
SearchQueryDepKey |
ctx.search_query(spec) |
Object storage
StoragePort
S3-style blob storage:
| Method | Signature | Returns |
|---|---|---|
upload |
(filename, data, description?, *, prefix?) |
StoredObject |
download |
(key) |
DownloadedObject |
delete |
(key) |
None |
list |
(limit, offset, *, prefix?) |
(list[ObjectMetadata], int) |
Storage types
| Type | Fields |
|---|---|
StoredObject |
key, filename, content_type, size |
ObjectMetadata |
key, filename, content_type, size, last_modified |
DownloadedObject |
key, filename, content_type, size, data |
Dependency keys
| Key | Resolved via |
|---|---|
StorageDepKey |
ctx.storage(StorageSpec(name=...)) |
Queue
QueueSpec
from forze.application.contracts.queue import QueueSpec
order_queue = QueueSpec(name="orders", model=OrderPayload)
QueueReadPort[M]
| Method | Purpose |
|---|---|
receive(queue, *, limit?, timeout?) |
Receive a batch of messages |
consume(queue, *, timeout?) |
Async iterator over messages |
ack(queue, ids) |
Acknowledge processed messages |
nack(queue, ids, *, requeue?) |
Reject messages, optionally re-queue |
QueueWritePort[M]
| Method | Purpose |
|---|---|
enqueue(queue, payload, *, type?, key?, enqueued_at?) |
Send a single message |
enqueue_many(queue, payloads, *, type?, key?, enqueued_at?) |
Send a batch |
QueueMessage[M]
| Field | Type | Purpose |
|---|---|---|
id |
str |
Message identifier |
payload |
M |
Deserialized message body |
type |
str \| None |
Optional message type |
key |
str \| None |
Optional routing key |
enqueued_at |
datetime \| None |
Enqueue timestamp |
Dependency keys
| Key | Purpose |
|---|---|
QueueReadDepKey |
Read port |
QueueWriteDepKey |
Write port |
Pub/Sub
PubSubSpec
from forze.application.contracts.pubsub import PubSubSpec
events_spec = PubSubSpec(name="events", model=EventPayload)
PubSubPublishPort[M]
| Method | Purpose |
|---|---|
publish(topic, payload, *, type?, key?, published_at?) |
Publish a message |
PubSubSubscribePort[M]
| Method | Purpose |
|---|---|
subscribe(topics, *, timeout?) |
Async iterator over messages |
PubSubMessage[M]
| Field | Type | Purpose |
|---|---|---|
id |
str |
Message identifier |
topic |
str |
Topic the message was published to |
payload |
M |
Deserialized message body |
type |
str \| None |
Optional message type |
key |
str \| None |
Optional routing key |
published_at |
datetime \| None |
Publish timestamp |
Dependency keys
| Key | Purpose |
|---|---|
PubSubPublishDepKey |
Publish port |
PubSubSubscribeDepKey |
Subscribe port |
Stream
StreamSpec
from forze.application.contracts.stream import StreamSpec
audit_stream = StreamSpec(name="audit", model=AuditEntry)
StreamReadPort[M]
| Method | Purpose |
|---|---|
read(stream_mapping, *, limit?, timeout?) |
Read entries from streams |
tail(stream_mapping, *, timeout?) |
Async iterator following new entries |
StreamGroupPort[M]
| Method | Purpose |
|---|---|
read(group, consumer, stream_mapping, *, limit?, timeout?) |
Consumer group read |
tail(group, consumer, stream_mapping, *, timeout?) |
Consumer group tail |
ack(group, stream, ids) |
Acknowledge entries |
StreamWritePort[M]
| Method | Purpose |
|---|---|
append(stream, payload, *, type?, key?, timestamp?) |
Append an entry |
StreamMessage[M]
| Field | Type | Purpose |
|---|---|---|
id |
str |
Entry identifier |
stream |
str |
Stream name |
payload |
M |
Deserialized entry body |
type |
str \| None |
Optional entry type |
key |
str \| None |
Optional routing key |
timestamp |
datetime \| None |
Entry timestamp |
Dependency keys
| Key | Purpose |
|---|---|
StreamReadDepKey |
Read port |
StreamWriteDepKey |
Write port |
StreamGroupDepKey |
Group port |
Idempotency
IdempotencyPort
Deduplicate operations by caching responses keyed by operation name, idempotency key, and payload hash:
| Method | Signature | Purpose |
|---|---|---|
begin |
(op, key, payload_hash) |
Check for a cached response; returns IdempotencySnapshot \| None |
commit |
(op, key, payload_hash, snapshot) |
Store the response for future dedup |
IdempotencySnapshot
| Field | Type | Purpose |
|---|---|---|
status_code |
int |
HTTP status code |
body |
bytes |
Serialized response body |
headers |
dict[str, str] |
Response headers |
Dependency keys
| Key | Purpose |
|---|---|
IdempotencyDepKey |
Idempotency port |
Workflow
Workflows are typed with WorkflowSpec (logical name, run invocation, optional signals, queries, updates). Ports are split between commands and queries.
WorkflowCommandPort
| Method | Purpose |
|---|---|
start(args, *, workflow_id?, raise_on_already_started?) |
Start a run; returns WorkflowHandle |
signal(handle, *, signal, args) |
Send a signal |
update(handle, *, update, args) |
Run a workflow update |
cancel(handle) |
Request cancellation |
terminate(handle, *, reason?) |
Terminate the run |
WorkflowQueryPort
| Method | Purpose |
|---|---|
query(handle, *, query, args) |
Run a query |
result(handle) |
Await the workflow result |
Dependency keys
| Key | Purpose |
|---|---|
WorkflowCommandDepKey |
Routed factory → WorkflowCommandPort (route = WorkflowSpec.name) |
WorkflowQueryDepKey |
Routed factory → WorkflowQueryPort (route = WorkflowSpec.name) |
Context handling
Execution identity is represented by CallContext and optional AuthIdentity on ExecutionContext.
AuthIdentity carries subject_id plus optional tenant_id, actor_id, claims, roles, and permissions; bind at the boundary via ctx.bind_call(..., identity=...).
forze_auth provides a document-backed auth provider around these contracts:
DocumentAuthSpec, DocumentAuthDepsModule, and adapters for authentication,
authorization, token lifecycle, and API-key lifecycle. The provider uses regular
document ports, so storage is selected by the existing document adapter wiring.
Resolving ports
All ports are resolved through ExecutionContext. Contracts with convenience methods:
from forze.application.contracts.counter import CounterSpec
from forze.application.contracts.storage import StorageSpec
doc_q = ctx.doc_query(project_spec)
doc_c = ctx.doc_command(project_spec)
cache = ctx.cache(cache_spec)
counter = ctx.counter(CounterSpec(name="tickets"))
storage = ctx.storage(StorageSpec(name="attachments"))
search = ctx.search_query(search_spec)
tx = ctx.txmanager("default")
For contracts without a convenience method, use dep() with the dep key:
from forze.application.contracts.pubsub import PubSubPublishDepKey
publisher = ctx.dep(PubSubPublishDepKey)(ctx, events_spec)
await publisher.publish("events.created", payload)