Coverage for node / src / stigmem_node / identity / transparency_log.py: 55%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Transparency-log adapters — spec §19.2. 

2 

3Public surface: 

4 TransparencyLogUnavailable — raised when TL cannot be reached 

5 LogEntry — dataclass for a TL entry 

6 TransparencyLog — abstract base class 

7 LocalAppendOnlyLog — file-backed hash-chain log (dev / single-org) 

8 RekorLog — Sigstore Rekor adapter (requires [identity] extra) 

9 make_transparency_log() — factory respecting STIGMEM_TL_BACKEND setting 

10 

11Security requirements (H2 mitigation): 

12 - TransparencyLogUnavailable must be raised, not silenced, when TL is 

13 unreachable. Callers in trust_mode=strict MUST treat this as HTTP 503. 

14 - Checkpoint/STH verification goes through sigstore.transparency — not 

15 hand-rolled Merkle logic. 

16""" 

17 

18from __future__ import annotations 

19 

20import hashlib 

21import json 

22import logging 

23import time 

24from abc import ABC, abstractmethod 

25from dataclasses import dataclass, field 

26from pathlib import Path 

27from typing import Any 

28 

29import canonicaljson 

30 

31logger = logging.getLogger("stigmem.identity.tl") 

32 

33 

class TransparencyLogUnavailable(RuntimeError):
    """Signal that the configured transparency-log backend could not be reached."""

36 

37 

@dataclass
class LogEntry:
    """Record describing one entry's inclusion in a transparency log.

    ``inclusion_proof`` and ``raw`` are optional; each instance gets its own
    fresh empty dict when they are omitted.
    """

    # Opaque backend identifier (file path hash or Rekor tree ID).
    log_id: str
    # Hex SHA-256 of the canonical leaf data.
    leaf_hash: str
    # Sequential index in the log.
    log_index: int
    # Unix epoch seconds.
    integrated_time: int
    # Backend-specific proof material.
    inclusion_proof: dict[str, Any] = field(default_factory=dict)
    # Full backend response.
    raw: dict[str, Any] = field(default_factory=dict)

48 

49 

class TransparencyLog(ABC):
    """Interface that every transparency-log backend adapter implements."""

    @abstractmethod
    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Record *manifest_dict* in the transparency log.

        Returns:
            The LogEntry describing the newly recorded entry.

        Raises:
            TransparencyLogUnavailable: the log backend is unreachable.
        """

    @abstractmethod
    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Check that *log_entry* is genuinely present in the log.

        Returns:
            True when verification succeeds.

        Raises:
            TransparencyLogUnavailable: the log backend is unreachable.
            ValueError: cryptographic verification failed.
        """

67 

68 

69# --------------------------------------------------------------------------- 

70# LocalAppendOnlyLog — file-backed hash chain (dev / single-org) 

71# --------------------------------------------------------------------------- 

72 

73 

class LocalAppendOnlyLog(TransparencyLog):
    """Simple append-only log stored as newline-delimited JSON.

    Each line is a JSON object:
      { "index": int, "ts": int, "leaf_hash": str, "prev_hash": str,
        "chain_hash": str, "payload": {...} }

    chain_hash = SHA-256(prev_hash || leaf_hash) ties each entry to its
    predecessor — sufficient for dev / audit use; not a full Merkle tree.
    The first entry uses 64 zero characters as its prev_hash.
    """

    # prev_hash recorded for the very first entry of a log.
    _GENESIS_HASH = "0" * 64
    # Fields every stored line must carry for verification to proceed.
    _REQUIRED_KEYS = ("index", "leaf_hash", "prev_hash", "chain_hash")

    def __init__(self, path: str | Path) -> None:
        """Bind the log to *path*; the file is not touched until first use."""
        self._path = Path(path)
        # Short, stable identifier derived from the resolved file location.
        self._log_id = hashlib.sha256(str(self._path.resolve()).encode()).hexdigest()[:16]

    def _leaf_hash(self, payload: dict[str, Any]) -> str:
        """Return the hex SHA-256 of the canonical JSON encoding of *payload*."""
        # RFC 8785 JCS — must match RekorLog and manifest signing bodies.
        canonical = canonicaljson.encode_canonical_json(payload)
        return hashlib.sha256(canonical).hexdigest()

    def _read_lines(self) -> list[str]:
        """Return the log's lines, ignoring leading/trailing whitespace."""
        # Entries are written by json.dumps with its default ASCII escaping,
        # so decoding as UTF-8 is safe regardless of the locale encoding.
        return self._path.read_text(encoding="utf-8").strip().splitlines()

    @classmethod
    def _parse_entry(cls, line: str, position: int) -> dict[str, Any]:
        """Decode one stored line, raising ValueError if it is malformed."""
        entry = json.loads(line)  # JSONDecodeError is a ValueError subclass
        if not isinstance(entry, dict) or any(key not in entry for key in cls._REQUIRED_KEYS):
            raise ValueError(f"malformed log entry at line {position}")
        return entry

    def _last_entry(self) -> dict[str, Any] | None:
        """Return the most recent stored entry, or None for a missing/empty log."""
        if not self._path.exists():
            return None
        lines = self._read_lines()
        if not lines:
            return None
        entry: dict[str, Any] = json.loads(lines[-1])
        return entry

    def _chain_hash(self, prev_hash: str, leaf_hash: str) -> str:
        """Return hex SHA-256 over the concatenation prev_hash || leaf_hash."""
        combined = (prev_hash + leaf_hash).encode()
        return hashlib.sha256(combined).hexdigest()

    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Append *manifest_dict* to the log file and return its LogEntry.

        The new entry's index is one past the last stored entry (0 for an
        empty log) and its prev_hash is the last entry's chain_hash.
        """
        last = self._last_entry()
        index = (last["index"] + 1) if last else 0
        prev_hash = last["chain_hash"] if last else self._GENESIS_HASH
        leaf_hash = self._leaf_hash(manifest_dict)
        chain_hash = self._chain_hash(prev_hash, leaf_hash)
        ts = int(time.time())

        entry: dict[str, Any] = {
            "index": index,
            "ts": ts,
            "leaf_hash": leaf_hash,
            "prev_hash": prev_hash,
            "chain_hash": chain_hash,
            "payload": manifest_dict,
        }

        with self._path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(entry, separators=(",", ":")) + "\n")

        return LogEntry(
            log_id=self._log_id,
            leaf_hash=leaf_hash,
            log_index=index,
            integrated_time=ts,
            inclusion_proof={"chain_hash": chain_hash, "prev_hash": prev_hash},
            raw=entry,
        )

    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Verify that *log_entry* matches the line stored at its index.

        Checks, in order: the index is in range, the stored leaf_hash and
        index match *log_entry*, the stored chain_hash is consistent with the
        stored prev_hash and leaf_hash, and the stored prev_hash equals the
        preceding line's chain_hash (or the genesis value for index 0).

        Returns:
            True when every check passes.

        Raises:
            TransparencyLogUnavailable: the log file does not exist.
            ValueError: the index is out of range, a stored line is
                malformed, or any hash/index check fails.
        """
        if not self._path.exists():
            raise TransparencyLogUnavailable("local TL file not found")

        target_index = log_entry.log_index
        lines = self._read_lines()

        # Reject negative indices explicitly: Python would otherwise index
        # from the end of the list (or raise IndexError) instead of failing
        # with the documented ValueError.
        if not 0 <= target_index < len(lines):
            raise ValueError(
                f"log_index {target_index} out of range (log has {len(lines)} entries)"
            )

        stored = self._parse_entry(lines[target_index], target_index)
        if stored["leaf_hash"] != log_entry.leaf_hash:
            raise ValueError(
                f"leaf_hash mismatch at index {target_index}: "
                f"stored={stored['leaf_hash']!r}, expected={log_entry.leaf_hash!r}"
            )
        if stored["index"] != target_index:
            raise ValueError("stored index does not match log_entry.log_index")

        # The entry must be internally consistent...
        recomputed = self._chain_hash(stored["prev_hash"], stored["leaf_hash"])
        if recomputed != stored["chain_hash"]:
            raise ValueError("chain_hash integrity check failed")

        # ...and must actually link to its predecessor in the file.
        if target_index == 0:
            expected_prev = self._GENESIS_HASH
        else:
            previous = self._parse_entry(lines[target_index - 1], target_index - 1)
            expected_prev = previous["chain_hash"]
        if stored["prev_hash"] != expected_prev:
            raise ValueError("prev_hash does not match the preceding entry's chain_hash")

        return True

