Coverage for node / src / stigmem_node / identity / transparency_log.py: 55%
143 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Transparency-log adapters — spec §19.2.
3Public surface:
4 TransparencyLogUnavailable — raised when TL cannot be reached
5 LogEntry — dataclass for a TL entry
6 TransparencyLog — abstract base class
7 LocalAppendOnlyLog — file-backed hash-chain log (dev / single-org)
8 RekorLog — Sigstore Rekor adapter (requires [identity] extra)
9 make_transparency_log() — factory respecting STIGMEM_TL_BACKEND setting
11Security requirements (H2 mitigation):
12 - TransparencyLogUnavailable must be raised, not silenced, when TL is
13 unreachable. Callers in trust_mode=strict MUST treat this as HTTP 503.
14 - Checkpoint/STH verification goes through sigstore.transparency — not
15 hand-rolled Merkle logic.
16"""
18from __future__ import annotations
20import hashlib
21import json
22import logging
23import time
24from abc import ABC, abstractmethod
25from dataclasses import dataclass, field
26from pathlib import Path
27from typing import Any
29import canonicaljson
31logger = logging.getLogger("stigmem.identity.tl")
class TransparencyLogUnavailable(RuntimeError):
    """Signal that the configured transparency-log backend could not be reached.

    A plain RuntimeError subclass that adds no state or behaviour of its own;
    it exists so callers can tell "log unreachable" apart from other failures.
    """
@dataclass
class LogEntry:
    """Inclusion record handed back by a transparency-log backend.

    Attributes:
        log_id: Opaque backend identifier (file path hash or Rekor tree ID).
        leaf_hash: Hex SHA-256 of the canonical leaf data.
        log_index: Sequential index of the entry in the log.
        integrated_time: Unix epoch seconds at which the entry was recorded.
        inclusion_proof: Backend-specific proof material; empty by default.
        raw: Full backend response for the entry; empty by default.
    """

    log_id: str
    leaf_hash: str
    log_index: int
    integrated_time: int
    # Each instance gets its own fresh dict, so the defaults are never shared.
    inclusion_proof: dict[str, Any] = field(default_factory=dict)
    raw: dict[str, Any] = field(default_factory=dict)
class TransparencyLog(ABC):
    """Interface that every transparency-log backend implements."""

    @abstractmethod
    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Record *manifest_dict* in the log and return its LogEntry.

        Implementations raise TransparencyLogUnavailable when the log backend
        is unreachable.
        """

    @abstractmethod
    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Check that *log_entry* is genuinely present in the log.

        Implementations return True on success, raise
        TransparencyLogUnavailable when the backend is unreachable, and raise
        ValueError on cryptographic failure.
        """
69# ---------------------------------------------------------------------------
70# LocalAppendOnlyLog — file-backed hash chain (dev / single-org)
71# ---------------------------------------------------------------------------
class LocalAppendOnlyLog(TransparencyLog):
    """Simple append-only log stored as newline-delimited JSON (dev / single-org).

    Each line is a JSON object:
        { "index": int, "ts": int, "leaf_hash": str, "prev_hash": str,
          "chain_hash": str, "payload": {...} }

    chain_hash = SHA-256(prev_hash || leaf_hash) ties each entry to its
    predecessor — sufficient for dev / audit use; not a full Merkle tree.
    """

    # prev_hash recorded for the very first entry, which has no predecessor.
    _GENESIS_HASH = "0" * 64

    def __init__(self, path: str | Path) -> None:
        """Bind the log to the file at *path*; the file is not created here."""
        self._path = Path(path)
        # Short, stable identifier derived from the resolved file location.
        self._log_id = hashlib.sha256(str(self._path.resolve()).encode()).hexdigest()[:16]

    def _leaf_hash(self, payload: dict[str, Any]) -> str:
        """Return the hex SHA-256 of the canonical JSON encoding of *payload*."""
        # RFC 8785 JCS — must match RekorLog and manifest signing bodies.
        canonical = canonicaljson.encode_canonical_json(payload)
        return hashlib.sha256(canonical).hexdigest()

    def _read_lines(self) -> list[str]:
        """Return the log's lines, ignoring leading/trailing whitespace."""
        return self._path.read_text(encoding="utf-8").strip().splitlines()

    def _last_entry(self) -> dict[str, Any] | None:
        """Return the most recent entry, or None if the log is missing or empty."""
        if not self._path.exists():
            return None
        lines = self._read_lines()
        if not lines:
            return None
        entry: dict[str, Any] = json.loads(lines[-1])
        return entry

    def _chain_hash(self, prev_hash: str, leaf_hash: str) -> str:
        """Return hex SHA-256 over the concatenated hex strings prev_hash + leaf_hash."""
        combined = (prev_hash + leaf_hash).encode()
        return hashlib.sha256(combined).hexdigest()

    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Append *manifest_dict* to the log file and return its LogEntry.

        Raises:
            TransparencyLogUnavailable: the log file cannot be read or written.
        """
        leaf_hash = self._leaf_hash(manifest_dict)
        try:
            last = self._last_entry()
        except OSError as exc:
            raise TransparencyLogUnavailable(
                f"local TL file could not be read: {exc}"
            ) from exc

        index = (last["index"] + 1) if last else 0
        prev_hash = last["chain_hash"] if last else self._GENESIS_HASH
        chain_hash = self._chain_hash(prev_hash, leaf_hash)
        ts = int(time.time())

        entry: dict[str, Any] = {
            "index": index,
            "ts": ts,
            "leaf_hash": leaf_hash,
            "prev_hash": prev_hash,
            "chain_hash": chain_hash,
            "payload": manifest_dict,
        }
        line = json.dumps(entry, separators=(",", ":")) + "\n"

        try:
            # First submission may target a directory that does not exist yet.
            self._path.parent.mkdir(parents=True, exist_ok=True)
            with self._path.open("a", encoding="utf-8") as fh:
                fh.write(line)
        except OSError as exc:
            # Surface storage failures through the exception type the
            # TransparencyLog interface documents, not as a bare OSError.
            raise TransparencyLogUnavailable(
                f"local TL file could not be written: {exc}"
            ) from exc

        return LogEntry(
            log_id=self._log_id,
            leaf_hash=leaf_hash,
            log_index=index,
            integrated_time=ts,
            inclusion_proof={"chain_hash": chain_hash, "prev_hash": prev_hash},
            raw=entry,
        )

    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Verify that *log_entry* matches the stored line and its chain link.

        Checks, in order: the index is in range, the stored leaf hash and
        index match *log_entry*, the stored prev_hash equals the predecessor's
        chain_hash (or the all-zero genesis value for index 0), and the stored
        chain_hash recomputes from prev_hash and leaf_hash.

        Returns:
            True when every check passes.

        Raises:
            TransparencyLogUnavailable: the log file is missing or unreadable.
            ValueError: any of the checks above fails.
        """
        if not self._path.exists():
            raise TransparencyLogUnavailable("local TL file not found")
        try:
            lines = self._read_lines()
        except OSError as exc:
            raise TransparencyLogUnavailable(
                f"local TL file could not be read: {exc}"
            ) from exc

        target_index = log_entry.log_index
        # Reject negatives explicitly: lines[-1] would otherwise silently pick
        # an entry counted from the end of the log.
        if not 0 <= target_index < len(lines):
            raise ValueError(
                f"log_index {target_index} out of range (log has {len(lines)} entries)"
            )

        stored = json.loads(lines[target_index])
        if stored["leaf_hash"] != log_entry.leaf_hash:
            raise ValueError(
                f"leaf_hash mismatch at index {target_index}: "
                f"stored={stored['leaf_hash']!r}, expected={log_entry.leaf_hash!r}"
            )
        if stored["index"] != target_index:
            raise ValueError("stored index does not match log_entry.log_index")

        # The entry must actually link to its predecessor; recomputing
        # chain_hash from the entry's own fields cannot detect a rewritten
        # prev_hash on its own.
        if target_index == 0:
            expected_prev = self._GENESIS_HASH
        else:
            expected_prev = json.loads(lines[target_index - 1])["chain_hash"]
        if stored["prev_hash"] != expected_prev:
            raise ValueError("prev_hash does not match predecessor chain_hash")

        # Recompute chain_hash to confirm the entry is internally consistent.
        recomputed = self._chain_hash(stored["prev_hash"], stored["leaf_hash"])
        if recomputed != stored["chain_hash"]:
            raise ValueError("chain_hash integrity check failed")

        return True
