Coverage for node / src / stigmem_node / snapshot.py: 85%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Signed-snapshot backup/restore — Phase 8 (ACM-185). 

2 

3Snapshot create 

4 Captures: full fact database, schema migration cursor. 

5 Output: content-addressed .tar.gz with a manifest.json whose body is 

6 signed with the node's Ed25519 federation key (spec §6, same keypair as 

7 peer tokens / peer declarations). 

8 

9Snapshot restore 

10 Verifies the Ed25519 signature and SHA-256 artifact hashes before 

11 writing anything to disk. Refuses tampered input unless the caller 

12 explicitly passes force_unverified=True (always logged loudly). 

13 

14Tarball layout:: 

15 

16 artifacts/ 

17 stigmem.db # online-backup copy of the database 

18 schema_migration_cursor.json # sorted list of applied migrations 

19 manifest.json # hashes + Ed25519 signature 

20 

21manifest.json schema:: 

22 

23 { 

24 "version": 1, 

25 "created_at": "<ISO-8601 UTC>", 

26 "node_id": "stigmem:node:<uuid>", 

27 "signer_pubkey": "<base64url raw Ed25519 public key>", 

28 "artifacts": { 

29 "artifacts/stigmem.db": "sha256:<hex>", 

30 "artifacts/schema_migration_cursor.json": "sha256:<hex>" 

31 }, 

32 "signature": "<base64url Ed25519 signature over manifest body>" 

33 } 

34 

35The *manifest body* (the bytes that are signed) is the canonical JSON of 

36the manifest minus the ``"signature"`` field: lexicographic key order, no 

37extra whitespace, UTF-8 encoded. 

38 

39Secondary signing key (--sign-with KEY) 

40 A raw 32-byte Ed25519 private key stored as base64url in a text file. 

41 When supplied, the snapshot is signed with this key instead of the 

42 node's built-in federation key. 

43 

44Trusted-keys file (--trusted-keys PATH) 

45 A JSON file whose top-level value is a list of base64url-encoded raw 

46 Ed25519 public keys, e.g. ``["<key1>", "<key2>"]``. Restore checks 

47 the manifest signature against every key in the list. When omitted, 

48 only the local node's own public key is trusted. 

49""" 

50 

51from __future__ import annotations 

52 

53import base64 

54import hashlib 

55import json 

56import logging 

57import shutil 

58import sqlite3 

59import tarfile 

60import tempfile 

61from datetime import UTC, datetime 

62from pathlib import Path 

63from typing import Any 

64 

65from cryptography.exceptions import InvalidSignature 

66from cryptography.hazmat.primitives.asymmetric.ed25519 import ( 

67 Ed25519PrivateKey, 

68 Ed25519PublicKey, 

69) 

70 

71logger = logging.getLogger(__name__) 

72 

73_MANIFEST_VERSION = 1 

74_DB_ARTIFACT = "artifacts/stigmem.db" 

75_CURSOR_ARTIFACT = "artifacts/schema_migration_cursor.json" 

76 

77 

78# --------------------------------------------------------------------------- 

79# Helpers 

80# --------------------------------------------------------------------------- 

81 

82 

83def _b64url_encode(data: bytes) -> str: 

84 return base64.urlsafe_b64encode(data).rstrip(b"=").decode() 

85 

86 

87def _b64url_decode(s: str) -> bytes: 

88 pad = (4 - len(s) % 4) % 4 

89 return base64.urlsafe_b64decode(s + "=" * pad) 

90 

91 

92def _sha256_file(path: Path) -> str: 

93 h = hashlib.sha256() 

94 with open(path, "rb") as fh: 

95 for chunk in iter(lambda: fh.read(65536), b""): 

96 h.update(chunk) 

97 return f"sha256:{h.hexdigest()}" 

98 

99 

100def _canonical_manifest_body(manifest: dict[str, Any]) -> bytes: 

101 body = {k: v for k, v in manifest.items() if k != "signature"} 

102 return json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8") 

103 

104 

105def _load_secondary_key(key_path: Path) -> Ed25519PrivateKey: 

106 raw_b64 = key_path.read_text().strip() 

107 raw_bytes = _b64url_decode(raw_b64) 

108 if len(raw_bytes) != 32: # noqa: PLR2004 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 raise ValueError( 

110 f"secondary signing key must be 32 raw bytes (base64url); got {len(raw_bytes)}" 

111 ) 

112 return Ed25519PrivateKey.from_private_bytes(raw_bytes) 

113 

114 

115def _trusted_pubkeys( 

116 trusted_keys_path: Path | None, 

117 db_path: str | None, 

118 self_attesting_pubkey: str | None = None, 

119) -> list[Ed25519PublicKey]: 

120 """Build the trusted public key set for restore verification. 

