forze_socketio connects Forze usecases to real-time Socket.IO events. It provides typed command routing, usecase dispatch through ExecutionContext, typed server event emission, and optional Redis backplane support for distributed deployments.
Installation
uv add 'forze[socketio]'
Building a server
Create an async Socket.IO server with build_socketio_server. Optionally enable the Redis manager for multi-process clustering:
from forze_socketio import build_socketio_server
sio = build_socketio_server(
async_mode="asgi",
cors_allowed_origins="*",
redis_url="redis://localhost:6379/0", # optional, for distributed setups
)
Without redis_url, the server uses in-memory transport (single-process only).
Command event routing
Define namespace routes as typed command events. Each event validates input against a Pydantic model, builds an ExecutionContext, resolves the usecase, executes it, and returns a validated acknowledgement payload.
Define DTOs
from pydantic import BaseModel
class JoinRoomCmd(BaseModel):
room: str
user_id: str
class JoinRoomAck(BaseModel):
room: str
joined: bool
Create a usecase
from forze.application.execution import Usecase
class JoinRoom(Usecase[JoinRoomCmd, JoinRoomAck]):
async def main(self, args: JoinRoomCmd) -> JoinRoomAck:
# Business logic here
return JoinRoomAck(room=args.room, joined=True)
Register and route
from forze.application.execution import Deps, ExecutionContext, UsecaseRegistry
from forze_socketio import (
ForzeSocketIOAdapter,
SocketIONamespaceRouter,
make_registry_usecase_resolver,
)
registry = UsecaseRegistry().register(
"chat.join",
lambda ctx: JoinRoom(ctx=ctx),
)
router = SocketIONamespaceRouter(namespace="/chat").command(
event="join",
operation="chat.join",
payload_type=JoinRoomCmd,
ack_type=JoinRoomAck,
)
def context_factory(request) -> ExecutionContext:
return ExecutionContext(deps=Deps())
adapter = ForzeSocketIOAdapter(
sio=sio,
context_factory=context_factory,
usecase_resolver=make_registry_usecase_resolver(registry),
)
adapter.include_router(router)
The adapter handles:
- Deserializing the incoming event payload into
JoinRoomCmd - Calling
context_factoryto create a request-scopedExecutionContext - Resolving
chat.joinfrom the registry with the context - Executing the usecase and serializing the result as
JoinRoomAck - Returning the acknowledgement to the client
Multiple namespaces
Register multiple routers for different namespaces:
chat_router = SocketIONamespaceRouter(namespace="/chat").command(
event="join", operation="chat.join",
payload_type=JoinRoomCmd, ack_type=JoinRoomAck,
).command(
event="leave", operation="chat.leave",
payload_type=LeaveRoomCmd, ack_type=LeaveRoomAck,
)
tasks_router = SocketIONamespaceRouter(namespace="/tasks").command(
event="start", operation="tasks.start",
payload_type=StartTaskCmd, ack_type=StartTaskAck,
)
adapter.include_router(chat_router)
adapter.include_router(tasks_router)
Server event emission
Use SocketIOEventEmitter to send typed server events. The emitter validates payloads against Pydantic models before sending, ensuring type safety for outbound events.
Define a server event
from forze_socketio import SocketIOEventEmitter, SocketIOServerEvent
class ProgressPayload(BaseModel):
done: int
total: int
progress_event = SocketIOServerEvent(
event="task.progress",
namespace="/tasks",
payload_type=ProgressPayload,
)
Emit
emitter = SocketIOEventEmitter(sio=sio)
await emitter.emit(
progress_event,
ProgressPayload(done=3, total=10),
room="task:42",
)
The emitter can target specific rooms, sessions (SIDs), or broadcast to all connected clients.
ASGI integration
Wrap the Socket.IO server as an ASGI application. This allows running alongside FastAPI or other ASGI frameworks:
from forze_socketio import build_socketio_asgi_app
app = build_socketio_asgi_app(sio, other_asgi_app=fastapi_app)
Combined FastAPI + Socket.IO
import uvicorn
from fastapi import FastAPI
from forze_socketio import build_socketio_server, build_socketio_asgi_app
fastapi_app = FastAPI(title="My App")
sio = build_socketio_server(async_mode="asgi")
# Set up Socket.IO routing (as shown above)
# ...
# Combine into a single ASGI app
app = build_socketio_asgi_app(sio, other_asgi_app=fastapi_app)
uvicorn.run(app, host="0.0.0.0", port=8000)
HTTP requests go to FastAPI. WebSocket connections for Socket.IO are handled by the sio server. Both share the same ASGI process.
Context factory
The context factory creates a request-scoped ExecutionContext for each incoming Socket.IO event. In production, you typically wire it to the same runtime used by FastAPI:
def context_factory(request) -> ExecutionContext:
return runtime.get_context()
The request parameter carries Socket.IO metadata (SID, namespace, event name) that you can use for tenant/actor resolution or logging.
Usecase resolver
The default make_registry_usecase_resolver(registry) looks up usecases by operation key in a UsecaseRegistry. For custom resolution logic (e.g. dynamic registration, permission checks), implement your own resolver:
def custom_resolver(ctx: ExecutionContext, operation: str):
if operation.startswith("admin."):
check_admin_permissions(ctx)
return registry.resolve(operation, ctx)
adapter = ForzeSocketIOAdapter(
sio=sio,
context_factory=context_factory,
usecase_resolver=custom_resolver,
)