Source code for regstack.auth.lockout

from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from regstack.auth.clock import Clock
    from regstack.backends.protocols import LoginAttemptRepoProtocol
    from regstack.config.schema import RegStackConfig


[docs] @dataclass(slots=True, frozen=True) class LockoutDecision: """Result of a :meth:`LockoutService.check` call.""" locked: bool """``True`` if the email is locked out and the login request should be rejected without verifying the password.""" retry_after_seconds: int """How long the client should wait before retrying. Surfaced to the client as the HTTP ``Retry-After`` header on a 429 response. ``0`` when ``locked=False``."""
[docs] class LockoutService: """Per-account login lockout — sliding window over recent failures. Counts failed logins per email in a window of ``config.login_lockout_window_seconds``. Once the count exceeds ``config.login_lockout_threshold``, every login attempt for that email is rejected with HTTP 429 (and a ``Retry-After`` header) for the rest of the window — even when the password is correct, so an attacker can't tell whether their guess was right. Successful logins call :meth:`clear` to wipe the recorded failures eagerly. Disabled (always returns ``locked=False`` and never writes failures) when ``config.rate_limit_disabled`` is set. Tests rely on this to avoid timing flakes. """ def __init__( self, *, attempts: LoginAttemptRepoProtocol, config: RegStackConfig, clock: Clock, ) -> None: """Bind the service to a repo, a config, and a clock. Args: attempts: The :class:`~regstack.backends.protocols.LoginAttemptRepoProtocol` that stores failure rows. config: Carries the threshold and window settings, plus the ``rate_limit_disabled`` short-circuit. clock: Used for the window calculation and for stamping new failures with ``when=now``. """ self._attempts = attempts self._config = config self._clock = clock @property def _window(self) -> timedelta: return timedelta(seconds=self._config.login_lockout_window_seconds)
[docs] async def check(self, email: str) -> LockoutDecision: """Decide whether ``email`` is currently locked out. Should be called *before* the password is verified — the whole point is that a locked-out attacker can't probe whether their guess was correct by observing different responses. Args: email: The email being attempted. Returns: A :class:`LockoutDecision`. When ``locked`` is ``True`` the caller should respond with HTTP 429 and the ``Retry-After`` header. """ if self._config.rate_limit_disabled: return LockoutDecision(locked=False, retry_after_seconds=0) count = await self._attempts.count_recent(email, window=self._window, now=self._clock.now()) if count >= self._config.login_lockout_threshold: return LockoutDecision( locked=True, retry_after_seconds=self._config.login_lockout_window_seconds, ) return LockoutDecision(locked=False, retry_after_seconds=0)
[docs] async def attempts_remaining(self, email: str) -> int | None: """Return how many failures are left before lockout fires. Used by the login route after a wrong-password 401 so we can surface the same "N attempts remaining" message MFA shows. Returns ``None`` when rate-limiting is disabled (tests) — the route should then suppress the line entirely. """ if self._config.rate_limit_disabled: return None count = await self._attempts.count_recent(email, window=self._window, now=self._clock.now()) return max(self._config.login_lockout_threshold - count, 0)
[docs] async def record_failure(self, email: str, *, ip: str | None = None) -> None: """Record one failed login. No-op when rate limiting is disabled. Args: email: The email that failed. ip: Optional source IP. Recorded for auditing; not used by the threshold calculation today. """ if self._config.rate_limit_disabled: return await self._attempts.record_failure(email, when=self._clock.now(), ip=ip)
[docs] async def clear(self, email: str) -> None: """Wipe accumulated failures for ``email``. Called on successful login (so the user's next mistype doesn't get them halfway to lockout) and on successful password reset (so the legitimate user isn't still gated out by the attacker's attempts). Args: email: The email whose failure rows should be deleted. """ await self._attempts.clear(email)