Socket.IO
forze[socketio] is an inbound transport — like FastAPI, but for
real-time events. It routes Socket.IO commands to Forze operations, validates
payloads, and resolves the execution context per connection.
Install
uv add 'forze[socketio]'
A single worker needs no external service; an optional Redis backplane enables delivery across multiple processes.
Build the server
from forze_socketio import build_socketio_server, build_socketio_asgi_app
sio = build_socketio_server() # pass redis_url=... for multi-process
asgi = build_socketio_asgi_app(sio) # or mount alongside a FastAPI app
Route commands to operations
Declare commands on a namespace router — each maps an event to an operation, with typed payload and optional ack — then bind them through the adapter:
from forze_socketio import ForzeSocketIOAdapter, SocketIONamespaceRouter
chat = SocketIONamespaceRouter(namespace="/chat")
chat.command(event="message.send", operation="messages.create", payload_type=SendMessage, ack_type=ReadMessage)
adapter = ForzeSocketIOAdapter(
    sio=sio,
    context_factory=lambda request: runtime.get_context(),  # tenant/deps wiring here
    operation_resolver=registry.resolve,  # the frozen registry
)
adapter.include_router(chat)
On each event the adapter builds the context, validates the payload, runs the operation, and returns the (validated) result as the Socket.IO ack.
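
The per-event flow described above can be sketched in plain Python. This is an illustration using only the standard library, not the adapter's actual implementation: `Route`, `handle_event`, `OPERATIONS`, `create_message`, and the field layouts of `SendMessage` and `ReadMessage` are all hypothetical stand-ins, and dataclass construction stands in for the Pydantic validation the adapter performs.

```python
from dataclasses import asdict, dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class SendMessage:  # hypothetical payload model
    room: str
    text: str


@dataclass(frozen=True)
class ReadMessage:  # hypothetical ack model
    id: int
    room: str
    text: str


@dataclass(frozen=True)
class Route:  # hypothetical: one event -> operation binding
    event: str
    operation: str
    payload_type: type
    ack_type: type


def create_message(context: dict[str, Any], payload: SendMessage) -> ReadMessage:
    # Stand-in operation; a real one would persist the message.
    return ReadMessage(id=1, room=payload.room, text=payload.text)


# Hypothetical lookup table playing the role of the operation resolver.
OPERATIONS: dict[str, Callable[..., Any]] = {"messages.create": create_message}


def handle_event(route: Route, raw: dict[str, Any]) -> dict[str, Any]:
    # 1. Build the execution context (hard-coded here for illustration).
    context = {"tenant": "demo"}
    # 2. Shape-check the payload: missing or unexpected keys raise TypeError.
    payload = route.payload_type(**raw)
    # 3. Run the operation the route points at.
    result = OPERATIONS[route.operation](context, payload)
    # 4. Check the result against the declared ack type.
    if not isinstance(result, route.ack_type):
        raise TypeError(f"expected {route.ack_type.__name__}")
    # 5. Return a plain dict, which is what would be sent back as the ack.
    return asdict(result)


route = Route("message.send", "messages.create", SendMessage, ReadMessage)
ack = handle_event(route, {"room": "general", "text": "hi"})
```

The sketch is synchronous for brevity; real Socket.IO handlers are typically coroutines, and a Pydantic-based validator would also check field types rather than only the set of keys.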
Errors and identity
Every handler runs inside an error boundary. A CoreException is acked as
{"error": {"detail", "code", "kind", ...}}, honoring the same egress policy as
the HTTP boundary: server-side kinds (internal, infrastructure, configuration,
concurrency) and unexpected exceptions are logged and acked with a generic
detail, so internals never leak to clients.

An optional identity_resolver on the adapter authenticates connections at
connect time. When it raises (e.g. exc.authentication), the connection is
refused via ConnectionRefusedError; otherwise the resolved AuthnIdentity is
bound onto the invocation context around each event. Without an
identity_resolver, handlers run unauthenticated and governance hooks that
require identity will deny. Tenant resolution stays in the context_factory.
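
The egress policy can be pictured as a decorator around each handler. The sketch below is an assumption-laden illustration, not Forze's code: `DomainError` is a hypothetical stand-in for CoreException, `with_error_boundary` and `GENERIC_DETAIL` are invented names, and keeping `code` and `kind` in the ack for server-side kinds is a guess. Only the list of server-side kinds comes from the text above.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("socketio.boundary")

# Kinds the text names as server-side; their details must not reach clients.
SERVER_SIDE_KINDS = {"internal", "infrastructure", "configuration", "concurrency"}
GENERIC_DETAIL = "Internal server error"  # hypothetical wording


class DomainError(Exception):
    """Hypothetical stand-in for CoreException."""

    def __init__(self, detail: str, code: str, kind: str) -> None:
        super().__init__(detail)
        self.detail, self.code, self.kind = detail, code, kind


def with_error_boundary(handler: Callable[[Any], Any]) -> Callable[[Any], Any]:
    def wrapped(payload: Any) -> Any:
        try:
            return handler(payload)
        except DomainError as exc:
            detail = exc.detail
            if exc.kind in SERVER_SIDE_KINDS:
                # Log the real detail, ack only a generic one.
                logger.error("server-side failure: %s", exc.detail)
                detail = GENERIC_DETAIL
            return {"error": {"detail": detail, "code": exc.code, "kind": exc.kind}}
        except Exception:
            # Unexpected exceptions are treated like internal errors.
            logger.exception("unexpected failure in handler")
            return {"error": {"detail": GENERIC_DETAIL, "code": "internal", "kind": "internal"}}

    return wrapped


@with_error_boundary
def not_found(_payload: Any) -> Any:
    raise DomainError("message 42 not found", code="not_found", kind="not_found")


@with_error_boundary
def db_down(_payload: Any) -> Any:
    raise DomainError("connection refused at 10.0.0.5", code="db_unavailable", kind="infrastructure")


@with_error_boundary
def buggy(_payload: Any) -> Any:
    return 1 / 0  # raises ZeroDivisionError, an unexpected exception
```

In this sketch a client-facing error such as `not_found` keeps its original detail, while the infrastructure failure and the unexpected `ZeroDivisionError` are both acked with the generic detail and logged on the server.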
What it provides
| Surface | What it does |
|---|---|
| SocketIONamespaceRouter.command(...) | inbound: event → operation, with typed payload/ack |
| SocketIOEventEmitter | outbound: typed, validated server-to-client emits |
Notes
- The registry must be frozen (OperationRegistry(...).freeze()), same as any transport.
- operation_resolver is the registry's own resolve; its signature is (operation_key, context).
- Payloads validate through a per-route Pydantic TypeAdapter; authenticate connections with identity_resolver, bind tenant in the context_factory.
- Multi-process delivery needs the Redis backplane (redis_url=); without it the server is single-worker.
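
To make the "frozen registry" requirement concrete, here is a toy model of the pattern. It is not Forze's OperationRegistry: `ToyRegistry` and its `register` method are hypothetical, and the return value of `resolve` (a callable bound to the context) is an assumption. Only the `freeze()` call and the `(operation_key, context)` signature come from the notes above.

```python
from functools import partial
from typing import Any, Callable


class ToyRegistry:
    """Hypothetical stand-in for OperationRegistry: mutable until frozen."""

    def __init__(self) -> None:
        self._ops: dict[str, Callable[[Any, Any], Any]] = {}
        self._frozen = False

    def register(self, key: str, op: Callable[[Any, Any], Any]) -> None:
        # Registration is only allowed before the registry is frozen.
        if self._frozen:
            raise RuntimeError("registry is frozen; cannot register operations")
        self._ops[key] = op

    def freeze(self) -> "ToyRegistry":
        # Returns self so construction and freezing can be chained.
        self._frozen = True
        return self

    def resolve(self, operation_key: str, context: Any) -> Callable[[Any], Any]:
        # Transports may only resolve against a frozen registry.
        if not self._frozen:
            raise RuntimeError("registry must be frozen before resolving")
        # Assumption: resolution binds the context and returns a callable
        # that takes the validated payload.
        return partial(self._ops[operation_key], context)


registry = ToyRegistry()
registry.register("echo", lambda context, payload: (context, payload))
frozen = registry.freeze()
result = frozen.resolve("echo", {"tenant": "demo"})("hi")
```

Freezing before handing `resolve` to a transport means the set of operations cannot change while connections are being served, which is the property the first note relies on.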