Source code for regstack.backends.sql.backend
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy.ext.asyncio import create_async_engine
from regstack.backends.base import Backend, BackendKind
from regstack.backends.sql.migrations import upgrade_async
from regstack.backends.sql.repositories.blacklist_repo import SqlBlacklistRepo
from regstack.backends.sql.repositories.login_attempt_repo import SqlLoginAttemptRepo
from regstack.backends.sql.repositories.mfa_code_repo import SqlMfaCodeRepo
from regstack.backends.sql.repositories.oauth_identity_repo import (
SqlOAuthIdentityRepo,
)
from regstack.backends.sql.repositories.oauth_state_repo import SqlOAuthStateRepo
from regstack.backends.sql.repositories.pending_repo import SqlPendingRepo
from regstack.backends.sql.repositories.user_repo import SqlUserRepo
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncEngine
from regstack.auth.clock import Clock
from regstack.config.schema import RegStackConfig
[docs]
class SqlBackend(Backend):
"""SQLAlchemy 2 async backend. Same code path drives SQLite (via
aiosqlite) and Postgres (via asyncpg) — only ``database_url`` differs.
"""
def __init__(
self,
*,
config: RegStackConfig,
clock: Clock,
kind: BackendKind,
) -> None:
super().__init__(config=config, clock=clock)
if kind not in (BackendKind.SQLITE, BackendKind.POSTGRES):
raise ValueError(f"SqlBackend does not support kind={kind}")
self.kind = kind
self._engine: AsyncEngine = create_async_engine(
config.database_url.get_secret_value(),
future=True,
)
self.users = SqlUserRepo(self._engine, clock=clock)
self.pending = SqlPendingRepo(self._engine)
self.blacklist = SqlBlacklistRepo(self._engine)
self.attempts = SqlLoginAttemptRepo(self._engine)
self.mfa_codes = SqlMfaCodeRepo(self._engine, clock=clock)
self.oauth_identities = SqlOAuthIdentityRepo(self._engine)
self.oauth_states = SqlOAuthStateRepo(self._engine)
[docs]
async def install_schema(self) -> None:
"""Run the bundled Alembic migrations to head.
Idempotent — Alembic's `upgrade` is a no-op when the database
is already at the target revision. Safe to call from a FastAPI
``lifespan`` startup on every boot.
Hosts that need to drive migrations from CI / a deploy step
instead of in-process can call ``regstack migrate`` (CLI) or
``regstack.backends.sql.migrations.upgrade(database_url)``
(programmatic) and skip ``install_schema()``.
"""
await upgrade_async(self.config.database_url.get_secret_value())
[docs]
async def aclose(self) -> None:
await self._engine.dispose()
[docs]
async def ping(self) -> None:
from sqlalchemy import text
async with self._engine.connect() as conn:
await conn.execute(text("SELECT 1"))
@property
def engine(self) -> AsyncEngine:
return self._engine