Analytics over a data lake
Sometimes you need real analytical queries — group-bys, aggregates, scans over millions of rows — but the data already lives as files in a bucket and the latency budget is generous. Standing up a warehouse for that is overkill. DuckDB runs the query in your process, straight over the Parquet, so there's nothing to provision.
The handler never learns any of this. It asks for a named query and gets typed rows back; whether they came from DuckDB-over-S3 or a warehouse is wiring.
The runnable version lives at examples/recipes/analytics_duckdb/ and runs fully
in-process over a local Parquet file — no Docker.
Declare the query surface
An AnalyticsSpec is the whole handler-facing contract: a named query, its
params model, and the read model rows come back as. It says nothing about DuckDB,
Parquet, or where the data lives.
class RegionTotal(BaseModel):
    region: str
    total: int


class SalesQuery(BaseModel):
    min_total: int = 0


# The spec is the whole handler-facing surface: a named query + its params + read model.
# It says nothing about DuckDB, Parquet, or where the data lives.
SALES_SPEC = AnalyticsSpec[RegionTotal, None](  # type: ignore[reportUnknownReturnType]
    name="sales",
    read=RegionTotal,
    queries={"by_region": AnalyticsQueryDefinition(params=SalesQuery)},
)
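To make the idea of a spec as a contract concrete, here is a minimal standalone sketch using only the standard library. `ToySpec`, `check`, and `to_rows` are hypothetical names invented for illustration, and plain dataclasses stand in for the models; this is not how `AnalyticsSpec` is implemented. It only shows what such a contract can enforce: a known query key, the right params type, and typed rows out.

```python
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

R = TypeVar("R")


@dataclass(frozen=True)
class RegionTotal:
    region: str
    total: int


@dataclass(frozen=True)
class SalesQuery:
    min_total: int = 0


@dataclass(frozen=True)
class ToySpec(Generic[R]):
    """Hypothetical stand-in for a spec: a name, a read model, a params type per query."""

    name: str
    read: type[R]
    queries: dict[str, type]

    def check(self, query_key: str, params: object) -> None:
        """Reject unknown query keys and params of the wrong type."""
        expected = self.queries.get(query_key)
        if expected is None:
            raise KeyError(f"{self.name}: unknown query {query_key!r}")
        if not isinstance(params, expected):
            raise TypeError(f"{query_key} expects {expected.__name__}")

    def to_rows(self, raw: list[dict[str, Any]]) -> list[R]:
        """Turn engine-shaped rows (dicts) into read-model instances."""
        return [self.read(**row) for row in raw]


TOY_SALES = ToySpec(name="sales", read=RegionTotal, queries={"by_region": SalesQuery})
```

Nothing in the toy spec mentions an engine or a file location, which is the property the real spec is described as having: the physical mapping can change without touching code that depends on the spec.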
Map it to DuckDB
The physical mapping lives below the line — the SQL for each query_key, against
a source registered at startup. Params bind by name ($min_total):
# The physical mapping lives below the line: DuckDB SQL per query_key, against a
# source view registered at startup. `$min_total` binds from the params model.
SALES_CONFIG = DuckDbAnalyticsConfig(
    queries={
        "by_region": DuckDbQueryConfig(
            sql=(
                "SELECT region, sum(total) AS total FROM sales "
                "GROUP BY region HAVING sum(total) >= $min_total ORDER BY region"
            ),
        ),
    },
)
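To see what the by_region statement computes, independent of the framework, the same aggregate can be run against a small in-memory table. The sketch below uses the standard library's sqlite3 as a stand-in engine, purely so that it runs without extra dependencies; it is not DuckDB. The rows are made-up fixture data, `by_region` is a hypothetical helper, and sqlite3 spells the named placeholder `:min_total` where the DuckDB SQL above uses `$min_total`.

```python
import sqlite3

# Stand-in engine: sqlite3 from the standard library, not DuckDB.
# The table contents are made-up fixture data for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (region TEXT, total INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("emea", 40), ("emea", 70), ("apac", 30), ("amer", 200)],
)

# Same shape as the by_region query, with sqlite3's :name placeholder.
BY_REGION_SQL = (
    "SELECT region, sum(total) AS total FROM sales "
    "GROUP BY region HAVING sum(total) >= :min_total ORDER BY region"
)


def by_region(min_total: int = 0) -> list[tuple[str, int]]:
    """Run the aggregate with the threshold bound by name."""
    return conn.execute(BY_REGION_SQL, {"min_total": min_total}).fetchall()


print(by_region())     # [('amer', 200), ('apac', 30), ('emea', 110)]
print(by_region(100))  # [('amer', 200), ('emea', 110)]
```

The HAVING clause filters on the per-region sum, not on individual rows, so a region made of many small sales still appears once its total clears the threshold.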
Point it at the lake
The lifecycle step opens the engine and registers the lake source as a view. Here
the source is a local Parquet file; in production it's
ParquetSource("s3://bucket/sales/*.parquet") with object-storage credentials
resolved from your secrets backend:
def build_runtime(parquet: Path) -> ExecutionRuntime:
    client = DuckDbClient()
    # The lifecycle step opens the engine and registers the lake source as a view;
    # in production `ParquetSource("s3://bucket/sales/*.parquet")` + S3 credentials.
    return ExecutionRuntime(
        deps=DepsRegistry.from_modules(
            DuckDbDepsModule(client=client, analytics={"sales": SALES_CONFIG})
        ).freeze(),
        lifecycle=LifecyclePlan.from_steps(
            duckdb_lifecycle_step(
                extensions=(),  # local file: no httpfs needed
                sources={"sales": ParquetSource(str(parquet))},
            ),
        ).freeze(),
    )
Run the query
From a handler, resolve the port off the context and run the named query — typed rows out, engine-agnostic:
async def top_regions(ctx: ExecutionContext, min_total: int) -> list[RegionTotal]:
    # The handler names the query and gets typed rows back; the call is engine-agnostic.
    page = await ctx.analytics.query(SALES_SPEC).run(
        "by_region", SalesQuery(min_total=min_total)
    )
    return list(page.hits)
Notes
- Query-only. DuckDB here reads the lake; it doesn't write or maintain tables. Producing the Parquet (and Iceberg/Delta compaction) is a separate pipeline.
- Both source styles work. Register named views (sources=) and reference them in SQL, or inline read_parquet('s3://…') in a query.
- It's a great test double. Because the engine is in-process, the same spec can run against a tiny local Parquet fixture in a Docker-free unit test.
- Pagination is available via run_page (with a total) and run_cursor; for large scans run_chunked streams batches.
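The difference between a page with a total and chunked streaming can be sketched in plain Python. The helpers below, `page_with_total` and `chunked`, are hypothetical and work on ordinary sequences and iterables; they are not the signatures of run_page or run_chunked, and cursor-style paging is not shown.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def page_with_total(rows: list[T], offset: int, limit: int) -> tuple[list[T], int]:
    """Offset paging: return one slice plus the overall row count."""
    return rows[offset : offset + limit], len(rows)


def chunked(rows: Iterable[T], size: int) -> Iterator[list[T]]:
    """Streaming: yield fixed-size batches, pulling from the source lazily."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


print(page_with_total(list(range(10)), offset=4, limit=3))  # ([4, 5, 6], 10)
print(list(chunked(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

A page with a total needs the full count up front, which suits a UI listing. Chunked streaming never needs the count and holds only one batch at a time, which is why it fits large scans.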