Source code for regstack.hooks.events
from __future__ import annotations
import asyncio
import inspect
import logging
from collections import defaultdict
from collections.abc import Awaitable, Callable
from typing import Any
log = logging.getLogger("regstack.hooks")
Handler = Callable[..., Awaitable[None] | None]
# Known event names. Hosts may also subscribe to custom events at their own risk.
KNOWN_EVENTS = {
"user_registered",
"user_logged_in",
"user_logged_out",
"user_verified",
"verification_requested",
"password_reset_requested",
"password_reset_completed",
"password_changed",
"email_change_requested",
"email_changed",
"phone_setup_started",
"phone_setup_disabled",
"mfa_login_started",
"mfa_enabled",
"mfa_disabled",
"user_deleted",
"oauth_signin_started",
"oauth_signin_completed",
"oauth_account_linked",
"oauth_account_unlinked",
}
[docs]
class HookRegistry:
    """Tiny in-process event bus for auth-flow side-effects.

    Hosts subscribe handlers (sync or async) to event names; regstack
    fires events at the natural points in each flow (registration,
    login, password change, etc.). Sync handlers run inline in
    registration order; awaitables returned by handlers are awaited
    concurrently.

    Exceptions raised by a handler are **logged and swallowed** so a
    misbehaving notification handler cannot break the primary auth
    flow. If you need a hard dependency on a side-effect succeeding,
    do that work synchronously in a wrapper around the regstack call —
    not in a hook.

    See :data:`KNOWN_EVENTS` for the events regstack itself fires;
    hosts may subscribe to custom event names too.
    """

    def __init__(self) -> None:
        """Construct an empty registry."""
        # Maps event name -> handlers in registration order.
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, event: str, handler: Handler) -> None:
        """Subscribe a handler to an event.

        Multiple handlers per event are allowed. Handlers can be sync
        or async; an async handler's awaitable is awaited (in parallel
        with its siblings) inside :meth:`fire`. The same handler may be
        subscribed more than once, in which case it is called once per
        subscription.

        Args:
            event: The event name (e.g. ``"user_registered"``). Not
                validated against :data:`KNOWN_EVENTS` — hosts can use
                custom names.
            handler: A callable invoked with the event's keyword
                arguments. Returning a value is fine; it's ignored
                unless it is awaitable, in which case it is awaited.
        """
        self._handlers[event].append(handler)

    async def fire(self, event: str, /, **kwargs: Any) -> None:
        """Run every handler subscribed to ``event``.

        Handlers are called inline, in registration order. Any
        awaitable a handler returns is collected and awaited
        concurrently via ``asyncio.gather`` once all handlers have been
        called. Exceptions (``Exception`` subclasses) raised by a call
        or by an awaitable are logged and discarded — they never
        propagate.

        Handlers subscribed while a dispatch is in progress are not
        invoked by that dispatch; they first run on the next
        :meth:`fire` for the event.

        Args:
            event: The event name to dispatch (positional-only).
            **kwargs: Keyword arguments forwarded to every handler.
                regstack passes a contextually-relevant set per event
                (e.g. ``user`` for ``"user_registered"``).
        """
        # ``.get`` instead of indexing so that firing an event nobody
        # subscribed to does not create an empty defaultdict entry.
        # The tuple snapshot matters: a handler that calls ``on()`` for
        # this same event would otherwise grow the list being iterated,
        # running the new handler immediately (and looping forever if a
        # handler re-subscribes itself).
        handlers = tuple(self._handlers.get(event, ()))
        if not handlers:
            return

        pending = []
        for handler in handlers:
            try:
                result = handler(**kwargs)
            except Exception:
                log.exception("regstack hook %r raised synchronously", event)
                continue
            if inspect.isawaitable(result):
                # Wrapped so a failing awaitable is logged rather than
                # raised out of ``gather``.
                pending.append(_swallow(event, result))

        if pending:
            await asyncio.gather(*pending)
async def _swallow(event: str, awaitable: Awaitable[None]) -> None:
    """Await ``awaitable``, logging instead of raising if it fails.

    Args:
        event: Name of the event being dispatched; used only in the
            log message.
        awaitable: The awaitable to drive to completion.
    """
    try:
        await awaitable
    except Exception:
        # Only ``Exception`` subclasses are caught here; anything
        # deriving solely from ``BaseException`` still propagates.
        log.exception("regstack hook %r raised in awaitable", event)
    return None