Skip to content

Socket.IO

forze[socketio] is an inbound transport — like FastAPI, but for real-time events. It routes Socket.IO commands to Forze operations, validates payloads, and resolves the execution context per connection.

Install

uv add 'forze[socketio]'

No service for a single worker; an optional Redis backplane enables delivery across multiple processes.

Build the server

from forze_socketio import build_socketio_server, build_socketio_asgi_app

sio = build_socketio_server()            # pass redis_url=... for multi-process
asgi = build_socketio_asgi_app(sio)      # or mount alongside a FastAPI app

Route commands to operations

Declare commands on a namespace router — each maps an event to an operation, with typed payload and optional ack — then bind them through the adapter:

from forze_socketio import ForzeSocketIOAdapter, SocketIONamespaceRouter

chat = SocketIONamespaceRouter(namespace="/chat")
chat.command(event="message.send", operation="messages.create", payload_type=SendMessage, ack_type=ReadMessage)

adapter = ForzeSocketIOAdapter(
    sio=sio,
    context_factory=lambda request: runtime.get_context(),  # tenant/deps wiring here
    operation_resolver=registry.resolve,                     # the frozen registry
)
adapter.include_router(chat)

On each event the adapter builds the context, validates the payload, runs the operation, and returns the (validated) result as the Socket.IO ack.

Errors and identity

Every handler runs inside an error boundary: a CoreException is acked as {"error": {"detail", "code", "kind", ...}} honoring the same egress policy as the HTTP boundary — server-side kinds (internal, infrastructure, configuration, concurrency) and unexpected exceptions are logged and acked with a generic detail, so internals never leak to clients. An optional identity_resolver on the adapter authenticates connections at connect time (refusing them via ConnectionRefusedError when it raises, e.g. exc.authentication) and binds the resolved AuthnIdentity onto the invocation context around each event; without one, handlers run unauthenticated and governance hooks that require identity will deny. Tenant resolution stays in the context_factory.

What it provides

Surface What it does
SocketIONamespaceRouter.command(...) inbound: event → operation, with typed payload/ack
SocketIOEventEmitter outbound: typed, validated server-to-client emits

Notes

  • The registry must be frozen (OperationRegistry(...).freeze()), same as any transport.
  • operation_resolver is the registry's own resolve — its signature is (operation_key, context).
  • Payloads validate through a per-route Pydantic TypeAdapter; authenticate connections with identity_resolver, bind tenant in the context_factory.
  • Multi-process delivery needs the Redis backplane (redis_url=); without it the server is single-worker.