121 

122 When *trusted_keys_path* is given, only those keys are trusted (explicit 

123 operator list). When omitted the implicit set is used: the local node's 

124 own key from ``node_meta`` plus the key declared in the manifest itself 

125 (self-attesting mode — convenient for same-node restores without a trust 

126 file). 

127 """ 

128 keys: list[Ed25519PublicKey] = [] 

129 

130 if trusted_keys_path is not None: 

131 try: 

132 raw_list = json.loads(trusted_keys_path.read_text()) 

133 except (OSError, json.JSONDecodeError) as exc: 

134 raise ValueError(f"cannot read trusted-keys file: {exc}") from exc 

135 if not isinstance(raw_list, list): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 raise ValueError("trusted-keys file must contain a JSON array of base64url keys") 

137 for entry in raw_list: 

138 pub_bytes = _b64url_decode(entry) 

139 keys.append(Ed25519PublicKey.from_public_bytes(pub_bytes)) 

140 # Explicit trust list only — do NOT add self-declared or local keys. 

141 return keys 

142 

143 # Implicit mode: local node key + self-declared key (same-node convenience). 

144 if db_path is not None: 144 ↛ 157line 144 didn't jump to line 157 because the condition on line 144 was always true

145 try: 

146 conn = sqlite3.connect(db_path) 

147 row = conn.execute( 

148 "SELECT value FROM node_meta WHERE key='federation_pubkey'" 

149 ).fetchone() 

150 conn.close() 

151 if row: 

152 pub_bytes = _b64url_decode(row[0]) 

153 keys.append(Ed25519PublicKey.from_public_bytes(pub_bytes)) 

154 except Exception as exc: # noqa: BLE001 

155 logger.debug("local federation pubkey unavailable for snapshot verify: %s", exc) 

156 

157 if self_attesting_pubkey: 157 ↛ 164line 157 didn't jump to line 164 because the condition on line 157 was always true

158 try: 

159 pub_bytes = _b64url_decode(self_attesting_pubkey) 

160 keys.append(Ed25519PublicKey.from_public_bytes(pub_bytes)) 

161 except Exception as exc: # noqa: BLE001 

162 logger.debug("invalid self-attesting pubkey skipped: %s", exc) 

163 

164 return keys 

165 

166 

167def _collect_schema_cursor(db_path: str) -> list[str]: 

168 try: 

169 conn = sqlite3.connect(db_path) 

170 rows = conn.execute("SELECT version FROM schema_migrations ORDER BY version ASC").fetchall() 

171 conn.close() 

172 return [r[0] for r in rows] 

173 except Exception: 

174 return [] 

175 

176 

177# --------------------------------------------------------------------------- 

178# Public API 

179# --------------------------------------------------------------------------- 

180 

181 

182def snapshot_create( 

183 db_path: str, 

184 out_path: Path | None = None, 

185 sign_with_key_path: Path | None = None, 

186) -> Path: 

187 """Create a signed snapshot tarball and return its path. 

188 

189 Args: 

190 db_path: Path to the SQLite database file. 

191 out_path: Explicit output path for the .tar.gz. When None a 

192 timestamped, content-addressed name is used in the CWD. 

193 sign_with_key_path: Path to a file containing a raw base64url 

194 Ed25519 private key (32 bytes). When None the node's own 

195 federation key from node_meta is used. 

196 

197 Returns: 

198 Path to the created tarball. 