163 

164 

165# --------------------------------------------------------------------------- 

166# RekorLog — Sigstore Rekor adapter 

167# --------------------------------------------------------------------------- 

168 

169 

class RekorLog(TransparencyLog):
    """Transparency-log adapter backed by a Sigstore Rekor instance.

    Requires `sigstore>=3.0` (the [identity] optional extra).
    STH / inclusion-proof verification goes through sigstore.transparency —
    not hand-rolled Merkle logic (H2 security requirement).
    """

    def __init__(self, rekor_url: str = "https://rekor.sigstore.dev") -> None:
        """Store the Rekor base URL and fail fast if sigstore is missing.

        Raises:
            ImportError: the ``sigstore`` package cannot be imported.
        """
        self._url = rekor_url.rstrip("/")
        try:
            import sigstore  # noqa: F401 — validate import at construction
        except ImportError as exc:
            raise ImportError(
                "sigstore package is required for RekorLog; "
                "install it with: pip install 'stigmem-node[identity]'"
            ) from exc

    @staticmethod
    def _stored_leaf_hash(stored_body: Any) -> str:
        """Extract spec.data.hash.value from a base64-encoded Rekor entry body.

        Returns the hash string, or "" when the field is absent.

        Raises:
            ValueError / TypeError: the body is not decodable base64 JSON or
                does not have the expected nested-object shape.
        """
        import base64

        # Extra "==" tolerates bodies whose base64 padding was stripped.
        decoded = json.loads(base64.b64decode(stored_body + "=="))
        node: Any = decoded
        for key in ("spec", "data", "hash"):
            if not isinstance(node, dict):
                raise ValueError(f"unexpected Rekor body structure before {key!r}")
            node = node.get(key, {})
        if not isinstance(node, dict):
            raise ValueError("unexpected Rekor body structure at 'hash'")
        value = node.get("value", "")
        if not isinstance(value, str):
            raise ValueError("Rekor body hash value is not a string")
        return value

    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Submit *manifest_dict* to Rekor as a hashedrekord v0.0.1 entry.

        Returns:
            A LogEntry built from the first (UUID-keyed) item of the response.

        Raises:
            TransparencyLogUnavailable: Rekor answered with a status other
                than 200/201, returned an empty response, or any other error
                occurred while submitting.
        """
        try:
            import httpx

            # RFC 8785 JCS — consistent with LocalAppendOnlyLog._leaf_hash.
            canonical = canonicaljson.encode_canonical_json(manifest_dict)
            leaf_hash = hashlib.sha256(canonical).hexdigest()

            # Rekor accepts intoto / hashedrekord entries; we submit as hashedrekord v0.0.1
            entry_body = {
                "kind": "hashedrekord",
                "apiVersion": "0.0.1",
                "spec": {
                    "data": {
                        "hash": {
                            "algorithm": "sha256",
                            "value": leaf_hash,
                        }
                    },
                    "signature": {
                        # The manifest's own signature is the attestation
                        "content": manifest_dict.get("signature", ""),
                        "publicKey": {"content": manifest_dict.get("public_key", "")},
                    },
                },
            }

            resp = httpx.post(
                f"{self._url}/api/v1/log/entries",
                json={"entry": entry_body},
                timeout=15.0,
            )
            if resp.status_code not in (200, 201):
                raise TransparencyLogUnavailable(
                    f"Rekor returned HTTP {resp.status_code}: {resp.text[:200]}"
                )

            data = resp.json()
            # An empty body would otherwise surface as a message-less StopIteration.
            if not data:
                raise TransparencyLogUnavailable("Rekor returned an empty response")
            # Rekor response is { <uuid>: { body, integratedTime, logID, logIndex, verification } }
            uuid_key = next(iter(data))
            entry = data[uuid_key]
            log_index = entry.get("logIndex", -1)
            integrated_time = entry.get("integratedTime", int(time.time()))
            tree_id = entry.get("logID", "")

            return LogEntry(
                log_id=tree_id,
                leaf_hash=leaf_hash,
                log_index=int(log_index),
                integrated_time=int(integrated_time),
                inclusion_proof=entry.get("verification", {}),
                raw=entry,
            )

        except TransparencyLogUnavailable:
            raise
        except Exception as exc:
            raise TransparencyLogUnavailable(f"Rekor submission failed: {exc}") from exc

    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Verify inclusion via Rekor's own verification endpoint.

        Fetches the entry at ``log_entry.log_index``, requires its recorded
        hash to equal ``log_entry.leaf_hash``, then hands the response to
        sigstore.transparency — not hand-rolled Merkle code.

        Returns:
            True when every check passes.

        Raises:
            ValueError: the index is unknown to Rekor (HTTP 404), the stored
                body cannot be decoded or carries no hash, the hash differs,
                or the sigstore step fails.
            TransparencyLogUnavailable: Rekor returned another non-200
                status, an empty response, or any other error occurred.
        """
        try:
            import httpx

            resp = httpx.get(
                f"{self._url}/api/v1/log/entries",
                params={"logIndex": log_entry.log_index},
                timeout=15.0,
            )
            if resp.status_code == 404:
                raise ValueError(f"log_index {log_entry.log_index} not found in Rekor")
            if resp.status_code != 200:
                raise TransparencyLogUnavailable(
                    f"Rekor returned HTTP {resp.status_code} during verification"
                )

            data = resp.json()
            if not data:
                raise TransparencyLogUnavailable(
                    "Rekor returned an empty response during verification"
                )
            uuid_key = next(iter(data))
            stored = data[uuid_key]

            # Verify leaf hash matches what we stored. This check fails
            # closed: an entry whose hash cannot be read is NOT treated as
            # verified, because skipping the comparison would let a garbled
            # or substituted entry pass.
            try:
                stored_hash = self._stored_leaf_hash(stored.get("body", ""))
            except (ValueError, TypeError) as exc:
                logger.warning(
                    "stored Rekor body for UUID %s could not be decoded: %s",
                    uuid_key,
                    exc,
                )
                raise ValueError(
                    f"Rekor entry {uuid_key} body could not be decoded; "
                    "leaf hash cannot be verified"
                ) from exc

            if not stored_hash:
                raise ValueError(
                    f"Rekor entry {uuid_key} carries no leaf hash; cannot verify inclusion"
                )
            if stored_hash != log_entry.leaf_hash:
                raise ValueError(
                    f"leaf_hash mismatch: stored={stored_hash!r}, expected={log_entry.leaf_hash!r}"
                )

            # Delegate checkpoint/STH verification to sigstore.transparency.
            # ImportError (sigstore not installed) is a warned skip; any other failure
            # means the log checkpoint cannot be trusted and is a hard error.
            # NOTE(review): confirm that the pinned sigstore release still exposes
            # sigstore.transparency.LogEntry.from_response and that it performs the
            # intended checkpoint verification — an ImportError here only warns.
            try:
                from sigstore.transparency import LogEntry as SigstoreLogEntry

                _ = SigstoreLogEntry.from_response(data)
            except ImportError as exc:
                logger.warning(
                    "sigstore not installed; STH checkpoint verification skipped: %s", exc
                )
            except Exception as exc:
                raise ValueError(f"Rekor STH checkpoint verification failed: {exc}") from exc

            return True

        except (TransparencyLogUnavailable, ValueError):
            raise
        except Exception as exc:
            raise TransparencyLogUnavailable(f"Rekor verification failed: {exc}") from exc

