The execution engine manages dependency injection, context creation, and application lifecycle. It connects domain models and contracts to infrastructure adapters at runtime. For the conceptual overview, see Application Layer.

ExecutionContext

The central dependency resolution point. Every usecase and factory receives an ExecutionContext to resolve infrastructure ports:

from forze.application.execution import ExecutionContext

doc = ctx.doc_read(project_spec)
result = await doc.get(some_id)

Resolution methods

Method Returns Purpose
dep(key) T Resolve any dependency by typed key
doc_read(spec) DocumentReadPort Read-only document port
doc_write(spec) DocumentWritePort Read-write document port
cache(spec) CachePort Cache port for a namespace
counter(namespace) CounterPort Namespace-scoped counter
txmanager() TxManagerPort Transaction manager
storage(bucket) StoragePort Object storage for a bucket
search(spec) SearchReadPort Full-text search port

When a DocumentSpec has cache.enabled = True, doc_read() and doc_write() automatically resolve and inject a cache adapter. The TTL defaults to 300 seconds unless overridden in cache.ttl.

Transactions

transaction() returns an async context manager that scopes a transaction:

async with ctx.transaction():
    doc = ctx.doc_write(project_spec)
    await doc.create(CreateProjectCmd(title="New"))
    await doc.create(CreateProjectCmd(title="Another"))
    # Both creates commit or roll back together

Nested calls reuse the same transaction. Savepoints are used when the backend supports them:

async with ctx.transaction():
    # outer transaction
    async with ctx.transaction():
        # nested: same transaction, savepoint

active_tx() returns the current TxHandle or None when no transaction is active. The context also validates that ports resolved inside a transaction match the active transaction scope — mixing different transaction managers (e.g. Postgres and Mongo) raises CoreError.

Cycle detection

dep() tracks the resolution stack per async task. If a dependency resolution chain encounters the same DepKey twice, it raises CoreError with the full cycle chain for diagnostics.

Dependencies

DepKey

A typed key identifying a dependency. Used for both registration (in dep modules) and resolution (via ctx.dep(key)):

from forze.application.contracts.deps import DepKey

MyClientKey = DepKey[MyClient]("my_client")

Deps

In-memory dependency container implementing DepsPort:

from forze.application.execution import Deps

deps = Deps(deps={
    DocumentReadDepKey: my_doc_read_factory,
    CacheDepKey: my_cache_factory,
})
Method Purpose
provide(key) Return the dependency; raises CoreError if missing
exists(key) Check registration
merge(*deps) Combine containers; raises CoreError on key conflicts
without(key) Return a container without the given key
empty() Check if the container is empty

Deps.merge() catches misconfigured plans early by failing on duplicate keys.

DepsModule

Protocol for a callable that produces a Deps container. Integration packages expose modules that register their adapters:

from forze.application.execution import Deps, DepsModule

def postgres_module() -> Deps:
    return Deps(deps={
        DocumentReadDepKey: pg_doc_read_factory,
        DocumentWriteDepKey: pg_doc_write_factory,
        TxManagerDepKey: pg_tx_factory,
    })

DepsPlan

Declarative plan that collects DepsModule callables and merges them into a single Deps on build:

from forze.application.execution import DepsPlan

plan = DepsPlan.from_modules(
    postgres_module,
    redis_module,
)

# Or build incrementally
plan = DepsPlan.from_modules(base_module)
plan = plan.with_modules(cache_module, search_module)
Method Purpose
from_modules(*modules) Create a plan from modules
with_modules(*modules) Return a new plan with additional modules
build() Invoke all modules and merge into a single Deps

When build() is called, each module callable is invoked and the results are merged via Deps.merge().

Lifecycle

LifecycleHook

Protocol for startup/shutdown hooks. Receives the ExecutionContext:

async def startup_postgres(ctx: ExecutionContext) -> None:
    client = ctx.dep(PostgresClientKey)
    await client.connect()

async def shutdown_postgres(ctx: ExecutionContext) -> None:
    client = ctx.dep(PostgresClientKey)
    await client.disconnect()

LifecycleStep

Named pair of startup and shutdown hooks:

from forze.application.execution import LifecycleStep

pg_step = LifecycleStep(
    name="postgres",
    startup=startup_postgres,
    shutdown=shutdown_postgres,
)
Field Type Default Purpose
name str Unique name for collision detection
startup LifecycleHook no-op Hook to run on startup
shutdown LifecycleHook no-op Hook to run on shutdown

Integration packages typically provide factory functions that return pre-configured steps.

LifecyclePlan

Ordered sequence of lifecycle steps:

from forze.application.execution import LifecyclePlan

lifecycle = LifecyclePlan.from_steps(pg_step, redis_step)
lifecycle = lifecycle.with_steps(s3_step)
Method Purpose
from_steps(*steps) Create a plan; raises on name collisions
with_steps(*steps) Append steps; raises on name collisions
startup(ctx) Run startup hooks in order
shutdown(ctx) Run shutdown hooks in reverse order

Startup behavior: if a hook fails, all previously started steps are shut down in reverse order before re-raising. Shutdown behavior: exceptions are swallowed so all steps are attempted.

ExecutionRuntime

Combines the dependency plan, lifecycle plan, and context into a scoped runtime:

from forze.application.execution import ExecutionRuntime

runtime = ExecutionRuntime(
    deps=deps_plan,
    lifecycle=lifecycle_plan,
)

Use scope() as an async context manager:

async with runtime.scope():
    ctx = runtime.get_context()
    # Application runs here

Scope lifecycle

  1. Create context — build Deps from the deps plan, store in a RuntimeVar
  2. Startup — run all lifecycle startup hooks in order
  3. Yield — the application runs
  4. Shutdown — run all lifecycle shutdown hooks in reverse order
  5. Reset — clear the context

Methods

Method Purpose
get_context() Return the current ExecutionContext; raises if not in scope
create_context() Build and store the context (idempotent within a scope)
startup() Run lifecycle startup hooks
shutdown() Run lifecycle shutdown hooks and reset context
scope() Async context manager combining all of the above

The runtime is typically created once at application startup (e.g. in a FastAPI lifespan) and shared across requests:

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with runtime.scope():
        yield

app = FastAPI(lifespan=lifespan)

Putting it together

A complete wiring example showing deps, lifecycle, and runtime:

from forze.application.execution import (
    Deps,
    DepsPlan,
    ExecutionRuntime,
    LifecyclePlan,
    LifecycleStep,
)

# 1. Define dep modules
def infra_module() -> Deps:
    return Deps.merge(
        postgres_deps_module(),
        redis_deps_module(),
    )

# 2. Build plans
deps_plan = DepsPlan.from_modules(infra_module)

lifecycle_plan = LifecyclePlan.from_steps(
    LifecycleStep(
        name="postgres",
        startup=pg_startup,
        shutdown=pg_shutdown,
    ),
    LifecycleStep(
        name="redis",
        startup=redis_startup,
        shutdown=redis_shutdown,
    ),
)

# 3. Create runtime
runtime = ExecutionRuntime(
    deps=deps_plan,
    lifecycle=lifecycle_plan,
)

# 4. Use in application
async with runtime.scope():
    ctx = runtime.get_context()
    doc = ctx.doc_read(project_spec)
    projects = await doc.find_many(limit=10)