199 """ 

200 created_at = datetime.now(UTC).isoformat() 

201 

202 with tempfile.TemporaryDirectory(prefix="stigmem-snap-") as tmpdir: 

203 tmp = Path(tmpdir) 

204 artifacts_dir = tmp / "artifacts" 

205 artifacts_dir.mkdir() 

206 

207 # -- 1. Online backup of the database -------------------------------- 

208 db_snap = artifacts_dir / "stigmem.db" 

209 src_conn = sqlite3.connect(db_path) 

210 dst_conn = sqlite3.connect(str(db_snap)) 

211 try: 

212 src_conn.backup(dst_conn) 

213 finally: 

214 dst_conn.close() 

215 src_conn.close() 

216 

217 # -- 2. Schema migration cursor -------------------------------------- 

218 cursor_path = artifacts_dir / "schema_migration_cursor.json" 

219 applied_migrations = _collect_schema_cursor(db_path) 

220 cursor_path.write_text(json.dumps({"applied_migrations": applied_migrations}, indent=2)) 

221 

222 # -- 3. Hashes ------------------------------------------------------- 

223 artifacts = { 

224 _DB_ARTIFACT: _sha256_file(db_snap), 

225 _CURSOR_ARTIFACT: _sha256_file(cursor_path), 

226 } 

227 

228 # -- 4. Signing key -------------------------------------------------- 

229 if sign_with_key_path is not None: 

230 priv_key = _load_secondary_key(sign_with_key_path) 

231 signer_pubkey = _b64url_encode(priv_key.public_key().public_bytes_raw()) 

232 else: 

233 from .db import get_or_create_federation_keypair, get_or_create_node_id 

234 

235 pub_b64, priv_b64 = get_or_create_federation_keypair(db_path=db_path) 

236 priv_raw = _b64url_decode(priv_b64) 

237 priv_key = Ed25519PrivateKey.from_private_bytes(priv_raw) 

238 signer_pubkey = pub_b64 

239 

240 # -- 5. Node identity ----------------------------------------------- 

241 try: 

242 from .db import get_or_create_node_id 

243 

244 node_id = get_or_create_node_id(db_path=db_path) 

245 except Exception as exc: 

246 logger.warning("could not resolve node id for snapshot manifest: %s", exc) 

247 node_id = "" 

248 

249 # -- 6. Build manifest and sign ------------------------------------- 

250 manifest: dict[str, Any] = { 

251 "version": _MANIFEST_VERSION, 

252 "created_at": created_at, 

253 "node_id": node_id, 

254 "signer_pubkey": signer_pubkey, 

255 "artifacts": artifacts, 

256 } 

257 body = _canonical_manifest_body(manifest) 

258 sig_bytes = priv_key.sign(body) 

259 manifest["signature"] = _b64url_encode(sig_bytes) 

260 

261 manifest_path = tmp / "manifest.json" 

262 manifest_path.write_text(json.dumps(manifest, indent=2)) 

263 

264 # -- 7. Pack tarball ------------------------------------------------- 

265 # Determine final output path; use content-addressed name when not given. 

266 ts = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") 

267 db_hash_short = artifacts[_DB_ARTIFACT].split(":")[1][:12] 

268 

269 if out_path is None: 

270 out_path = Path(f"stigmem-snapshot-{ts}-{db_hash_short}.tar.gz") 

271 

272 with tarfile.open(out_path, "w:gz") as tf: 

273 tf.add(manifest_path, arcname="manifest.json") 

274 tf.add(artifacts_dir, arcname="artifacts") 

275 

276 logger.info("snapshot created: %s (migrations: %s)", out_path, len(applied_migrations)) 

277 return out_path 

278 

279 

280class SnapshotVerificationError(Exception): 

281 pass 

282 

283 

284def _load_manifest(manifest_path: Path) -> dict[str, Any]: 

285 """Load and validate the manifest.json from an extracted snapshot.""" 

286 if not manifest_path.exists(): 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true

287 raise SnapshotVerificationError("snapshot missing manifest.json") 

288 

289 try: 

290 manifest: dict[str, Any] = json.loads(manifest_path.read_text()) 

291 except json.JSONDecodeError as exc: 

292 raise SnapshotVerificationError(f"manifest.json is not valid JSON: {exc}") from exc 

293 

294 if manifest.get("version") != _MANIFEST_VERSION: 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true

295 raise SnapshotVerificationError(f"unsupported manifest version {manifest.get('version')!r}") 

296 return manifest 

297 

298 

299def _verify_artifact_hashes(manifest: dict[str, Any], tmp: Path) -> None: 

300 """Verify SHA-256 of every declared artifact against the manifest.""" 

301 declared: dict[str, str] = manifest.get("artifacts", {}) 

302 for arc_name, expected_hash in declared.items(): 

303 artifact_path = tmp / arc_name 

304 if not artifact_path.exists(): 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true

305 raise SnapshotVerificationError(f"artifact {arc_name!r} missing from tarball") 

306 actual_hash = _sha256_file(artifact_path) 

307 if actual_hash != expected_hash: 

308 raise SnapshotVerificationError( 

309 f"hash mismatch for {arc_name!r}: expected {expected_hash!r}, got {actual_hash!r}" 

310 ) 

311 

312 

313def _verify_manifest_signature( 

314 manifest: dict[str, Any], 

315 db_path: str, 

316 trusted_keys_path: Path | None, 

317) -> None: 

318 """Verify the Ed25519 signature on the manifest body against trusted keys.""" 

319 signer_pubkey_b64: str = manifest.get("signer_pubkey", "") 

320 signature_b64: str = manifest.get("signature", "") 

321 if not signer_pubkey_b64 or not signature_b64: 321 ↛ 322line 321 didn't jump to line 322 because the condition on line 321 was never true

322 raise SnapshotVerificationError("manifest missing signer_pubkey or signature") 

323 

324 body = _canonical_manifest_body(manifest) 

325 

326 trusted = _trusted_pubkeys( 

327 trusted_keys_path, 

328 db_path, 

329 self_attesting_pubkey=signer_pubkey_b64 if trusted_keys_path is None else None, 

330 ) 

331 

332 sig_bytes = _b64url_decode(signature_b64) 

333 verified = False 

334 for pub_key in trusted: 

335 try: 

336 pub_key.verify(sig_bytes, body) 

337 verified = True 

338 break 

339 except InvalidSignature as exc: 

340 logger.debug("snapshot manifest signature did not match a trusted key: %s", exc) 

341 continue 

342 

343 if not verified: 

344 raise SnapshotVerificationError( 

345 "manifest signature is invalid — snapshot may have been tampered with. " 

346 "Pass --force-unverified to restore anyway (NOT recommended)." 

347 ) 

348 

349 

350def snapshot_restore( 

351 tarball_path: Path, 

352 db_path: str, 

353 trusted_keys_path: Path | None = None, 

354 force_unverified: bool = False, 

355) -> None: 

356 """Restore a snapshot, verifying signature and artifact hashes. 