165# ---------------------------------------------------------------------------
166# RekorLog — Sigstore Rekor adapter
167# ---------------------------------------------------------------------------
class RekorLog(TransparencyLog):
    """Transparency-log adapter backed by a Sigstore Rekor instance.

    Requires `sigstore>=3.0` (the [identity] optional extra).
    STH / inclusion-proof verification goes through sigstore.transparency —
    not hand-rolled Merkle logic (H2 security requirement).
    """

    # Class-level default so the HTTP timeout is defined even on instances
    # that were built without running __init__.
    _timeout: float = 15.0

    def __init__(
        self,
        rekor_url: str = "https://rekor.sigstore.dev",
        *,
        timeout: float = 15.0,
    ) -> None:
        """Configure the adapter.

        Args:
            rekor_url: Base URL of the Rekor instance; trailing slashes are removed.
            timeout: Per-request HTTP timeout in seconds.

        Raises:
            ImportError: the sigstore package is not installed.
        """
        self._url = rekor_url.rstrip("/")
        self._timeout = timeout
        try:
            import sigstore  # noqa: F401 — validate import at construction
        except ImportError as exc:
            raise ImportError(
                "sigstore package is required for RekorLog; "
                "install it with: pip install 'stigmem-node[identity]'"
            ) from exc

    @staticmethod
    def _stored_leaf_hash(body_b64: Any) -> str:
        """Return spec.data.hash.value from a base64-encoded Rekor entry body.

        Raises:
            ValueError: the body cannot be decoded as base64 JSON, or it does
                not carry a non-empty hash value at spec.data.hash.value.
        """
        import base64

        try:
            # The extra "==" keeps bodies with missing padding decodable.
            decoded = json.loads(base64.b64decode(body_b64 + "=="))
            value = decoded["spec"]["data"]["hash"]["value"]
        except (ValueError, TypeError, KeyError) as exc:
            raise ValueError(f"stored Rekor body could not be decoded: {exc}") from exc
        if not isinstance(value, str) or not value:
            raise ValueError("stored Rekor body carries no spec.data.hash.value")
        return value

    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Submit *manifest_dict* to Rekor as a hashedrekord v0.0.1 entry.

        The leaf hash is the SHA-256 of the canonical JSON of the manifest;
        the manifest's "signature" and "public_key" values (empty string when
        absent) are sent as the entry's signature material.

        Raises:
            TransparencyLogUnavailable: Rekor answers with a status other than
                200/201, or any other error occurs while building or sending
                the request or reading the response.
        """
        try:
            import httpx

            # RFC 8785 JCS — consistent with LocalAppendOnlyLog._leaf_hash.
            canonical = canonicaljson.encode_canonical_json(manifest_dict)
            leaf_hash = hashlib.sha256(canonical).hexdigest()

            # Rekor accepts intoto / hashedrekord entries; we submit as hashedrekord v0.0.1
            entry_body = {
                "kind": "hashedrekord",
                "apiVersion": "0.0.1",
                "spec": {
                    "data": {
                        "hash": {
                            "algorithm": "sha256",
                            "value": leaf_hash,
                        }
                    },
                    "signature": {
                        # The manifest's own signature is the attestation
                        "content": manifest_dict.get("signature", ""),
                        "publicKey": {"content": manifest_dict.get("public_key", "")},
                    },
                },
            }

            resp = httpx.post(
                f"{self._url}/api/v1/log/entries",
                json={"entry": entry_body},
                timeout=self._timeout,
            )
            if resp.status_code not in (200, 201):
                raise TransparencyLogUnavailable(
                    f"Rekor returned HTTP {resp.status_code}: {resp.text[:200]}"
                )

            data = resp.json()
            # Rekor response is { <uuid>: { body, integratedTime, logID, logIndex, verification } }
            uuid_key = next(iter(data))
            entry = data[uuid_key]
            log_index = entry.get("logIndex", -1)
            integrated_time = entry.get("integratedTime", int(time.time()))
            tree_id = entry.get("logID", "")

            return LogEntry(
                log_id=tree_id,
                leaf_hash=leaf_hash,
                log_index=int(log_index),
                integrated_time=int(integrated_time),
                inclusion_proof=entry.get("verification", {}),
                raw=entry,
            )

        except TransparencyLogUnavailable:
            raise
        except Exception as exc:
            raise TransparencyLogUnavailable(f"Rekor submission failed: {exc}") from exc

    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Verify *log_entry* against the entry Rekor stores at its log index.

        The stored entry body must decode and its recorded hash must equal
        ``log_entry.leaf_hash``. An undecodable body or a missing hash is a
        verification failure, never a silent pass. The response is then handed
        to sigstore rather than to hand-rolled Merkle code.

        Returns:
            True when every check passes.

        Raises:
            ValueError: the index is unknown to Rekor (HTTP 404), the stored
                body cannot be decoded or has no hash, the hash differs, or
                the sigstore step fails.
            TransparencyLogUnavailable: Rekor answers with another non-200
                status, or any other error occurs.
        """
        try:
            import httpx

            resp = httpx.get(
                f"{self._url}/api/v1/log/entries",
                params={"logIndex": log_entry.log_index},
                timeout=self._timeout,
            )
            if resp.status_code == 404:
                raise ValueError(f"log_index {log_entry.log_index} not found in Rekor")
            if resp.status_code != 200:
                raise TransparencyLogUnavailable(
                    f"Rekor returned HTTP {resp.status_code} during verification"
                )

            data = resp.json()
            uuid_key = next(iter(data))
            stored = data[uuid_key]

            # Fail closed: without a readable stored hash nothing ties this
            # Rekor entry to the manifest, so the entry must not verify.
            stored_hash = self._stored_leaf_hash(stored.get("body", ""))
            if stored_hash != log_entry.leaf_hash:
                raise ValueError(
                    f"leaf_hash mismatch: stored={stored_hash!r}, expected={log_entry.leaf_hash!r}"
                )

            # Delegate checkpoint/STH verification to sigstore.transparency.
            # ImportError (sigstore not installed) is a warned skip; any other failure
            # means the log checkpoint cannot be trusted and is a hard error.
            # NOTE(review): sigstore-python 3.x appears to have moved LogEntry to
            # sigstore.models; if so this import always fails and the step is
            # always skipped — confirm against the pinned sigstore version.
            # NOTE(review): confirm from_response validates the proof/checkpoint
            # rather than only parsing the response.
            try:
                from sigstore.transparency import LogEntry as SigstoreLogEntry

                _ = SigstoreLogEntry.from_response(data)
            except ImportError as exc:
                logger.warning(
                    "sigstore not installed; STH checkpoint verification skipped: %s", exc
                )
            except Exception as exc:
                raise ValueError(f"Rekor STH checkpoint verification failed: {exc}") from exc

            return True

        except (TransparencyLogUnavailable, ValueError):
            raise
        except Exception as exc:
            raise TransparencyLogUnavailable(f"Rekor verification failed: {exc}") from exc
