`forze_s3` provides blob storage backed by any S3-compatible service: Amazon S3, MinIO, Yandex Object Storage, DigitalOcean Spaces, and others. It implements `StoragePort` using aioboto3.
## Installation

```bash
uv add 'forze[s3]'
```
## Runtime wiring

```python
from forze.application.execution import DepsPlan, ExecutionRuntime, LifecyclePlan
from forze_s3 import S3Client, S3Config, S3DepsModule, s3_lifecycle_step

client = S3Client()
module = S3DepsModule(client=client)

runtime = ExecutionRuntime(
    deps=DepsPlan.from_modules(module),
    lifecycle=LifecyclePlan.from_steps(
        s3_lifecycle_step(
            endpoint="https://s3.amazonaws.com",
            access_key_id="your-access-key",
            secret_access_key="your-secret-key",
        )
    ),
)
```
## MinIO configuration

For local development with MinIO:

```python
s3_lifecycle_step(
    endpoint="http://localhost:9000",
    access_key_id="minioadmin",
    secret_access_key="minioadmin",
)
```
## S3Config options

Optional tuning can be passed via `S3Config`, which accepts botocore `Config`-compatible parameters for timeouts, retries, and connection pooling.
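A sketch of passing tuning options. The field names below mirror botocore's `Config` and are assumptions, as is the `config` keyword on `s3_lifecycle_step` (assumed to follow the same pattern as the Postgres and Redis lifecycle steps); check the `S3Config` signature for the exact accepted parameters.

```python
from forze_s3 import S3Config, s3_lifecycle_step

# Hypothetical field names, mirroring botocore's Config --
# verify against the actual S3Config definition.
config = S3Config(
    connect_timeout=5,            # seconds to wait for a connection
    read_timeout=30,              # seconds to wait for a response
    retries={"max_attempts": 3},  # botocore-style retry policy
    max_pool_connections=20,      # connection pool size
)

s3_lifecycle_step(
    endpoint="http://localhost:9000",
    access_key_id="minioadmin",
    secret_access_key="minioadmin",
    config=config,
)
```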
## What gets registered

| Key | Capability |
|---|---|
| `S3ClientDepKey` | Raw S3 client for direct operations |
| `StorageDepKey` | Storage port adapter factory |
## Using the storage port

Resolve the storage port by bucket name from the execution context:

```python
storage = ctx.storage("app-assets")
```
### Upload

```python
stored = await storage.upload(
    filename="invoice.pdf",
    data=pdf_bytes,
    description="Invoice #42",
    prefix="invoices/2026/03",
)

# stored.key -> "invoices/2026/03/<uuid>/invoice.pdf"
# stored.filename -> "invoice.pdf"
# stored.size -> len(pdf_bytes)
```

The adapter generates a unique key from the prefix and a UUID to prevent collisions. The content type is detected automatically using python-magic.
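The key scheme can be illustrated with a small standalone sketch (a hypothetical helper matching the layout in the example above, not the adapter's actual code):

```python
import uuid

def make_object_key(prefix: str, filename: str) -> str:
    # Illustrative only: composes "<prefix>/<uuid>/<filename>",
    # so two uploads of the same filename never collide.
    return f"{prefix}/{uuid.uuid4()}/{filename}"

key = make_object_key("invoices/2026/03", "invoice.pdf")
# e.g. "invoices/2026/03/8c6f.../invoice.pdf" -- unique per upload
```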
### Download

```python
downloaded = await storage.download(stored.key)

# downloaded.data -> bytes
# downloaded.content_type -> "application/pdf"
# downloaded.filename -> "invoice.pdf"
```
### Delete

```python
await storage.delete(stored.key)
```
### List

```python
objects, total = await storage.list(
    limit=20,
    offset=0,
    prefix="invoices/2026",
)

for obj in objects:
    print(f"{obj.key} ({obj.size} bytes)")
```
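Because `list` is offset-paginated, walking everything under a prefix takes a loop. A minimal sketch, assuming only the `list` signature shown above; `iter_all_objects` is a hypothetical helper, not part of forze_s3:

```python
async def iter_all_objects(storage, prefix=None, page_size=100):
    """Yield every stored object under a prefix, one page at a time."""
    offset = 0
    while True:
        objects, total = await storage.list(
            limit=page_size, offset=offset, prefix=prefix
        )
        for obj in objects:
            yield obj
        offset += len(objects)
        # Stop when the last page was empty or we've seen everything.
        if not objects or offset >= total:
            break
```

Usage: `[obj async for obj in iter_all_objects(storage, prefix="invoices/2026")]`.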
## Operation reference

| Method | Returns | Purpose |
|---|---|---|
| `upload(filename, data, description?, *, prefix?)` | `StoredObject` | Upload bytes and return metadata |
| `download(key)` | `DownloadedObject` | Download a previously stored object |
| `delete(key)` | `None` | Delete an object by key |
| `list(limit, offset, *, prefix?)` | `(list[StoredObject], int)` | Paginated listing with optional prefix filter |
## StoredObject fields

| Field | Type | Description |
|---|---|---|
| `key` | `str` | Unique object key in the bucket |
| `filename` | `str` | Original filename |
| `content_type` | `str` | Detected MIME type |
| `size` | `int` | Size in bytes |
| `created_at` | `datetime` | Upload timestamp |
## DownloadedObject fields

| Field | Type | Description |
|---|---|---|
| `data` | `bytes` | Raw object bytes |
| `content_type` | `str` | MIME type |
| `filename` | `str` | Original filename |
## Multi-tenant behavior

When a `TenantContextPort` is registered in the dependency container, the S3 storage adapter automatically incorporates the tenant ID into object key paths, providing tenant isolation at the storage level without additional configuration.

Object keys become: `{tenant_id}/{prefix}/{uuid}/{filename}`
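As a standalone sketch of the tenant-aware layout (an illustrative helper, not the adapter's code; the final segment is assumed to be the original filename, consistent with the upload example earlier):

```python
import uuid

def tenant_object_key(tenant_id: str, prefix: str, filename: str) -> str:
    # Illustrative only: "{tenant_id}/{prefix}/{uuid}/{filename}",
    # so listings scoped to one tenant never match another tenant's keys.
    return f"{tenant_id}/{prefix}/{uuid.uuid4()}/{filename}"
```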
## Scope of the integration

Forze handles:

- Resolving `StoragePort` by bucket name via `ctx.storage("bucket")`
- Upload, download, delete, and list operations
- Content-type detection and metadata tracking
- Multi-tenant key isolation
Forze does not manage:
- Bucket creation, IAM policies, or ACLs
- Lifecycle rules (retention, archival, transitions)
- CORS configuration or encryption settings
- Pre-signed URLs or direct client uploads
These are infrastructure concerns handled by your cloud provider or IaC tooling.
## Combining with other modules

S3 is typically combined with Postgres and Redis in a full stack:

```python
deps_plan = DepsPlan.from_modules(
    lambda: Deps.merge(
        PostgresDepsModule(
            client=pg,
            rev_bump_strategy="database",
            history_write_strategy="database",
        )(),
        RedisDepsModule(client=redis)(),
        S3DepsModule(client=s3)(),
    ),
)

lifecycle = LifecyclePlan.from_steps(
    postgres_lifecycle_step(dsn="postgresql://...", config=PostgresConfig()),
    redis_lifecycle_step(dsn="redis://...", config=RedisConfig()),
    s3_lifecycle_step(
        endpoint="http://localhost:9000",
        access_key_id="minioadmin",
        secret_access_key="minioadmin",
    ),
)
```
Use the storage port in usecases alongside document and search ports:

```python
class UploadProjectAttachment(Usecase[UploadArgs, StoredObject]):
    async def main(self, args: UploadArgs) -> StoredObject:
        doc = self.ctx.doc_read(project_spec)
        await doc.get(args.project_id)  # validate the project exists

        storage = self.ctx.storage("project-attachments")
        return await storage.upload(
            filename=args.filename,
            data=args.data,
            prefix=f"projects/{args.project_id}",
        )
```