forze_rabbitmq provides message queue adapters backed by RabbitMQ. It implements QueueReadPort and QueueWritePort using aio-pika (an async AMQP client). The adapter uses work queue semantics with durable queues, persistent messages, and manual acknowledgement.
Installation
uv add 'forze[rabbitmq]'
Runtime wiring
Create a client, register it via the dependency module, and add a lifecycle step for connection management:
from forze.application.execution import Deps, DepsPlan, ExecutionRuntime, LifecyclePlan
from forze_rabbitmq import (
RabbitMQClient,
RabbitMQConfig,
RabbitMQDepsModule,
rabbitmq_lifecycle_step,
)
client = RabbitMQClient()
module = RabbitMQDepsModule(client=client)
runtime = ExecutionRuntime(
deps=DepsPlan.from_modules(module),
lifecycle=LifecyclePlan.from_steps(
rabbitmq_lifecycle_step(
dsn="amqp://guest:guest@localhost:5672/",
config=RabbitMQConfig(
heartbeat=60,
prefetch_count=100,
),
)
),
)
RabbitMQConfig options
| Option | Type | Default | Purpose |
|---|---|---|---|
heartbeat |
int |
60 |
AMQP heartbeat interval (seconds) |
connect_timeout |
float \| None |
5.0 |
Connection timeout (seconds) |
queue_durable |
bool |
True |
Declare queues as durable |
persistent_messages |
bool |
True |
Use persistent delivery mode |
publisher_confirms |
bool |
True |
Enable publisher confirms |
prefetch_count |
int |
100 |
Consumer prefetch limit |
What gets registered
RabbitMQDepsModule registers these dependency keys:
| Key | Capability |
|---|---|
RabbitMQClientDepKey |
Raw RabbitMQ client for direct operations |
QueueReadDepKey |
Queue read adapter factory |
QueueWriteDepKey |
Queue write adapter factory |
Queue specification
A QueueSpec binds a queue namespace to a Pydantic message model:
from pydantic import BaseModel
from forze.application.contracts.queue import QueueSpec
class TaskPayload(BaseModel):
task_id: str
action: str
params: dict
task_queue = QueueSpec(namespace="tasks", model=TaskPayload)
Producing messages
Resolve the write port and send messages:
from forze.application.contracts.queue import QueueWriteDepKey
writer = ctx.dep(QueueWriteDepKey)(ctx, task_queue)
# Send a single message
message_id = await writer.enqueue(
"tasks",
TaskPayload(task_id="t-001", action="process", params={"retry": 3}),
type="task.dispatch",
)
# Send a batch
ids = await writer.enqueue_many(
"tasks",
[
TaskPayload(task_id="t-002", action="index", params={}),
TaskPayload(task_id="t-003", action="notify", params={}),
],
type="task.dispatch",
)
Message attributes
| Parameter | Purpose | RabbitMQ mapping |
|---|---|---|
type |
Message type/category | AMQP type property |
key |
Routing or partition key | forze_key header |
enqueued_at |
Message timestamp | AMQP timestamp property |
Messages are sent with content_type="application/json" and persistent delivery mode by default.
Consuming messages
Receive a batch
from forze.application.contracts.queue import QueueReadDepKey
reader = ctx.dep(QueueReadDepKey)(ctx, task_queue)
messages = await reader.receive("tasks", limit=10)
for msg in messages:
print(f"Task: {msg['payload'].task_id} -> {msg['payload'].action}")
# Acknowledge processed messages
await reader.ack("tasks", [msg["id"] for msg in messages])
Continuous consumption
async for msg in reader.consume("tasks"):
try:
await handle_task(msg["payload"])
await reader.ack("tasks", [msg["id"]])
except Exception:
await reader.nack("tasks", [msg["id"]], requeue=True)
The consume() method returns an async iterator that continuously polls the queue. It uses basic_get under the hood with the configured timeout for each poll cycle.
Acknowledgement
# Acknowledge (remove from queue)
await reader.ack("tasks", [msg["id"]])
# Negative acknowledge with requeue (make available again)
await reader.nack("tasks", [msg["id"]], requeue=True)
# Negative acknowledge without requeue (discard or route to DLX)
await reader.nack("tasks", [msg["id"]], requeue=False)
The adapter tracks pending messages internally using delivery tags. Each message ID is unique within the client's lifetime and maps to the underlying AMQP message for ack/nack operations.
QueueMessage fields
Each message is a QueueMessage[M] TypedDict:
| Field | Type | Description |
|---|---|---|
queue |
str |
Queue name |
id |
str |
Internal message identifier (delivery tag based) |
payload |
M |
Deserialized Pydantic model |
type |
str \| None |
AMQP message type property |
enqueued_at |
datetime \| None |
AMQP timestamp property |
key |
str \| None |
Value from forze_key header |
Using in usecases
Producer usecase
from forze.application.contracts.queue import QueueWriteDepKey
from forze.application.execution import Usecase
class DispatchTask(Usecase[TaskPayload, str]):
async def main(self, args: TaskPayload) -> str:
writer = self.ctx.dep(QueueWriteDepKey)(self.ctx, task_queue)
return await writer.enqueue("tasks", args, type="task.dispatch")
Consumer usecase
from forze.application.contracts.queue import QueueReadDepKey
class TaskWorker(Usecase[None, None]):
async def main(self, args: None) -> None:
reader = self.ctx.dep(QueueReadDepKey)(self.ctx, task_queue)
async for msg in reader.consume("tasks"):
try:
await self._handle(msg["payload"])
await reader.ack("tasks", [msg["id"]])
except Exception:
await reader.nack("tasks", [msg["id"]], requeue=True)
async def _handle(self, task: TaskPayload) -> None:
pass
Connection management
The client uses aio_pika.connect_robust for automatic reconnection. Key behaviors:
- Channel reuse: within a context scope, channels are reused via context variables to avoid overhead
- Nested scopes: nested
channel()calls reuse the parent channel - Pending message tracking: the client maintains a dedicated channel for consumer operations and tracks unacknowledged messages
The lifecycle step handles connection setup and teardown:
# Startup: connect_robust with heartbeat and timeout
# Shutdown: close pending channel, close connection, clear state
Queue declaration
Queues are declared automatically on first use with durable=True (configurable via RabbitMQConfig.queue_durable). The adapter uses the default exchange with the queue name as the routing key.
Dead letter exchanges
Configure DLX on queue declaration for messages that fail processing. This is an infrastructure concern configured in RabbitMQ management or via the adapter:
# In RabbitMQ management or via rabbitmqctl
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange": "dlx"}' --apply-to queues
Messages rejected with requeue=False are routed to the configured dead letter exchange.
Combining with other modules
RabbitMQ is typically combined with Postgres and Redis:
deps_plan = DepsPlan.from_modules(
lambda: Deps.merge(
PostgresDepsModule(client=pg, rev_bump_strategy="database", history_write_strategy="database")(),
RedisDepsModule(client=redis)(),
RabbitMQDepsModule(client=rabbitmq)(),
),
)
lifecycle = LifecyclePlan.from_steps(
postgres_lifecycle_step(dsn="postgresql://...", config=PostgresConfig()),
redis_lifecycle_step(dsn="redis://...", config=RedisConfig()),
rabbitmq_lifecycle_step(dsn="amqp://guest:guest@localhost:5672/", config=RabbitMQConfig()),
)
Event-driven pattern with after-commit effects
Use RabbitMQ as a reliable event bus by dispatching messages in after-commit effects:
from forze.application.composition.document import DocumentOperation, tx_document_plan
def order_created_effect(ctx):
async def effect(args, result):
writer = ctx.dep(QueueWriteDepKey)(ctx, order_events_queue)
await writer.enqueue(
"order-events",
OrderCreatedEvent(order_id=str(result.id)),
type="order.created",
)
return result
return effect
plan = (
tx_document_plan
.after_commit(DocumentOperation.CREATE, order_created_effect)
)
This ensures messages are only sent after the database transaction commits successfully, preventing phantom messages from rolled-back transactions.