forze_s3 provides blob storage backed by any S3-compatible service: Amazon S3, MinIO, Yandex Object Storage, DigitalOcean Spaces, and others. It implements StoragePort using aioboto3.

Installation

uv add 'forze[s3]'

Runtime wiring

from forze.application.execution import DepsPlan, ExecutionRuntime, LifecyclePlan
from forze_s3 import S3Client, S3Config, S3DepsModule, s3_lifecycle_step

client = S3Client()
module = S3DepsModule(client=client)

runtime = ExecutionRuntime(
    deps=DepsPlan.from_modules(module),
    lifecycle=LifecyclePlan.from_steps(
        s3_lifecycle_step(
            endpoint="https://s3.amazonaws.com",
            access_key_id="your-access-key",
            secret_access_key="your-secret-key",
        )
    ),
)

MinIO configuration

For local development with MinIO:

s3_lifecycle_step(
    endpoint="http://localhost:9000",
    access_key_id="minioadmin",
    secret_access_key="minioadmin",
)

S3Config options

Optional tuning can be passed via S3Config. It accepts botocore Config-compatible parameters for timeouts, retries, and connection pooling.

What gets registered

Key Capability
S3ClientDepKey Raw S3 client for direct operations
StorageDepKey Storage port adapter factory

Using the storage port

Resolve the storage port by bucket name from the execution context:

storage = ctx.storage("app-assets")

Upload

stored = await storage.upload(
    filename="invoice.pdf",
    data=pdf_bytes,
    description="Invoice #42",
    prefix="invoices/2026/03",
)

# stored.key      -> "invoices/2026/03/<uuid>/invoice.pdf"
# stored.filename -> "invoice.pdf"
# stored.size     -> len(pdf_bytes)

The adapter generates a unique key based on the prefix and a UUID to prevent collisions. Content type is detected automatically using python-magic.

Download

downloaded = await storage.download(stored.key)

# downloaded.data         -> bytes
# downloaded.content_type -> "application/pdf"
# downloaded.filename     -> "invoice.pdf"

Delete

await storage.delete(stored.key)

List

objects, total = await storage.list(
    limit=20,
    offset=0,
    prefix="invoices/2026",
)

for obj in objects:
    print(f"{obj.key} ({obj.size} bytes)")

Operation reference

Method Returns Purpose
upload(filename, data, description?, *, prefix?) StoredObject Upload bytes and return metadata
download(key) DownloadedObject Download previously stored object
delete(key) None Delete an object by key
list(limit, offset, *, prefix?) (list[StoredObject], int) Paginated listing with optional prefix filter

StoredObject fields

Field Type Description
key str Unique object key in the bucket
filename str Original filename
content_type str Detected MIME type
size int Size in bytes
created_at datetime Upload timestamp

DownloadedObject fields

Field Type Description
data bytes Raw object bytes
content_type str MIME type
filename str Original filename

Multi-tenant behavior

When a TenantContextPort is registered in the dependency container, the S3 storage adapter automatically incorporates the tenant ID into object key paths. This provides natural tenant isolation at the storage level without additional configuration.

Object keys become: {tenant_id}/{prefix}/{uuid}/{filename}

Scope of the integration

Forze handles:

  • Resolving StoragePort by bucket name via ctx.storage("bucket")
  • Upload, download, delete, and list operations
  • Content-type detection and metadata tracking
  • Multi-tenant key isolation

Forze does not manage:

  • Bucket creation, IAM policies, or ACLs
  • Lifecycle rules (retention, archival, transitions)
  • CORS configuration or encryption settings
  • Pre-signed URLs or direct client uploads

These are infrastructure concerns handled by your cloud provider or IaC tooling.

Combining with other modules

S3 is typically combined with Postgres and Redis in a full stack:

deps_plan = DepsPlan.from_modules(
    lambda: Deps.merge(
        PostgresDepsModule(client=pg, rev_bump_strategy="database", history_write_strategy="database")(),
        RedisDepsModule(client=redis)(),
        S3DepsModule(client=s3)(),
    ),
)

lifecycle = LifecyclePlan.from_steps(
    postgres_lifecycle_step(dsn="postgresql://...", config=PostgresConfig()),
    redis_lifecycle_step(dsn="redis://...", config=RedisConfig()),
    s3_lifecycle_step(endpoint="http://localhost:9000", access_key_id="minioadmin", secret_access_key="minioadmin"),
)

Use the storage port in usecases alongside document and search ports:

class UploadProjectAttachment(Usecase[UploadArgs, StoredObject]):
    async def main(self, args: UploadArgs) -> StoredObject:
        doc = self.ctx.doc_read(project_spec)
        await doc.get(args.project_id)  # validate project exists

        storage = self.ctx.storage("project-attachments")
        return await storage.upload(
            filename=args.filename,
            data=args.data,
            prefix=f"projects/{args.project_id}",
        )