Coverage for node / src / stigmem_node / db.py: 91%
71 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Database setup, migrations, and connection management.
3The ``db()`` context manager and ``apply_migrations()`` now delegate to the
4configured ``StorageBackend``. SQLite remains the default; set
5``STIGMEM_STORAGE_BACKEND=libsql`` (plus ``STIGMEM_LIBSQL_URL`` /
6``STIGMEM_LIBSQL_AUTH_TOKEN``) to switch to libSQL / Turso.
8Test fixtures patch ``stigmem_node.db.settings`` to override the backend
9and database path without touching the environment.
10"""
12from __future__ import annotations
14import logging
15import os
16import uuid
17from collections.abc import Generator
18from contextlib import contextmanager
19from pathlib import Path
20from typing import Any
22from .plugin_migrations import apply_registered_plugin_migrations
23from .plugins import Migration, get_registry
24from .settings import settings as settings
25from .storage import make_backend
27# Resolved once at import time; exposed for test fixtures that need the path.
28_MIGRATIONS_DIR = Path(__file__).parent.parent.parent / "migrations"
29logger = logging.getLogger("stigmem.db")
32@contextmanager
33def db() -> Generator[Any, None, None]:
34 """Yield a transaction-scoped, SQLite-API-compatible connection.
36 Passes the current module-level ``settings`` to ``make_backend`` so that
37 test fixtures can redirect the backend by patching ``db_mod.settings``.
38 """
39 with make_backend(_settings=settings).connection() as conn:
40 yield conn
43def apply_migrations(db_path: str | None = None) -> None:
44 """Apply any un-applied numbered SQL migrations from migrations/.
46 When *db_path* is given, always uses SQLite at that path (backward-compat
47 for CLI tools and test fixtures). When omitted, honours ``settings``.
48 """
49 registry = get_registry()
50 backend = make_backend(db_path=db_path, _settings=settings)
51 backend.apply_migrations(_MIGRATIONS_DIR)
52 _enforce_sqlite_owner_only_permissions(_sqlite_db_path(db_path))
53 migrations: list[Migration] = registry.fire_filter_chain(
54 "migration_register",
55 [],
56 db_path=db_path,
57 migrations_dir=_MIGRATIONS_DIR,
58 settings=settings,
59 )
60 apply_registered_plugin_migrations(
61 backend,
62 migrations,
63 plugin_order=registry.plugin_registration_order(),
64 plugin_versions=registry.plugin_versions(),
65 )
66 _enforce_sqlite_owner_only_permissions(_sqlite_db_path(db_path))
69def _sqlite_db_path(db_path: str | None) -> Path | None:
70 if db_path is not None:
71 selected = db_path
72 elif getattr(settings, "storage_backend", "sqlite") == "sqlite": 72 ↛ 75line 72 didn't jump to line 75 because the condition on line 72 was always true
73 selected = settings.db_path
74 else:
75 return None
76 if selected in {"", ":memory:"}: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 return None
78 return Path(selected)
81def _enforce_sqlite_owner_only_permissions(path: Path | None) -> None:
82 """Restrict local SQLite database artifacts to the owner when present."""
83 if path is None or os.name == "nt": 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 return
85 for artifact in (path, Path(f"{path}-wal"), Path(f"{path}-shm")):
86 if not artifact.exists():
87 continue
88 try:
89 artifact.chmod(0o600)
90 except OSError as exc:
91 logger.warning("failed to restrict SQLite artifact permissions: %s", exc)
94def collect_registered_plugin_migrations(db_path: str | None = None) -> list[Migration]:
95 """Collect plugin-declared migrations without applying them.
97 This helper is intentionally read-only. ``apply_migrations()`` owns the
98 checksum/downgrade checks and application lifecycle.
99 """
100 return get_registry().fire_filter_chain(
101 "migration_register",
102 [],
103 db_path=db_path,
104 migrations_dir=_MIGRATIONS_DIR,
105 settings=settings,
106 )
109def get_or_create_federation_keypair(db_path: str | None = None) -> tuple[str, str]:
110 """Return (pubkey_b64url, privkey_b64url), generating and persisting if needed."""
111 import base64
113 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
114 from cryptography.hazmat.primitives.serialization import (
115 Encoding,
116 NoEncryption,
117 PrivateFormat,
118 PublicFormat,
119 )
121 with make_backend(db_path=db_path, _settings=settings).connection() as conn:
122 pub_row = conn.execute(
123 "SELECT value FROM node_meta WHERE key='federation_pubkey'"
124 ).fetchone()
125 priv_row = conn.execute(
126 "SELECT value FROM node_meta WHERE key='federation_privkey'"
127 ).fetchone()
128 if pub_row and priv_row:
129 return str(pub_row["value"]), str(priv_row["value"])
131 privkey = Ed25519PrivateKey.generate()
132 pubkey = privkey.public_key()
133 priv_b64 = (
134 base64.urlsafe_b64encode(
135 privkey.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
136 )
137 .decode()
138 .rstrip("=")
139 )
140 pub_b64 = (
141 base64.urlsafe_b64encode(pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw))
142 .decode()
143 .rstrip("=")
144 )
145 conn.execute(
146 "INSERT OR REPLACE INTO node_meta (key, value) VALUES ('federation_pubkey', ?)",
147 (pub_b64,),
148 )
149 conn.execute(
150 "INSERT OR REPLACE INTO node_meta (key, value) VALUES ('federation_privkey', ?)",
151 (priv_b64,),
152 )
154 return pub_b64, priv_b64
157def get_or_create_node_id(db_path: str | None = None) -> str:
158 """Return the stable node UUID, creating it on first run."""
159 with make_backend(db_path=db_path, _settings=settings).connection() as conn:
160 row = conn.execute("SELECT value FROM node_meta WHERE key='node_id'").fetchone()
161 if row:
162 return str(row["value"])
163 node_id = settings.node_id or f"stigmem:node:{uuid.uuid4()}"
164 conn.execute("INSERT INTO node_meta (key, value) VALUES ('node_id', ?)", (node_id,))
166 return node_id