Skip to content

Transactional notifications

A "welcome email" that fires on signup shouldn't send if the signup rolled back, and shouldn't be lost if it committed. That's the outbox guarantee again — plus a small kit that routes each relayed event to the right notification and hands it to a sender.

The runnable version lives at examples/recipes/notifications/ (mock — no broker or SMTP needed).

Stage the notification

The producer stages an event exactly like the outbox recipe — the notification is just the integration event's purpose:

class UserRegistered(BaseModel):
    email: str


NOTIFY_EVENTS = OutboxSpec(
    name="notify-events",
    codec=PydanticModelCodec(UserRegistered),
    destination=OutboxDestination.queue(route="notifications", channel="notifications"),
)
NOTIFICATIONS = QueueSpec(
    name="notifications", codec=PydanticModelCodec(UserRegistered)
)

Route events to notifications

A NotificationRouter maps each event type to the notifications it should produce. The mapper receives the integration event, so it reads the payload:

# Map each integration event type to the notifications it should produce.
router = NotificationRouter()
router.register(
    "user.registered",
    lambda event: [
        EmailNotification(
            to=event.payload.email, subject="Welcome", body="Thanks for joining!"
        )
    ],
)

EmailNotification, PushNotification, and WebhookNotification are the shipped shapes.

Provide senders

NotificationSenders is a protocol — any object with send_email / send_push / send_webhook satisfies it. Real senders wrap SMTP, FCM, or an HTTP client; here it just records:

class RecordingSenders:
    """A NotificationSenders implementation — here it just records what it sent."""

    def __init__(self) -> None:
        self.emails: list[EmailNotification] = []

    async def send_email(self, notification: EmailNotification) -> None:
        self.emails.append(notification)

    async def send_push(self, notification: object) -> None: ...
    async def send_webhook(self, notification: object) -> None: ...

Consume and dispatch

The consumer relays the staged events to the queue, then feeds each message to process_notification_message, which resolves it through the router and calls the matching sender:

async def deliver_notifications(
    ctx: ExecutionContext,
    senders: RecordingSenders,
) -> int:
    # Relay staged events to the queue, then route each queued message to a sender.
    await OutboxRelay(outbox_spec=NOTIFY_EVENTS).to_queue(ctx, NOTIFICATIONS)

    queue = ctx.deps.resolve_configurable(
        ctx,
        QueueQueryDepKey,
        NOTIFICATIONS,
        route=NOTIFICATIONS.name,
    )

    sent = 0

    for message in await queue.receive("notifications"):
        sent += await process_notification_message(
            message,
            router=router,
            senders=senders,  # pyright: ignore[reportArgumentType]
        )
        await queue.ack("notifications", [message.id])
    return sent

Notes

  • process_notification_message takes a QueueMessage (not a raw payload) — the event type and id come from the relayed message's type and key.
  • Unmapped event types are skipped by default (skip_unmapped=True).
  • The producer and consumer are decoupled: they can be different processes, and the consumer is just a queue worker.