Coverage for node / src / stigmem_node / snapshot.py: 85%
180 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Signed-snapshot backup/restore — Phase 8 (ACM-185).
3Snapshot create
4 Captures: full fact database, schema migration cursor.
5 Output: content-addressed .tar.gz with a manifest.json whose body is
6 signed with the node's Ed25519 federation key (spec §6, same keypair as
7 peer tokens / peer declarations).
9Snapshot restore
10 Verifies the Ed25519 signature and SHA-256 artifact hashes before
11 writing anything to disk. Refuses tampered input unless the caller
12 explicitly passes force_unverified=True (always logged loudly).
14Tarball layout::
16 artifacts/
17 stigmem.db # online-backup copy of the database
18 schema_migration_cursor.json # sorted list of applied migrations
19 manifest.json # hashes + Ed25519 signature
21manifest.json schema::
23 {
24 "version": 1,
25 "created_at": "<ISO-8601 UTC>",
26 "node_id": "stigmem:node:<uuid>",
27 "signer_pubkey": "<base64url raw Ed25519 public key>",
28 "artifacts": {
29 "artifacts/stigmem.db": "sha256:<hex>",
30 "artifacts/schema_migration_cursor.json": "sha256:<hex>"
31 },
32 "signature": "<base64url Ed25519 signature over manifest body>"
33 }
35The *manifest body* (the bytes that are signed) is the canonical JSON of
36the manifest minus the ``"signature"`` field: lexicographic key order, no
37extra whitespace, UTF-8 encoded.
39Secondary signing key (--sign-with KEY)
40 A raw 32-byte Ed25519 private key stored as base64url in a text file.
41 When supplied, the snapshot is signed with this key instead of the
42 node's built-in federation key.
44Trusted-keys file (--trusted-keys PATH)
45 A JSON file whose top-level value is a list of base64url-encoded raw
46 Ed25519 public keys, e.g. ``["<key1>", "<key2>"]``. Restore checks
47 the manifest signature against every key in the list. When omitted,
48 only the local node's own public key is trusted.
49"""
51from __future__ import annotations
53import base64
54import hashlib
55import json
56import logging
57import shutil
58import sqlite3
59import tarfile
60import tempfile
61from datetime import UTC, datetime
62from pathlib import Path
63from typing import Any
65from cryptography.exceptions import InvalidSignature
66from cryptography.hazmat.primitives.asymmetric.ed25519 import (
67 Ed25519PrivateKey,
68 Ed25519PublicKey,
69)
71logger = logging.getLogger(__name__)
73_MANIFEST_VERSION = 1
74_DB_ARTIFACT = "artifacts/stigmem.db"
75_CURSOR_ARTIFACT = "artifacts/schema_migration_cursor.json"
78# ---------------------------------------------------------------------------
79# Helpers
80# ---------------------------------------------------------------------------
83def _b64url_encode(data: bytes) -> str:
84 return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
87def _b64url_decode(s: str) -> bytes:
88 pad = (4 - len(s) % 4) % 4
89 return base64.urlsafe_b64decode(s + "=" * pad)
92def _sha256_file(path: Path) -> str:
93 h = hashlib.sha256()
94 with open(path, "rb") as fh:
95 for chunk in iter(lambda: fh.read(65536), b""):
96 h.update(chunk)
97 return f"sha256:{h.hexdigest()}"
100def _canonical_manifest_body(manifest: dict[str, Any]) -> bytes:
101 body = {k: v for k, v in manifest.items() if k != "signature"}
102 return json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")
105def _load_secondary_key(key_path: Path) -> Ed25519PrivateKey:
106 raw_b64 = key_path.read_text().strip()
107 raw_bytes = _b64url_decode(raw_b64)
108 if len(raw_bytes) != 32: # noqa: PLR2004 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true
109 raise ValueError(
110 f"secondary signing key must be 32 raw bytes (base64url); got {len(raw_bytes)}"
111 )
112 return Ed25519PrivateKey.from_private_bytes(raw_bytes)
115def _trusted_pubkeys(
116 trusted_keys_path: Path | None,
117 db_path: str | None,
118 self_attesting_pubkey: str | None = None,
119) -> list[Ed25519PublicKey]:
120 """Build the trusted public key set for restore verification.
122 When *trusted_keys_path* is given, only those keys are trusted (explicit
123 operator list). When omitted the implicit set is used: the local node's
124 own key from ``node_meta`` plus the key declared in the manifest itself
125 (self-attesting mode — convenient for same-node restores without a trust
126 file).
127 """
128 keys: list[Ed25519PublicKey] = []
130 if trusted_keys_path is not None:
131 try:
132 raw_list = json.loads(trusted_keys_path.read_text())
133 except (OSError, json.JSONDecodeError) as exc:
134 raise ValueError(f"cannot read trusted-keys file: {exc}") from exc
135 if not isinstance(raw_list, list): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 raise ValueError("trusted-keys file must contain a JSON array of base64url keys")
137 for entry in raw_list:
138 pub_bytes = _b64url_decode(entry)
139 keys.append(Ed25519PublicKey.from_public_bytes(pub_bytes))
140 # Explicit trust list only — do NOT add self-declared or local keys.
141 return keys
143 # Implicit mode: local node key + self-declared key (same-node convenience).
144 if db_path is not None: 144 ↛ 157line 144 didn't jump to line 157 because the condition on line 144 was always true
145 try:
146 conn = sqlite3.connect(db_path)
147 row = conn.execute(
148 "SELECT value FROM node_meta WHERE key='federation_pubkey'"
149 ).fetchone()
150 conn.close()
151 if row:
152 pub_bytes = _b64url_decode(row[0])
153 keys.append(Ed25519PublicKey.from_public_bytes(pub_bytes))
154 except Exception as exc: # noqa: BLE001
155 logger.debug("local federation pubkey unavailable for snapshot verify: %s", exc)
157 if self_attesting_pubkey: 157 ↛ 164line 157 didn't jump to line 164 because the condition on line 157 was always true
158 try:
159 pub_bytes = _b64url_decode(self_attesting_pubkey)
160 keys.append(Ed25519PublicKey.from_public_bytes(pub_bytes))
161 except Exception as exc: # noqa: BLE001
162 logger.debug("invalid self-attesting pubkey skipped: %s", exc)
164 return keys
167def _collect_schema_cursor(db_path: str) -> list[str]:
168 try:
169 conn = sqlite3.connect(db_path)
170 rows = conn.execute("SELECT version FROM schema_migrations ORDER BY version ASC").fetchall()
171 conn.close()
172 return [r[0] for r in rows]
173 except Exception:
174 return []
177# ---------------------------------------------------------------------------
178# Public API
179# ---------------------------------------------------------------------------
182def snapshot_create(
183 db_path: str,
184 out_path: Path | None = None,
185 sign_with_key_path: Path | None = None,
186) -> Path:
187 """Create a signed snapshot tarball and return its path.
189 Args:
190 db_path: Path to the SQLite database file.
191 out_path: Explicit output path for the .tar.gz. When None a
192 timestamped, content-addressed name is used in the CWD.
193 sign_with_key_path: Path to a file containing a raw base64url
194 Ed25519 private key (32 bytes). When None the node's own
195 federation key from node_meta is used.
197 Returns:
198 Path to the created tarball.
199 """
200 created_at = datetime.now(UTC).isoformat()
202 with tempfile.TemporaryDirectory(prefix="stigmem-snap-") as tmpdir:
203 tmp = Path(tmpdir)
204 artifacts_dir = tmp / "artifacts"
205 artifacts_dir.mkdir()
207 # -- 1. Online backup of the database --------------------------------
208 db_snap = artifacts_dir / "stigmem.db"
209 src_conn = sqlite3.connect(db_path)
210 dst_conn = sqlite3.connect(str(db_snap))
211 try:
212 src_conn.backup(dst_conn)
213 finally:
214 dst_conn.close()
215 src_conn.close()
217 # -- 2. Schema migration cursor --------------------------------------
218 cursor_path = artifacts_dir / "schema_migration_cursor.json"
219 applied_migrations = _collect_schema_cursor(db_path)
220 cursor_path.write_text(json.dumps({"applied_migrations": applied_migrations}, indent=2))
222 # -- 3. Hashes -------------------------------------------------------
223 artifacts = {
224 _DB_ARTIFACT: _sha256_file(db_snap),
225 _CURSOR_ARTIFACT: _sha256_file(cursor_path),
226 }
228 # -- 4. Signing key --------------------------------------------------
229 if sign_with_key_path is not None:
230 priv_key = _load_secondary_key(sign_with_key_path)
231 signer_pubkey = _b64url_encode(priv_key.public_key().public_bytes_raw())
232 else:
233 from .db import get_or_create_federation_keypair, get_or_create_node_id
235 pub_b64, priv_b64 = get_or_create_federation_keypair(db_path=db_path)
236 priv_raw = _b64url_decode(priv_b64)
237 priv_key = Ed25519PrivateKey.from_private_bytes(priv_raw)
238 signer_pubkey = pub_b64
240 # -- 5. Node identity -----------------------------------------------
241 try:
242 from .db import get_or_create_node_id
244 node_id = get_or_create_node_id(db_path=db_path)
245 except Exception as exc:
246 logger.warning("could not resolve node id for snapshot manifest: %s", exc)
247 node_id = ""
249 # -- 6. Build manifest and sign -------------------------------------
250 manifest: dict[str, Any] = {
251 "version": _MANIFEST_VERSION,
252 "created_at": created_at,
253 "node_id": node_id,
254 "signer_pubkey": signer_pubkey,
255 "artifacts": artifacts,
256 }
257 body = _canonical_manifest_body(manifest)
258 sig_bytes = priv_key.sign(body)
259 manifest["signature"] = _b64url_encode(sig_bytes)
261 manifest_path = tmp / "manifest.json"
262 manifest_path.write_text(json.dumps(manifest, indent=2))
264 # -- 7. Pack tarball -------------------------------------------------
265 # Determine final output path; use content-addressed name when not given.
266 ts = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
267 db_hash_short = artifacts[_DB_ARTIFACT].split(":")[1][:12]
269 if out_path is None:
270 out_path = Path(f"stigmem-snapshot-{ts}-{db_hash_short}.tar.gz")
272 with tarfile.open(out_path, "w:gz") as tf:
273 tf.add(manifest_path, arcname="manifest.json")
274 tf.add(artifacts_dir, arcname="artifacts")
276 logger.info("snapshot created: %s (migrations: %s)", out_path, len(applied_migrations))
277 return out_path
280class SnapshotVerificationError(Exception):
281 pass
284def _load_manifest(manifest_path: Path) -> dict[str, Any]:
285 """Load and validate the manifest.json from an extracted snapshot."""
286 if not manifest_path.exists(): 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true
287 raise SnapshotVerificationError("snapshot missing manifest.json")
289 try:
290 manifest: dict[str, Any] = json.loads(manifest_path.read_text())
291 except json.JSONDecodeError as exc:
292 raise SnapshotVerificationError(f"manifest.json is not valid JSON: {exc}") from exc
294 if manifest.get("version") != _MANIFEST_VERSION: 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true
295 raise SnapshotVerificationError(f"unsupported manifest version {manifest.get('version')!r}")
296 return manifest
299def _verify_artifact_hashes(manifest: dict[str, Any], tmp: Path) -> None:
300 """Verify SHA-256 of every declared artifact against the manifest."""
301 declared: dict[str, str] = manifest.get("artifacts", {})
302 for arc_name, expected_hash in declared.items():
303 artifact_path = tmp / arc_name
304 if not artifact_path.exists(): 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true
305 raise SnapshotVerificationError(f"artifact {arc_name!r} missing from tarball")
306 actual_hash = _sha256_file(artifact_path)
307 if actual_hash != expected_hash:
308 raise SnapshotVerificationError(
309 f"hash mismatch for {arc_name!r}: expected {expected_hash!r}, got {actual_hash!r}"
310 )
313def _verify_manifest_signature(
314 manifest: dict[str, Any],
315 db_path: str,
316 trusted_keys_path: Path | None,
317) -> None:
318 """Verify the Ed25519 signature on the manifest body against trusted keys."""
319 signer_pubkey_b64: str = manifest.get("signer_pubkey", "")
320 signature_b64: str = manifest.get("signature", "")
321 if not signer_pubkey_b64 or not signature_b64: 321 ↛ 322line 321 didn't jump to line 322 because the condition on line 321 was never true
322 raise SnapshotVerificationError("manifest missing signer_pubkey or signature")
324 body = _canonical_manifest_body(manifest)
326 trusted = _trusted_pubkeys(
327 trusted_keys_path,
328 db_path,
329 self_attesting_pubkey=signer_pubkey_b64 if trusted_keys_path is None else None,
330 )
332 sig_bytes = _b64url_decode(signature_b64)
333 verified = False
334 for pub_key in trusted:
335 try:
336 pub_key.verify(sig_bytes, body)
337 verified = True
338 break
339 except InvalidSignature as exc:
340 logger.debug("snapshot manifest signature did not match a trusted key: %s", exc)
341 continue
343 if not verified:
344 raise SnapshotVerificationError(
345 "manifest signature is invalid — snapshot may have been tampered with. "
346 "Pass --force-unverified to restore anyway (NOT recommended)."
347 )
350def snapshot_restore(
351 tarball_path: Path,
352 db_path: str,
353 trusted_keys_path: Path | None = None,
354 force_unverified: bool = False,
355) -> None:
356 """Restore a snapshot, verifying signature and artifact hashes.
358 Args:
359 tarball_path: Path to a .tar.gz produced by snapshot_create.
360 db_path: Destination database path. **Existing data is overwritten.**
361 trusted_keys_path: JSON file listing trusted base64url public keys.
362 When None only the local node's own key is trusted.
363 force_unverified: If True, skip signature/hash verification and
364 restore anyway. A loud warning is logged regardless.
366 Raises:
367 SnapshotVerificationError: On tampered input (unless force_unverified).
368 """
369 if force_unverified:
370 logger.warning(
371 "SECURITY WARNING: restoring unverified snapshot from %s — "
372 "--force-unverified was passed; integrity checks are DISABLED",
373 tarball_path,
374 )
376 with tempfile.TemporaryDirectory(prefix="stigmem-restore-") as tmpdir:
377 tmp = Path(tmpdir)
379 # -- 1. Extract ------------------------------------------------------
380 # filter='data' (PEP 706, Python 3.11.4+) rejects absolute paths,
381 # `..` traversal, symlinks pointing outside the destination, device
382 # files, setuid/setgid bits, etc. — closes the path-traversal hole
383 # that bare extractall() leaves open even with a controlled tmpdir.
384 with tarfile.open(tarball_path, "r:gz") as tf:
385 tf.extractall(tmp, filter="data")
387 manifest = _load_manifest(tmp / "manifest.json")
389 if not force_unverified:
390 # -- 2. Verify artifact hashes -----------------------------------
391 _verify_artifact_hashes(manifest, tmp)
393 # -- 3. Verify Ed25519 signature ---------------------------------
394 _verify_manifest_signature(manifest, db_path, trusted_keys_path)
396 logger.info("snapshot verified: signature OK, all artifact hashes match")
397 else:
398 logger.warning(
399 "SECURITY WARNING: artifact hash and signature verification SKIPPED for %s",
400 tarball_path,
401 )
403 # -- 4. Restore database --------------------------------------------
404 db_artifact = tmp / _DB_ARTIFACT
405 if not db_artifact.exists(): 405 ↛ 406line 405 didn't jump to line 406 because the condition on line 405 was never true
406 raise SnapshotVerificationError(
407 f"database artifact {_DB_ARTIFACT!r} not found in snapshot"
408 )
409 shutil.copy2(str(db_artifact), db_path)
410 logger.info("snapshot restored to %s", db_path)