CRUD over Postgres
A DocumentSpec plus a PostgresDepsModule is the entire persistence story.
The FastAPI routes resolve the document ports from the context and return read
models — no SQL, no ORM, and optimistic concurrency for free.
The runnable version lives at examples/recipes/crud_fastapi/. From that directory,
the just run command brings up an ephemeral Postgres, serves the API, and tears it down.
The aggregate
A Product with the three write shapes — domain model, create command, and a
partial update — plus a read model:
class Product(Document):
    name: str
    price: int


class ProductCreate(CreateDocumentCmd):
    name: str
    price: int


class ProductUpdate(BaseDTO):
    name: str | None = None
    price: int | None = None


class ProductRead(ReadDocument):
    name: str
    price: int
The specification
One spec names the aggregate and its write types. "products" is the logical
name shared by the adapter wiring:
PRODUCT_SPEC = DocumentSpec(
    name="products",
    read=ProductRead,
    write=DocumentWriteTypes(
        domain=Product, create_cmd=ProductCreate, update_cmd=ProductUpdate
    ),
)
Wire Postgres
PostgresDocumentConfig maps the spec to its tables; PostgresDepsModule
registers the document ports under "products", and the lifecycle module owns
the connection pool:
def build_runtime(pg: PostgresClient, *, dsn: str) -> ExecutionRuntime:
    deps = DepsRegistry.from_modules(
        PostgresDepsModule(
            client=pg, rw_documents={"products": PRODUCT_PG}, tx={"products"}
        ),
    )
    lifecycle = LifecyclePlan.from_modules(
        PostgresLifecycleModule(client=pg, dsn=dsn, config=PostgresConfig()),
    )
    return ExecutionRuntime(deps=deps.freeze(), lifecycle=lifecycle.freeze())
The demo table
The example creates its products table on startup so it's self-contained.
A real service owns its schema through migrations — Forze reads and writes
rows, it doesn't manage DDL. The columns id, rev, created_at, and
last_update_at are the document bookkeeping fields.
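The recipe does not reproduce the SCHEMA string it executes at startup. As a rough illustration only, a table carrying the four bookkeeping columns plus the two product fields might look like the sketch below; the column types and constraints are assumptions and are not taken from the example.

```python
# Hypothetical demo DDL. Only the column names come from the recipe;
# every type and constraint here is an assumption.
SCHEMA = """
CREATE TABLE IF NOT EXISTS products (
    id             UUID        PRIMARY KEY,
    rev            INTEGER     NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL,
    last_update_at TIMESTAMPTZ NOT NULL,
    name           TEXT        NOT NULL,
    price          INTEGER     NOT NULL
);
"""

# The bookkeeping columns named in the text, for reference.
BOOKKEEPING_COLUMNS = ("id", "rev", "created_at", "last_update_at")
```

In a real service this statement would live in a migration rather than in application startup.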
The routes
The runtime opens inside the app's lifespan; each route resolves the document command or query port from the context and calls it:
import os
from contextlib import asynccontextmanager
from uuid import UUID

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    pg = PostgresClient()
    dsn = os.environ.get(
        "POSTGRES_DSN", "postgresql://forze:forze@localhost:5432/forze"
    )
    _rt.set_once(build_runtime(pg, dsn=dsn))
    async with _rt.get().scope():
        await pg.execute(SCHEMA)  # demo bootstrap (real apps migrate instead)
        yield


app = FastAPI(title="Products API", lifespan=lifespan)
register_exception_handlers(app)  # CoreException → HTTP (not_found → 404, conflict → 409)


@app.post("/products")
async def create_product(cmd: ProductCreate) -> ProductRead:
    return await ctx().document.command(PRODUCT_SPEC).create(cmd)


@app.get("/products/{product_id}")
async def get_product(product_id: UUID) -> ProductRead:
    return await ctx().document.query(PRODUCT_SPEC).get(product_id)


@app.get("/products")
async def list_products() -> list[ProductRead]:
    page = await ctx().document.query(PRODUCT_SPEC).find_many()
    return list(page.hits)


@app.put("/products/{product_id}")
async def update_product(
    product_id: UUID, rev: int, patch: ProductUpdate
) -> ProductRead:
    c = ctx()
    await c.document.command(PRODUCT_SPEC).update(product_id, rev, patch)
    return await c.document.query(PRODUCT_SPEC).get(product_id)


@app.delete("/products/{product_id}", status_code=204)
async def delete_product(product_id: UUID) -> None:
    await ctx().document.command(PRODUCT_SPEC).kill(product_id)
- Create / get / list / delete map straight onto the document ports.
- Update carries the document's rev. A stale rev raises a conflict, which register_exception_handlers turns into a 409. That's optimistic concurrency with no extra code.
- A missing id raises not_found → 404.
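The rev check in the update bullet is the core of optimistic concurrency, and it can be sketched with a toy in-memory store. Everything below is illustrative: InMemoryProducts, NotFound, Conflict, and the starting revision of 1 are assumptions made for the sketch, not Forze's implementation, which performs the check in the Postgres adapter.

```python
from dataclasses import dataclass, replace
from uuid import UUID, uuid4


class NotFound(Exception):
    """Stand-in for the not_found error kind."""


class Conflict(Exception):
    """Stand-in for the conflict error kind."""


# Mirrors the mapping stated in the recipe: not_found -> 404, conflict -> 409.
STATUS_BY_ERROR = {NotFound: 404, Conflict: 409}


@dataclass(frozen=True)
class Row:
    id: UUID
    rev: int
    name: str
    price: int


class InMemoryProducts:
    """Toy store; a real adapter would do the rev check inside the database."""

    def __init__(self) -> None:
        self._rows: dict[UUID, Row] = {}

    def create(self, name: str, price: int) -> Row:
        row = Row(id=uuid4(), rev=1, name=name, price=price)  # rev=1 is assumed
        self._rows[row.id] = row
        return row

    def update(self, product_id: UUID, rev: int, **changes) -> Row:
        current = self._rows.get(product_id)
        if current is None:
            raise NotFound(product_id)
        if current.rev != rev:
            # The caller read an older revision: reject instead of overwriting.
            raise Conflict(f"expected rev {current.rev}, got {rev}")
        updated = replace(current, rev=current.rev + 1, **changes)
        self._rows[product_id] = updated
        return updated


store = InMemoryProducts()
created = store.create("Kettle", 2500)
first = store.update(created.id, created.rev, price=1999)  # succeeds, rev bumps

try:
    store.update(created.id, created.rev, name="Teapot")  # reuses the stale rev
    status = 200
except Conflict as exc:
    status = STATUS_BY_ERROR[type(exc)]
```

The second writer is rejected rather than silently overwriting the first, which is why a client that receives a 409 should re-read the document and retry with the fresh rev.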
Hand-writing the routes keeps this recipe transparent; the same endpoints can
also be generated from an operation registry with attach_document_routes.
Run it
cd examples/recipes/crud_fastapi
just run
Then open http://localhost:8000/docs.
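The API can also be exercised from a script. The sketch below only builds the requests with the standard library; the helper names are hypothetical. Because rev is declared as a plain int parameter on the update route and is not part of the path, FastAPI reads it from the query string, while the patch travels as the JSON body.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8000"
JSON_HEADERS = {"Content-Type": "application/json"}


def create_request(name: str, price: int) -> Request:
    """Build a POST /products request with the create command as JSON."""
    body = json.dumps({"name": name, "price": price}).encode()
    return Request(
        f"{BASE_URL}/products", data=body, headers=JSON_HEADERS, method="POST"
    )


def update_request(product_id: str, rev: int, **patch) -> Request:
    """Build a PUT /products/{id}?rev=N request carrying only the changed fields."""
    query = urlencode({"rev": rev})
    return Request(
        f"{BASE_URL}/products/{product_id}?{query}",
        data=json.dumps(patch).encode(),
        headers=JSON_HEADERS,
        method="PUT",
    )


# To send a request against the running example:
#   from urllib.request import urlopen
#   with urlopen(create_request("Kettle", 2500)) as resp:
#       print(json.load(resp))
```

Sending the update with a rev that is no longer current should come back as a 409, per the exception mapping described above.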
Where next
- Serve repeat reads from Redis and invalidate on writes: same handlers, one extra module.
- Make a retried POST a no-op that returns the first result.