Coverage for node / src / stigmem_node / db.py: 91%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Database setup, migrations, and connection management. 

2 

3The ``db()`` context manager and ``apply_migrations()`` now delegate to the 

4configured ``StorageBackend``. SQLite remains the default; set 

5``STIGMEM_STORAGE_BACKEND=libsql`` (plus ``STIGMEM_LIBSQL_URL`` / 

6``STIGMEM_LIBSQL_AUTH_TOKEN``) to switch to libSQL / Turso. 

7 

8Test fixtures patch ``stigmem_node.db.settings`` to override the backend 

9and database path without touching the environment. 

10""" 

11 

12from __future__ import annotations 

13 

14import logging 

15import os 

16import uuid 

17from collections.abc import Generator 

18from contextlib import contextmanager 

19from pathlib import Path 

20from typing import Any 

21 

22from .plugin_migrations import apply_registered_plugin_migrations 

23from .plugins import Migration, get_registry 

24from .settings import settings as settings 

25from .storage import make_backend 

26 

27# Resolved once at import time; exposed for test fixtures that need the path. 

28_MIGRATIONS_DIR = Path(__file__).parent.parent.parent / "migrations" 

29logger = logging.getLogger("stigmem.db") 

30 

31 

32@contextmanager 

33def db() -> Generator[Any, None, None]: 

34 """Yield a transaction-scoped, SQLite-API-compatible connection. 

35 

36 Passes the current module-level ``settings`` to ``make_backend`` so that 

37 test fixtures can redirect the backend by patching ``db_mod.settings``. 

38 """ 

39 with make_backend(_settings=settings).connection() as conn: 

40 yield conn 

41 

42 

43def apply_migrations(db_path: str | None = None) -> None: 

44 """Apply any un-applied numbered SQL migrations from migrations/. 

45 

46 When *db_path* is given, always uses SQLite at that path (backward-compat 

47 for CLI tools and test fixtures). When omitted, honours ``settings``. 

48 """ 

49 registry = get_registry() 

50 backend = make_backend(db_path=db_path, _settings=settings) 

51 backend.apply_migrations(_MIGRATIONS_DIR) 

52 _enforce_sqlite_owner_only_permissions(_sqlite_db_path(db_path)) 

53 migrations: list[Migration] = registry.fire_filter_chain( 

54 "migration_register", 

55 [], 

56 db_path=db_path, 

57 migrations_dir=_MIGRATIONS_DIR, 

58 settings=settings, 

59 ) 

60 apply_registered_plugin_migrations( 

61 backend, 

62 migrations, 

63 plugin_order=registry.plugin_registration_order(), 

64 plugin_versions=registry.plugin_versions(), 

65 ) 

66 _enforce_sqlite_owner_only_permissions(_sqlite_db_path(db_path)) 

67 

68 

69def _sqlite_db_path(db_path: str | None) -> Path | None: 

70 if db_path is not None: 

71 selected = db_path 

72 elif getattr(settings, "storage_backend", "sqlite") == "sqlite": 72 ↛ 75line 72 didn't jump to line 75 because the condition on line 72 was always true

73 selected = settings.db_path 

74 else: 

75 return None 

76 if selected in {"", ":memory:"}: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true

77 return None 

78 return Path(selected) 

79 

80 

81def _enforce_sqlite_owner_only_permissions(path: Path | None) -> None: 

82 """Restrict local SQLite database artifacts to the owner when present.""" 

83 if path is None or os.name == "nt": 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true

84 return 

85 for artifact in (path, Path(f"{path}-wal"), Path(f"{path}-shm")): 

86 if not artifact.exists(): 

87 continue 

88 try: 

89 artifact.chmod(0o600) 

90 except OSError as exc: 

91 logger.warning("failed to restrict SQLite artifact permissions: %s", exc) 

92 

93 

94def collect_registered_plugin_migrations(db_path: str | None = None) -> list[Migration]: 

95 """Collect plugin-declared migrations without applying them. 

96 

97 This helper is intentionally read-only. ``apply_migrations()`` owns the 

98 checksum/downgrade checks and application lifecycle. 

99 """ 

100 return get_registry().fire_filter_chain( 

101 "migration_register", 

102 [], 

103 db_path=db_path, 

104 migrations_dir=_MIGRATIONS_DIR, 

105 settings=settings, 

106 ) 

107 

108 

109def get_or_create_federation_keypair(db_path: str | None = None) -> tuple[str, str]: 

110 """Return (pubkey_b64url, privkey_b64url), generating and persisting if needed.""" 

111 import base64 

112 

113 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey 

114 from cryptography.hazmat.primitives.serialization import ( 

115 Encoding, 

116 NoEncryption, 

117 PrivateFormat, 

118 PublicFormat, 

119 ) 

120 

121 with make_backend(db_path=db_path, _settings=settings).connection() as conn: 

122 pub_row = conn.execute( 

123 "SELECT value FROM node_meta WHERE key='federation_pubkey'" 

124 ).fetchone() 

125 priv_row = conn.execute( 

126 "SELECT value FROM node_meta WHERE key='federation_privkey'" 

127 ).fetchone() 

128 if pub_row and priv_row: 

129 return str(pub_row["value"]), str(priv_row["value"]) 

130 

131 privkey = Ed25519PrivateKey.generate() 

132 pubkey = privkey.public_key() 

133 priv_b64 = ( 

134 base64.urlsafe_b64encode( 

135 privkey.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) 

136 ) 

137 .decode() 

138 .rstrip("=") 

139 ) 

140 pub_b64 = ( 

141 base64.urlsafe_b64encode(pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)) 

142 .decode() 

143 .rstrip("=") 

144 ) 

145 conn.execute( 

146 "INSERT OR REPLACE INTO node_meta (key, value) VALUES ('federation_pubkey', ?)", 

147 (pub_b64,), 

148 ) 

149 conn.execute( 

150 "INSERT OR REPLACE INTO node_meta (key, value) VALUES ('federation_privkey', ?)", 

151 (priv_b64,), 

152 ) 

153 

154 return pub_b64, priv_b64 

155 

156 

157def get_or_create_node_id(db_path: str | None = None) -> str: 

158 """Return the stable node UUID, creating it on first run.""" 

159 with make_backend(db_path=db_path, _settings=settings).connection() as conn: 

160 row = conn.execute("SELECT value FROM node_meta WHERE key='node_id'").fetchone() 

161 if row: 

162 return str(row["value"]) 

163 node_id = settings.node_id or f"stigmem:node:{uuid.uuid4()}" 

164 conn.execute("INSERT INTO node_meta (key, value) VALUES ('node_id', ?)", (node_id,)) 

165 

166 return node_id