314 

315 

316# --------------------------------------------------------------------------- 

317# Factory 

318# --------------------------------------------------------------------------- 

319 

320 

def make_transparency_log() -> TransparencyLog:
    """Build the transparency-log adapter selected by STIGMEM_TL_BACKEND.

    "rekor" yields a RekorLog and "local" a LocalAppendOnlyLog; every other
    value (including "off") yields the always-unavailable _OffLog sentinel.
    """
    from ..settings import settings

    selected = settings.tl_backend
    if selected == "local":
        return LocalAppendOnlyLog(path=settings.tl_local_path)
    if selected == "rekor":
        return RekorLog(rekor_url=settings.tl_rekor_url)
    # "off" — return a no-op log that always raises TransparencyLogUnavailable
    return _OffLog()

332 

333 

class _OffLog(TransparencyLog):
    """Stand-in used when the TL is switched off; every operation raises."""

    # Single source for the error text raised by both operations.
    _DISABLED_MESSAGE = "transparency log is disabled (STIGMEM_TL_BACKEND=off)"

    def submit(self, manifest_dict: dict[str, Any]) -> LogEntry:
        """Always raise TransparencyLogUnavailable; nothing is recorded."""
        raise TransparencyLogUnavailable(self._DISABLED_MESSAGE)

    def verify_inclusion(self, log_entry: LogEntry) -> bool:
        """Always raise TransparencyLogUnavailable; nothing is verified."""
        raise TransparencyLogUnavailable(self._DISABLED_MESSAGE)