CRUD over Postgres

A DocumentSpec plus a PostgresDepsModule is the entire persistence story. The FastAPI routes resolve the document ports from the context and return read models — no SQL, no ORM, and optimistic concurrency for free.

The runnable version lives at examples/recipes/crud_fastapi/. Running "just run" in that directory brings up ephemeral Postgres, serves the API, and tears it down.

The aggregate

A Product with the three write shapes — domain model, create command, and a partial update — plus a read model:

class Product(Document):
    name: str
    price: int


class ProductCreate(CreateDocumentCmd):
    name: str
    price: int


class ProductUpdate(BaseDTO):
    name: str | None = None
    price: int | None = None


class ProductRead(ReadDocument):
    name: str
    price: int

The specification

One spec names the aggregate and its read and write types. "products" is the logical name shared with the adapter wiring:

PRODUCT_SPEC = DocumentSpec(
    name="products",
    read=ProductRead,
    write=DocumentWriteTypes(
        domain=Product, create_cmd=ProductCreate, update_cmd=ProductUpdate
    ),
)

Wire Postgres

PostgresDocumentConfig maps the spec to its tables (PRODUCT_PG below is that config for the products spec); PostgresDepsModule registers the document ports under "products", and the lifecycle module owns the connection pool:

def build_runtime(pg: PostgresClient, *, dsn: str) -> ExecutionRuntime:
    deps = DepsRegistry.from_modules(
        PostgresDepsModule(
            client=pg, rw_documents={"products": PRODUCT_PG}, tx={"products"}
        ),
    )
    lifecycle = LifecyclePlan.from_modules(
        PostgresLifecycleModule(client=pg, dsn=dsn, config=PostgresConfig()),
    )
    return ExecutionRuntime(deps=deps.freeze(), lifecycle=lifecycle.freeze())

The demo table

The example creates its products table on startup so it's self-contained. A real service owns its schema through migrations — Forze reads and writes rows, it doesn't manage DDL. The columns id, rev, created_at, and last_update_at are the document bookkeeping fields.
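
The recipe's actual SCHEMA statement is not reproduced on this page. As a rough sketch, assuming the domain fields name and price are stored as plain columns next to the four bookkeeping columns, it could look like the following. The column types and constraints are guesses for illustration, not requirements stated by Forze.

```python
# Hypothetical DDL for the demo table. Column types and constraints are
# assumptions; only the column names id, rev, created_at, last_update_at
# (bookkeeping) and name, price (domain fields) come from the recipe.
SCHEMA = """
CREATE TABLE IF NOT EXISTS products (
    id             UUID PRIMARY KEY,
    rev            INTEGER NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL,
    last_update_at TIMESTAMPTZ NOT NULL,
    name           TEXT NOT NULL,
    price          INTEGER NOT NULL
);
"""

# Bookkeeping columns named in the paragraph above.
BOOKKEEPING_COLUMNS = ("id", "rev", "created_at", "last_update_at")
```

IF NOT EXISTS keeps the bootstrap safe to run on every startup, which suits a self-contained demo. A real service would express the same table in a migration instead.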

The routes

The runtime opens inside the app's lifespan; each route resolves the document command or query port from the context and calls it:

@asynccontextmanager
async def lifespan(app: FastAPI):
    pg = PostgresClient()
    dsn = os.environ.get("POSTGRES_DSN", "postgresql://forze:forze@localhost:5432/forze")
    _rt.set_once(build_runtime(pg, dsn=dsn))
    async with _rt.get().scope():
        await pg.execute(SCHEMA)  # demo bootstrap (real apps migrate instead)
        yield


app = FastAPI(title="Products API", lifespan=lifespan)
register_exception_handlers(app)  # CoreException → HTTP (not_found → 404, conflict → 409)


@app.post("/products")
async def create_product(cmd: ProductCreate) -> ProductRead:
    return await ctx().document.command(PRODUCT_SPEC).create(cmd)


@app.get("/products/{product_id}")
async def get_product(product_id: UUID) -> ProductRead:
    return await ctx().document.query(PRODUCT_SPEC).get(product_id)


@app.get("/products")
async def list_products() -> list[ProductRead]:
    page = await ctx().document.query(PRODUCT_SPEC).find_many()
    return list(page.hits)


@app.put("/products/{product_id}")
async def update_product(product_id: UUID, rev: int, patch: ProductUpdate) -> ProductRead:
    c = ctx()
    await c.document.command(PRODUCT_SPEC).update(product_id, rev, patch)
    return await c.document.query(PRODUCT_SPEC).get(product_id)


@app.delete("/products/{product_id}", status_code=204)
async def delete_product(product_id: UUID) -> None:
    await ctx().document.command(PRODUCT_SPEC).kill(product_id)

  • Create / get / list / delete map straight onto the document ports.
  • Update carries the document's rev — a stale rev raises a conflict, which register_exception_handlers turns into a 409. That's optimistic concurrency with no extra code.
  • A missing id raises not_found → 404.
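
The rev check described above can be sketched with a toy in-memory store. This is not Forze's implementation: Conflict, NotFound, Row, and InMemoryStore are hypothetical names, and starting at rev 1 and incrementing by one per write are assumptions made for the illustration.

```python
from dataclasses import dataclass, replace


class Conflict(Exception):
    """Hypothetical error for a stale rev (would surface as HTTP 409)."""


class NotFound(Exception):
    """Hypothetical error for an unknown id (would surface as HTTP 404)."""


@dataclass(frozen=True)
class Row:
    id: str
    rev: int
    name: str
    price: int


class InMemoryStore:
    """Toy store that applies a write only if the caller's rev is current."""

    def __init__(self) -> None:
        self._rows: dict[str, Row] = {}

    def create(self, id: str, name: str, price: int) -> Row:
        row = Row(id=id, rev=1, name=name, price=price)  # assumption: rev starts at 1
        self._rows[id] = row
        return row

    def update(self, id: str, rev: int, **changes) -> Row:
        current = self._rows.get(id)
        if current is None:
            raise NotFound(id)
        if current.rev != rev:
            # Someone else wrote since the caller last read this row.
            raise Conflict(f"expected rev {current.rev}, got {rev}")
        updated = replace(current, rev=current.rev + 1, **changes)
        self._rows[id] = updated
        return updated


store = InMemoryStore()
first = store.create("p1", name="Kettle", price=2500)
second = store.update("p1", first.rev, price=1999)  # current rev: applied

try:
    store.update("p1", first.rev, name="Teapot")  # stale rev: rejected
    outcome = "applied"
except Conflict:
    outcome = "conflict"
```

The second writer still holds the rev it read before the first update landed, so its write is rejected rather than silently overwriting the price change. A client that receives the conflict would typically re-read the document and retry with the fresh rev.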

Hand-writing the routes keeps this recipe transparent; the same endpoints can also be generated from an operation registry with attach_document_routes.

Run it

cd examples/recipes/crud_fastapi
just run

Then open http://localhost:8000/docs.
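
The endpoints can also be exercised from a script. The sketch below only builds the requests with the standard library and does not send them. It assumes ProductCreate and ProductUpdate behave as Pydantic-style models, so FastAPI reads them from the JSON body, while rev, a scalar parameter that is not part of the path, is read from the query string. The helper names create_request and update_request are hypothetical.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # address used in the step above
JSON_HEADERS = {"Content-Type": "application/json"}


def create_request(name: str, price: int) -> Request:
    """Build POST /products with the create command as the JSON body."""
    body = json.dumps({"name": name, "price": price}).encode()
    return Request(f"{BASE_URL}/products", data=body, headers=JSON_HEADERS, method="POST")


def update_request(product_id: str, rev: int, **fields) -> Request:
    """Build PUT /products/{id}?rev=N with only the changed fields in the body."""
    query = urlencode({"rev": rev})
    body = json.dumps(fields).encode()
    return Request(
        f"{BASE_URL}/products/{product_id}?{query}",
        data=body,
        headers=JSON_HEADERS,
        method="PUT",
    )


req = update_request("abc", 3, price=1999)
print(req.get_method(), req.full_url)
```

With the server running, each request can be sent with urllib.request.urlopen(req). A stale rev should then come back as a 409 and an unknown id as a 404, per the exception handlers registered on the app.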

Where next

  • Cache reads with Redis
    Serve repeat reads from Redis and invalidate on writes: same handlers, one extra module.

  • Add idempotency
    Make a retried POST a no-op that returns the first result.