forze_postgres provides document storage, full-text search, and transaction management backed by PostgreSQL. It implements DocumentQueryPort, DocumentCommandPort, SearchQueryPort, and TxManagerPort using async psycopg with connection pooling.
Kernel specs (DocumentSpec, SearchSpec) describe models and logical names. Table and index locations are supplied separately via PostgresDepsModule (rw_documents, searches, …). See Specs and infrastructure wiring.
Installation
uv add 'forze[postgres]'
Requires PostgreSQL 14 or later.
Runtime wiring
Create a client, pass per-aggregate Postgres configs into PostgresDepsModule, and add a lifecycle step for pool management:
from forze.application.execution import Deps, DepsPlan, ExecutionRuntime, LifecyclePlan
from forze_postgres import (
PostgresClient,
PostgresConfig,
PostgresDepsModule,
postgres_lifecycle_step,
)
client = PostgresClient()
module = PostgresDepsModule(
client=client,
rw_documents={
"projects": {
"read": ("public", "projects"),
"write": ("public", "projects"),
"bookkeeping_strategy": "database",
"history": ("public", "projects_history"),
},
},
searches={
"projects": {
"engine": "pgroonga",
"index": ("public", "idx_projects_content"),
"source": ("public", "projects"),
},
},
tx={"default"},
)
runtime = ExecutionRuntime(
deps=DepsPlan.from_modules(module),
lifecycle=LifecyclePlan.from_steps(
postgres_lifecycle_step(
dsn="postgresql://user:pass@localhost:5432/mydb",
config=PostgresConfig(min_size=2, max_size=15),
)
),
)
Keys in rw_documents, ro_documents, and searches must match DocumentSpec.name and SearchSpec.name on the kernel side.
PostgresConfig options
| Option | Type | Default | Purpose |
|---|---|---|---|
min_size |
int |
2 |
Minimum connections in the pool |
max_size |
int |
10 |
Maximum connections in the pool |
timeout |
float |
30.0 |
Connection acquisition timeout (seconds) |
What gets registered
PostgresDepsModule registers:
| Key | Capability |
|---|---|
PostgresClientDepKey |
Async psycopg client (pool) |
PostgresIntrospectorDepKey |
Catalog introspection (search, types) |
DocumentQueryDepKey |
Routed document query factories (ConfigurablePostgresReadOnlyDocument / derived read side) |
DocumentCommandDepKey |
Routed document command factories (ConfigurablePostgresDocument) |
SearchQueryDepKey |
Routed search factories (ConfigurablePostgresSearch) |
TxManagerDepKey |
Transaction managers per route in tx |
DocumentSpec vs Postgres config
DocumentSpec carries read model type, write model types, history_enabled, and optional CacheSpec. It does not contain SQL identifiers.
PostgresDocumentConfig supplies:
| Field | Purpose |
|---|---|
read |
(schema, relation) for reads (table, view, or materialized view) |
write |
(schema, table) for mutations |
history |
Optional (schema, table) when history is stored in Postgres |
bookkeeping_strategy |
"database" or "application" — who bumps rev and timestamps |
batch_size |
Optional write batch size (default 200) |
tenant_aware |
Optional multi-tenant column handling |
Read-only documents use ro_documents with PostgresReadOnlyDocumentConfig (read only).
Document table schema
Every document table must include core columns matching the domain model fields.
Required columns
| Column | Type | Default | Purpose |
|---|---|---|---|
id |
uuid |
gen_random_uuid() |
Primary key |
rev |
integer |
1 |
Revision counter |
created_at |
timestamptz |
now() |
Creation timestamp |
last_update_at |
timestamptz |
now() |
Last update timestamp |
Optional columns (by mixin)
| Column | Type | Mixin | Purpose |
|---|---|---|---|
is_deleted |
boolean |
SoftDeletionMixin |
Soft delete flag |
number_id |
bigint |
NumberMixin |
Human-readable sequence |
creator_id |
uuid |
CreatorMixin |
Creator reference |
tenant_id |
uuid |
Multi-tenancy | Tenant partition |
Add domain-specific columns as needed. Column names must match Pydantic model field names (snake_case).
Example DDL
CREATE TABLE public.projects (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
rev integer NOT NULL DEFAULT 1,
created_at timestamptz NOT NULL DEFAULT now(),
last_update_at timestamptz NOT NULL DEFAULT now(),
is_deleted boolean NOT NULL DEFAULT false,
title text NOT NULL,
description text NOT NULL
);
Revision strategy
bookkeeping_strategy on PostgresDocumentConfig controls how rev and last_update_at advance:
| Strategy | Behavior |
|---|---|
"database" |
Prefer triggers (or DB defaults) to bump rev and touch timestamps |
"application" |
Adapter supplies the next rev in the update path |
Database trigger for revision bumping
When using "database" bookkeeping with triggers, for example:
CREATE OR REPLACE FUNCTION bump_rev()
RETURNS TRIGGER AS $$
BEGIN
NEW.rev := OLD.rev + 1;
NEW.last_update_at := now();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER projects_bump_rev
BEFORE UPDATE ON public.projects
FOR EACH ROW EXECUTE FUNCTION bump_rev();
History table (audit trail)
Set history_enabled=True on DocumentSpec and provide history in PostgresDocumentConfig when you use a history table. If history is missing while history_enabled is true (or the reverse), the adapter logs a warning and skips the history gateway.
History table schema
| Column | Type | Purpose |
|---|---|---|
source |
text |
Source relation name (e.g. public.projects) |
id |
uuid |
Document identifier |
rev |
integer |
Revision number |
data |
jsonb |
Full document snapshot at that revision |
Example DDL
CREATE TABLE public.projects_history (
source text NOT NULL,
id uuid NOT NULL,
rev integer NOT NULL,
data jsonb NOT NULL,
PRIMARY KEY (source, id, rev)
);
CREATE INDEX idx_projects_history_lookup
ON public.projects_history (source, id, rev);
History writes
Use either database triggers (copy OLD row before update) or application-level inserts, consistent with bookkeeping_strategy and your operational preferences.
CREATE OR REPLACE FUNCTION write_document_history()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO public.projects_history (source, id, rev, data)
VALUES ('public.projects', OLD.id, OLD.rev, to_jsonb(OLD));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER projects_history_trigger
BEFORE UPDATE ON public.projects
FOR EACH ROW EXECUTE FUNCTION write_document_history();
Transactions
The Postgres adapter uses psycopg async connections with context-variable scoping. Within an ExecutionContext.transaction() scope, document operations share the same connection when resolved through the Postgres tx manager route you registered in PostgresDepsModule.tx.
Nested transaction() calls create savepoints:
async with ctx.transaction("default"):
await doc_c.create(cmd_1)
async with ctx.transaction("default"):
await doc_c.create(cmd_2)
If the inner block raises, only the savepoint is rolled back.
Document operations
Resolve ports from ExecutionContext using the same DocumentSpec you use in the kernel:
doc_q = ctx.doc_query(project_spec)
doc_c = ctx.doc_command(project_spec)
project = await doc_q.get(project_id)
projects, total = await doc_q.find_many(
filters={"$fields": {"is_deleted": False}},
sorts={"created_at": "desc"},
limit=20,
offset=0,
)
count = await doc_q.count({"$fields": {"is_deleted": False}})
created = await doc_c.create(CreateProjectCmd(title="New", description="..."))
updated = await doc_c.update(project_id, UpdateProjectCmd(title="Updated"), rev=1)
await doc_c.delete(project_id)
await doc_c.restore(project_id)
await doc_c.kill(project_id)
The adapter handles revision checks, cache coordination when DocumentSpec.cache is set, history when configured, and query rendering via the shared query DSL.
Full-text search
The search stack has two layers:
SearchSpec(kernel) —name,model_type,fields, weights, optionalfuzzyPostgresSearchConfig—engine("pgroonga"or"fts"),indexandsourceas(schema, name)tuples, optionalfts_groupsfor native FTS
| Engine | Extension | Best for |
|---|---|---|
| PGroonga | pgroonga |
CJK, fuzzy, array expressions |
| Native FTS | Built-in | GIN + tsvector |
Example kernel + infra
from forze.application.contracts.search import SearchSpec
project_search = SearchSpec(
name="projects",
model_type=ProjectReadModel,
fields=("title", "description"),
default_weights={"title": 0.6, "description": 0.4},
)
# PostgresDepsModule.searches["projects"] must match project_search.name
searches={
"projects": {
"engine": "fts",
"index": ("public", "idx_projects_fts"),
"source": ("public", "projects"),
"fts_groups": {
"A": ("title",),
"B": ("description",),
},
},
}
For "fts", every field in SearchSpec.fields must appear in fts_groups. For "pgroonga", fts_groups is omitted.
PGroonga indexes
CREATE EXTENSION IF NOT EXISTS pgroonga;
CREATE INDEX idx_projects_title ON public.projects
USING pgroonga (title pgroonga_text_full_text_search_ops);
Native FTS (GIN + tsvector)
ALTER TABLE public.projects
ADD COLUMN doc_tsv tsvector
GENERATED ALWAYS AS (
setweight(to_tsvector('english', coalesce(title, '')), 'A')
|| setweight(to_tsvector('english', coalesce(description, '')), 'B')
) STORED;
CREATE INDEX idx_projects_fts ON public.projects USING gin (doc_tsv);
Using the search port
search = ctx.search_query(project_search)
hits, total = await search.search(
query="roadmap",
filters={"$fields": {"is_deleted": False}},
limit=20,
offset=0,
)
See Query Syntax for filter and sort expressions.
Combining with Redis
Typical stack: Postgres for persistence, Redis for cache and idempotency. Use the same logical names on CacheSpec / DocumentSpec and in RedisDepsModule.caches:
deps_plan = DepsPlan.from_modules(
lambda: Deps.merge(
PostgresDepsModule(client=pg, rw_documents={...}, searches={...})(),
RedisDepsModule(
client=redis,
caches={"projects": {"namespace": "app:projects"}},
)(),
),
)
lifecycle = LifecyclePlan.from_steps(
postgres_lifecycle_step(dsn="postgresql://...", config=PostgresConfig()),
redis_lifecycle_step(dsn="redis://...", config=RedisConfig()),
)
When DocumentSpec.cache is set, doc_query / doc_command resolve a cache port for that CacheSpec.name and pass it into the Postgres adapter.