The middleware system wraps usecases with cross-cutting behavior: authorization, transaction boundaries, logging, side effects. Plans and registries provide a declarative way to compose these into per-operation chains. For the conceptual overview, see Application Layer and Usecase Composition.
Usecase
Usecase[Args, R] is the base class for all application usecases. It receives an ExecutionContext and implements business logic in main():
from forze.application.execution import Usecase
class GetProject(Usecase[UUID, ProjectRead]):
async def main(self, args: UUID) -> ProjectRead:
doc = self.ctx.doc_read(project_spec)
return await doc.get(args)
Invoke a usecase via __call__(), which runs the full middleware chain:
get_project = GetProject(ctx=ctx)
project = await get_project(some_id)
| Attribute | Type | Purpose |
|---|---|---|
ctx |
ExecutionContext |
Execution context for resolving ports |
middlewares |
tuple[Middleware, ...] |
Middleware chain wrapping the usecase |
with_middlewares(*mw) returns a new usecase instance with additional middlewares appended.
Middleware protocols
Three protocol types describe different positions in the chain:
Guard
Runs before the usecase. Raises to abort the chain:
from forze.application.execution import Guard
class RequireAuth(Guard[UUID]):
async def __call__(self, args: UUID) -> None:
if not is_authenticated():
raise ValidationError("Not authenticated")
Effect
Runs after the usecase returns. May inspect or transform the result:
from forze.application.execution import Effect
class LogCreation(Effect[CreateProjectCmd, ProjectRead]):
async def __call__(
self, args: CreateProjectCmd, res: ProjectRead
) -> ProjectRead:
logger.info("Created project %s", res.id)
return res
Middleware
Full control over the chain — receives next and args:
from forze.application.execution import Middleware, NextCall
class TimingMiddleware(Middleware[Any, Any]):
async def __call__(self, next: NextCall[Any, Any], args: Any) -> Any:
start = time.monotonic()
result = await next(args)
logger.info("%.3fs", time.monotonic() - start)
return result
NextCall[Args, R] is a type alias for Callable[[Args], Awaitable[R]].
Built-in middleware implementations
| Class | Purpose |
|---|---|
GuardMiddleware[Args, R] |
Wraps a Guard: runs it before next(args) |
EffectMiddleware[Args, R] |
Wraps an Effect: runs it after next(args) |
TxMiddleware[Args, R] |
Wraps next inside ctx.transaction(); supports after-commit effects |
TxMiddleware
Wraps the usecase in a transaction. After a successful commit, runs registered after-commit effects:
from forze.application.execution import TxMiddleware
tx_mw = TxMiddleware(ctx=ctx)
tx_mw = tx_mw.with_after_commit(notify_effect, publish_effect)
After-commit effects run outside the transaction boundary, making them suitable for notifications, event publishing, and other side effects that should not roll back.
Middleware chain execution
When a usecase is invoked, the chain builds from the middleware tuple (outermost first):
GuardMiddleware(auth)
→ TxMiddleware(ctx)
→ GuardMiddleware(lock)
→ EffectMiddleware(audit)
→ main(args)
Each middleware calls next(args) to proceed or raises to abort.
UsecasePlan
Declares how each operation is composed with middleware. Maps operation keys to middleware buckets:
from forze.application.execution import UsecasePlan
plan = (
UsecasePlan()
.tx("create")
.tx("update")
.before("create", auth_guard_factory, priority=100)
.after("create", log_effect_factory, priority=0)
.after_commit("create", notify_factory)
.before("*", rate_limit_factory, priority=200)
)
Plan buckets
Each operation has seven middleware buckets, executed in this order:
| Bucket | When it runs | Typical use |
|---|---|---|
outer_before |
Before everything | Authorization, rate limiting |
outer_wrap |
Wraps the entire chain | Metrics, retries, error handling |
| Transaction boundary | Automatic when tx=True |
|
in_tx_before |
Inside tx, before usecase | Lock acquisition, pre-checks |
in_tx_wrap |
Inside tx, wraps usecase | In-transaction cross-cutting |
in_tx_after |
Inside tx, after usecase | Audit logging inside tx |
outer_after |
After everything | Response transformation |
after_commit |
After successful commit | Notifications, event publishing |
The in_tx_* and after_commit buckets only activate when tx(op) has been called for the operation.
Wildcard
The "*" wildcard applies to all operations as a base plan. Per-operation plans extend the base:
plan = (
UsecasePlan()
.before("*", global_guard, priority=1000) # applies to all ops
.before("create", create_guard, priority=100) # only for create
)
When an operation is resolved, the wildcard plan and the operation-specific plan are merged.
Priority ordering
Within a bucket, middlewares are sorted by priority (descending). Higher priority runs first (outermost). Priority values must be unique within a bucket:
plan = (
UsecasePlan()
.before("create", rate_limit, priority=200) # runs first
.before("create", auth_guard, priority=100) # runs second
)
Plan methods
| Method | Signature | Purpose |
|---|---|---|
tx(op) |
(OpKey) -> Self |
Enable transaction wrapping |
before(op, guard, *, priority=0) |
(OpKey, GuardFactory, priority) -> Self |
Add to outer_before |
after(op, effect, *, priority=0) |
(OpKey, EffectFactory, priority) -> Self |
Add to outer_after |
wrap(op, mw, *, priority=0) |
(OpKey, MiddlewareFactory, priority) -> Self |
Add to outer_wrap |
in_tx_before(op, guard, *, priority=0) |
(OpKey, GuardFactory, priority) -> Self |
Add to in_tx_before |
in_tx_after(op, effect, *, priority=0) |
(OpKey, EffectFactory, priority) -> Self |
Add to in_tx_after |
in_tx_wrap(op, mw, *, priority=0) |
(OpKey, MiddlewareFactory, priority) -> Self |
Add to in_tx_wrap |
after_commit(op, effect, *, priority=0) |
(OpKey, EffectFactory, priority) -> Self |
Add to after_commit |
Factory types:
| Type | Signature |
|---|---|
GuardFactory |
(ExecutionContext) -> Guard[Any] |
EffectFactory |
(ExecutionContext) -> Effect[Any, Any] |
MiddlewareFactory |
(ExecutionContext) -> Middleware[Any, Any] |
Merging plans
Multiple plans can be merged for modular composition:
base_plan = tx_document_plan
auth_plan = build_auth_plan()
audit_plan = build_audit_plan()
final_plan = UsecasePlan.merge(base_plan, auth_plan, audit_plan)
Per-operation buckets are concatenated. When resolved, duplicates within a bucket are deduplicated by factory identity and priority.
Resolving a plan
resolve() builds a fully composed usecase for an operation:
usecase = plan.resolve("create", ctx, lambda ctx: CreateProject(ctx=ctx))
The method merges the wildcard and operation-specific plans, validates (e.g. in-tx buckets require tx), builds the ordered middleware chain, and wraps the factory result.
Inspecting a plan
Use explain() to debug the middleware chain for an operation:
explanation = plan.explain("create")
print(explanation.pretty_format())
Output includes bucket names, priorities, factory references, and factory IDs.
UsecaseRegistry
Maps operation keys to usecase factories. A factory receives ExecutionContext and returns a Usecase:
from forze.application.execution import UsecaseRegistry
registry = UsecaseRegistry()
registry = registry.register("get", lambda ctx: GetProject(ctx=ctx))
registry = registry.register("create", lambda ctx: CreateProject(ctx=ctx))
Registration methods
| Method | Purpose |
|---|---|
register(op, factory, *, inplace=False) |
Register a factory; raises if op exists |
register_many(ops, *, inplace=False) |
Register multiple factories at once |
override(op, factory, *, inplace=False) |
Override an existing factory; raises if op missing |
override_many(ops, *, inplace=False) |
Override multiple factories |
exists(op) |
Check if a factory is registered |
When inplace=True, the registry is mutated; otherwise a new instance is returned.
Extending with plans
registry.extend_plan(auth_plan, inplace=True)
extend_plan() merges a UsecasePlan into the registry's internal plan.
Resolving usecases
usecase = registry.resolve("create", ctx)
result = await usecase(CreateProjectCmd(title="New"))
resolve() looks up the factory, builds the middleware chain from the plan, and returns a composed usecase. Pass debug_plan=True to print the chain to stdout.
OperationPlan
Internal building block for UsecasePlan. Each operation maps to an OperationPlan with per-bucket middleware specs:
from forze.application.execution.plan import OperationPlan, MiddlewareSpec
op_plan = OperationPlan(tx=True)
op_plan = op_plan.add("outer_before", MiddlewareSpec(priority=100, factory=my_factory))
| Method | Purpose |
|---|---|
add(bucket, spec) |
Add a middleware spec to a bucket |
build(bucket) |
Return deduplicated, priority-sorted specs for a bucket |
validate() |
Ensure in-tx buckets are only used when tx is enabled |
merge(*plans) |
Combine multiple operation plans |
You typically interact with UsecasePlan rather than OperationPlan directly.