from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Optional
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from regstack.auth.jwt import TokenError, is_payload_bulk_revoked
if TYPE_CHECKING:
from regstack.auth.jwt import JwtCodec
from regstack.backends.protocols import BlacklistRepoProtocol, UserRepoProtocol
from regstack.models.user import BaseUser
# Type of the callables handed back by ``AuthDependencies.current_user`` and
# ``AuthDependencies.current_admin``: an async callable (any arguments) whose
# awaited result is a ``BaseUser``.
UserDependency = Callable[..., Awaitable["BaseUser"]]
# Same shape for ``AuthDependencies.current_user_optional``, except the
# awaited result may be ``None``.
# typing.Optional avoids a PEP-604 union-inside-string-forward-ref,
# which some mypy / pyright configurations refuse to resolve when
# BaseUser is only imported under TYPE_CHECKING.
OptionalUserDependency = Callable[..., Awaitable[Optional["BaseUser"]]]
# Shared bearer-credentials extractor used as ``Depends(_bearer)`` by every
# dependency below. Constructed with ``auto_error=False``; the dependencies
# accept ``None`` credentials and build their own 401 in ``_authenticate``.
_bearer = HTTPBearer(auto_error=False)
[docs]
class AuthDependencies:
    """Factory for FastAPI auth dependencies, bound to one RegStack instance.

    A factory (rather than module-level dependencies) so two embedded
    RegStack instances in the same process don't share state via module
    globals — useful for multi-tenant deployments.

    Hosts that want to require authentication on their own endpoints
    use :meth:`current_user` and :meth:`current_admin` as FastAPI
    ``Depends(...)`` arguments::

        from fastapi import Depends

        @app.get("/me/orders")
        async def list_orders(user = Depends(regstack.deps.current_user())):
            ...
    """

    def __init__(
        self,
        *,
        jwt: JwtCodec,
        users: UserRepoProtocol,
        blacklist: BlacklistRepoProtocol,
    ) -> None:
        """Bind the factory to a codec and the user/blacklist repos.

        Args:
            jwt: The :class:`~regstack.auth.jwt.JwtCodec` used to decode
                bearer tokens.
            users: The user repository — looked up to confirm the
                ``sub`` claim still resolves to an active user.
            blacklist: The
                :class:`~regstack.backends.protocols.BlacklistRepoProtocol`
                used for per-token (``jti``) revocation checks.
        """
        self._jwt = jwt
        self._users = users
        self._blacklist = blacklist

    def current_user(self) -> UserDependency:
        """Return a FastAPI dependency that yields the authenticated user.

        The returned callable validates the ``Authorization: Bearer
        <token>`` header against the JWT codec, the per-token
        blacklist, and the user's bulk-revoke cutoff. On success the
        :class:`~regstack.models.user.BaseUser` is stashed on
        ``request.state.regstack_user`` for downstream middleware.

        Returns:
            A callable suitable for ``Depends(...)``. Raises
            :class:`fastapi.HTTPException` 401 on any auth failure.
        """

        async def _dep(
            request: Request,
            creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
        ) -> BaseUser:
            user = await self._authenticate(creds)
            request.state.regstack_user = user
            return user

        return _dep

    def current_user_optional(self) -> OptionalUserDependency:
        """Return a FastAPI dependency that yields the user or ``None``.

        For endpoints that render differently for authenticated vs
        anonymous callers (think "show your cart icon if logged in").
        Treats every form of auth failure — missing header, bad scheme,
        expired token, revoked jti, deleted user, bulk-revoked session
        — as anonymous and returns ``None`` rather than raising 401.

        On success the user is stashed on
        ``request.state.regstack_user`` (same as :meth:`current_user`),
        so downstream middleware sees the same shape for either path.
        On failure ``request.state`` is left untouched.

        Returns:
            A callable suitable for ``Depends(...)``. Never raises an
            ``HTTPException`` — auth problems collapse to ``None``.
        """

        async def _dep(
            request: Request,
            creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
        ) -> BaseUser | None:
            try:
                user = await self._authenticate(creds)
            except HTTPException:
                # Only auth rejections are swallowed; any other exception
                # raised during authentication still propagates.
                return None
            request.state.regstack_user = user
            return user

        return _dep

    def current_admin(self) -> UserDependency:
        """Return a FastAPI dependency that yields a *superuser*.

        Same checks as :meth:`current_user`, plus a 403 if the
        authenticated user does not have ``is_superuser=True``.

        Returns:
            A callable suitable for ``Depends(...)``. Raises 401 on
            auth failure or 403 on insufficient privilege.
        """

        async def _dep(
            request: Request,
            creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
        ) -> BaseUser:
            user = await self._authenticate(creds)
            if not user.is_superuser:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Administrator privileges required.",
                )
            # Stashed only after the privilege check, so a rejected
            # non-admin is never recorded on the request state.
            request.state.regstack_user = user
            return user

        return _dep

    @staticmethod
    def _unauthorized(detail: str) -> HTTPException:
        """Build a 401 that carries the ``WWW-Authenticate: Bearer`` challenge.

        Centralised so every 401 produced by :meth:`_authenticate` includes
        the challenge header; HTTP requires it on 401 responses.

        Args:
            detail: The response detail string.

        Returns:
            The exception, ready to ``raise``.
        """
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    async def _authenticate(self, creds: HTTPAuthorizationCredentials | None) -> BaseUser:
        """Resolve bearer credentials to a user, or raise a 401.

        Checks run in this order: credentials present with a ``bearer``
        scheme, token decodes, ``jti`` is not blacklisted, the ``sub``
        claim resolves to an existing user with ``is_active`` set, and
        the token is not covered by the user's bulk-revoke cutoff.

        Args:
            creds: Credentials produced by the bearer extractor, or
                ``None`` when none were supplied.

        Returns:
            The user the token's ``sub`` claim resolves to.

        Raises:
            HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header
                when any of the checks above fails.
        """
        if creds is None or creds.scheme.lower() != "bearer":
            raise self._unauthorized("Authentication required.")

        try:
            payload = self._jwt.decode(creds.credentials)
        except TokenError as exc:
            # Never echo the pyjwt exception text — it discloses *why* the
            # token was rejected ("signature verification failed",
            # "expired", "malformed", "audience mismatch"), which is useful
            # to an attacker probing the auth surface. One static 401 for
            # every decode failure.
            raise self._unauthorized("Invalid or expired token.") from exc

        # NOTE(review): the details below do distinguish revoked / inactive /
        # bulk-revoked tokens; they are kept byte-identical because clients
        # may match on them.
        if await self._blacklist.is_revoked(payload.jti):
            raise self._unauthorized("Token has been revoked.")

        user = await self._users.get_by_id(payload.sub)
        if user is None or not user.is_active:
            raise self._unauthorized("User no longer active.")

        if is_payload_bulk_revoked(payload, user.tokens_invalidated_after):
            raise self._unauthorized("Session was invalidated; please sign in again.")

        return user