Transactional notifications
A "welcome email" that fires on signup shouldn't send if the signup rolled back, and shouldn't be lost if it committed. That's the outbox guarantee again — plus a small kit that routes each relayed event to the right notification and hands it to a sender.
The runnable version lives at examples/recipes/notifications/ (mock — no broker
or SMTP needed).
Stage the notification¶
The producer stages an event exactly like the outbox recipe — the notification is just the integration event's purpose:
class UserRegistered(BaseModel):
email: str
NOTIFY_EVENTS = OutboxSpec(
name="notify-events",
codec=PydanticModelCodec(UserRegistered),
destination=OutboxDestination.queue(route="notifications", channel="notifications"),
)
NOTIFICATIONS = QueueSpec(
name="notifications", codec=PydanticModelCodec(UserRegistered)
)
Route events to notifications¶
A NotificationRouter maps each event type to the notifications it should
produce. The mapper receives the integration event, so it reads the payload:
# Map each integration event type to the notifications it should produce.
router = NotificationRouter()
router.register(
"user.registered",
lambda event: [
EmailNotification(
to=event.payload.email, subject="Welcome", body="Thanks for joining!"
)
],
)
EmailNotification, PushNotification, and WebhookNotification are the shipped
shapes.
Provide senders¶
NotificationSenders is a protocol — any object with send_email / send_push
/ send_webhook satisfies it. Real senders wrap SMTP, FCM, or an HTTP client;
here it just records:
class RecordingSenders:
"""A NotificationSenders implementation — here it just records what it sent."""
def __init__(self) -> None:
self.emails: list[EmailNotification] = []
async def send_email(self, notification: EmailNotification) -> None:
self.emails.append(notification)
async def send_push(self, notification: object) -> None: ...
async def send_webhook(self, notification: object) -> None: ...
Consume and dispatch¶
The consumer relays the staged events to the queue, then feeds each message to
process_notification_message, which resolves it through the router and calls the
matching sender:
async def deliver_notifications(
ctx: ExecutionContext,
senders: RecordingSenders,
) -> int:
# Relay staged events to the queue, then route each queued message to a sender.
await OutboxRelay(outbox_spec=NOTIFY_EVENTS).to_queue(ctx, NOTIFICATIONS)
queue = ctx.deps.resolve_configurable(
ctx,
QueueQueryDepKey,
NOTIFICATIONS,
route=NOTIFICATIONS.name,
)
sent = 0
for message in await queue.receive("notifications"):
sent += await process_notification_message(
message,
router=router,
senders=senders, # pyright: ignore[reportArgumentType]
)
await queue.ack("notifications", [message.id])
return sent
Notes¶
process_notification_messagetakes aQueueMessage(not a raw payload) — the event type and id come from the relayed message'stypeandkey.- Unmapped event types are skipped by default (
skip_unmapped=True). - The producer and consumer are decoupled: they can be different processes, and the consumer is just a queue worker.