357 

358 Args: 

359 tarball_path: Path to a .tar.gz produced by snapshot_create. 

360 db_path: Destination database path. **Existing data is overwritten.** 

361 trusted_keys_path: JSON file listing trusted base64url public keys. 

362 When None only the local node's own key is trusted. 

363 force_unverified: If True, skip signature/hash verification and 

364 restore anyway. A loud warning is logged regardless. 

365 

366 Raises: 

367 SnapshotVerificationError: On tampered input (unless force_unverified). 

368 """ 

369 if force_unverified: 

370 logger.warning( 

371 "SECURITY WARNING: restoring unverified snapshot from %s — " 

372 "--force-unverified was passed; integrity checks are DISABLED", 

373 tarball_path, 

374 ) 

375 

376 with tempfile.TemporaryDirectory(prefix="stigmem-restore-") as tmpdir: 

377 tmp = Path(tmpdir) 

378 

379 # -- 1. Extract ------------------------------------------------------ 

380 # filter='data' (PEP 706, Python 3.11.4+) rejects absolute paths, 

381 # `..` traversal, symlinks pointing outside the destination, device 

382 # files, setuid/setgid bits, etc. — closes the path-traversal hole 

383 # that bare extractall() leaves open even with a controlled tmpdir. 

384 with tarfile.open(tarball_path, "r:gz") as tf: 

385 tf.extractall(tmp, filter="data") 

386 

387 manifest = _load_manifest(tmp / "manifest.json") 

388 

389 if not force_unverified: 

390 # -- 2. Verify artifact hashes ----------------------------------- 

391 _verify_artifact_hashes(manifest, tmp) 

392 

393 # -- 3. Verify Ed25519 signature --------------------------------- 

394 _verify_manifest_signature(manifest, db_path, trusted_keys_path) 

395 

396 logger.info("snapshot verified: signature OK, all artifact hashes match") 

397 else: 

398 logger.warning( 

399 "SECURITY WARNING: artifact hash and signature verification SKIPPED for %s", 

400 tarball_path, 

401 ) 

402 

403 # -- 4. Restore database -------------------------------------------- 

404 db_artifact = tmp / _DB_ARTIFACT 

405 if not db_artifact.exists(): 405 ↛ 406line 405 didn't jump to line 406 because the condition on line 405 was never true

406 raise SnapshotVerificationError( 

407 f"database artifact {_DB_ARTIFACT!r} not found in snapshot" 

408 ) 

409 shutil.copy2(str(db_artifact), db_path) 

410 logger.info("snapshot restored to %s", db_path)