316# ---------------------------------------------------------------------------
317# Factory
318# ---------------------------------------------------------------------------
def make_transparency_log() -> TransparencyLog:
    """Build the TransparencyLog selected by the STIGMEM_TL_BACKEND setting.

    "local" yields a LocalAppendOnlyLog on ``settings.tl_local_path`` and
    "rekor" yields a RekorLog pointed at ``settings.tl_rekor_url``. Every
    other value, including "off", yields the _OffLog sentinel.
    """
    from ..settings import settings

    selected = settings.tl_backend
    if selected == "local":
        return LocalAppendOnlyLog(path=settings.tl_local_path)
    if selected == "rekor":
        return RekorLog(rekor_url=settings.tl_rekor_url)
    # "off" — hand back a log that raises TransparencyLogUnavailable on every call
    return _OffLog()
class _OffLog(TransparencyLog):
    """Stand-in used when the transparency log is disabled.

    Both operations refuse by raising TransparencyLogUnavailable.
    """

    @staticmethod
    def _disabled() -> TransparencyLogUnavailable:
        """Build the exception that both operations raise."""
        return TransparencyLogUnavailable(
            "transparency log is disabled (STIGMEM_TL_BACKEND=off)"
        )

    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Always raise TransparencyLogUnavailable; nothing is recorded."""
        raise self._disabled()

    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Always raise TransparencyLogUnavailable; nothing is verified."""
        raise self._disabled()