Coverage for node / src / stigmem_node / fact_chain.py: 88%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""ADR-016 L4 local hash chain for fact insert sequence integrity.""" 

2 

3from __future__ import annotations 

4 

5import hashlib 

6import json 

7import uuid 

8from dataclasses import dataclass 

9from datetime import UTC, datetime, timedelta 

10from typing import Any 

11 

12_HASH_PREFIX = "sha256:" 

13_CHECKPOINT_PAYLOAD_KIND = "stigmem.fact_chain_checkpoint" 

14_CHECKPOINT_PAYLOAD_VERSION = 1 

15 

16 

@dataclass(frozen=True)
class FactChainVerification:
    """Immutable outcome of checking one tenant's local fact chain.

    Attributes:
        valid: ``True`` when every inspected entry passed every check.
        checked_entries: Number of entries that verified successfully; on
            failure this is the count that passed before the offending entry.
        mismatch_reason: Short code naming the failed check, ``None`` when
            the chain is valid.
        fact_id: Fact id of the offending entry, ``None`` when valid.
        chain_seq: Sequence number of the offending entry, ``None`` when valid.
    """

    # Required outcome fields.
    valid: bool
    checked_entries: int

    # Failure details; all left at None for a successful verification.
    mismatch_reason: str | None = None
    fact_id: str | None = None
    chain_seq: int | None = None

26 

27 

class FactChainIntegrityError(ValueError):
    """Signal that a tenant's fact chain failed verification.

    The failing verification result is kept on ``self.result`` so callers can
    inspect which entry broke the chain.
    """

    def __init__(self, result: FactChainVerification) -> None:
        """Build the error from *result*.

        The exception message is the result's ``mismatch_reason``; when that
        is empty or ``None`` the generic ``"fact_chain_invalid"`` is used.
        """
        reason = result.mismatch_reason
        message = reason if reason else "fact_chain_invalid"
        super().__init__(message)
        self.result = result

34 

35 

def _sha256_json(body: dict[str, Any]) -> str:
    """Return the prefixed SHA-256 hex digest of *body*'s canonical JSON.

    The canonical form sorts keys, uses no whitespace between tokens, keeps
    non-ASCII characters unescaped, and is encoded as UTF-8 before hashing.
    """
    encoded = json.dumps(
        body,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    ).encode("utf-8")
    digest = hashlib.sha256(encoded).hexdigest()
    return f"{_HASH_PREFIX}{digest}"

39 

40 

def compute_fact_chain_event_hash(row: Any) -> str:
    """Hash the fact fields that define the local insert event.

    Args:
        row: Mapping-like fact row providing ``cid``, ``confidence``,
            ``entity``, ``id``, ``relation``, ``scope``, ``source``,
            ``tenant_id``, ``value_type`` and ``value_v``.

    Returns:
        The prefixed digest of those fields.  ``confidence`` is coerced to
        ``float`` and a falsy ``value_v`` is hashed as the empty string.
    """
    event: dict[str, Any] = {
        "cid": row["cid"],
        "confidence": float(row["confidence"]),
        "entity": row["entity"],
        "fact_id": row["id"],  # the row's "id" is hashed under "fact_id"
        "relation": row["relation"],
        "scope": row["scope"],
        "source": row["source"],
        "tenant_id": row["tenant_id"],
        "value_type": row["value_type"],
    }
    raw_value = row["value_v"]
    event["value_v"] = raw_value if raw_value else ""
    return _sha256_json(event)

57 

58 

def compute_fact_chain_hash(
    *,
    tenant_id: str,
    chain_seq: int,
    fact_id: str,
    event_hash: str,
    previous_hash: str | None,
    created_at: str,
) -> str:
    """Hash one chain link from its event hash and predecessor.

    All arguments are keyword-only and every one of them is covered by the
    digest.  ``previous_hash`` is ``None`` for an entry with no predecessor.

    Returns:
        The prefixed digest identifying this link.
    """
    link: dict[str, Any] = dict(
        tenant_id=tenant_id,
        chain_seq=chain_seq,
        fact_id=fact_id,
        event_hash=event_hash,
        previous_hash=previous_hash,
        created_at=created_at,
    )
    return _sha256_json(link)

79 

80 

def append_fact_chain_entry(conn: Any, row: Any) -> None:
    """Append a local L4 chain entry for a newly inserted fact row.

    Does nothing when the tenant's chain already holds an entry for the fact.
    Otherwise the new entry is linked to the tenant's current chain head (or
    starts the chain at sequence 1), inserted, and due checkpoints are then
    created/submitted for the tenant.

    Args:
        conn: Database connection; rows must support lookup by column name.
        row: The inserted fact row (must provide ``tenant_id``, ``id`` and the
            fields hashed by ``compute_fact_chain_event_hash``).
    """
    tenant_id = row["tenant_id"]
    already_chained = conn.execute(
        "SELECT 1 FROM fact_chain WHERE tenant_id = ? AND fact_id = ?",
        (tenant_id, row["id"]),
    ).fetchone()
    if already_chained is not None:
        return

    tail = conn.execute(
        "SELECT chain_seq, chain_hash FROM fact_chain "
        "WHERE tenant_id = ? ORDER BY chain_seq DESC LIMIT 1",
        (tenant_id,),
    ).fetchone()
    if tail is None:
        # First entry for this tenant: no predecessor to link to.
        chain_seq = 1
        previous_hash = None
    else:
        chain_seq = int(tail["chain_seq"]) + 1
        previous_hash = tail["chain_hash"]

    event_hash = compute_fact_chain_event_hash(row)
    # The same timestamp string is hashed into the link and stored with it.
    created_at = datetime.now(UTC).isoformat()
    chain_hash = compute_fact_chain_hash(
        tenant_id=tenant_id,
        chain_seq=chain_seq,
        fact_id=row["id"],
        event_hash=event_hash,
        previous_hash=previous_hash,
        created_at=created_at,
    )

    entry_values = (
        f"chain_{uuid.uuid4().hex}",
        tenant_id,
        chain_seq,
        row["id"],
        event_hash,
        previous_hash,
        chain_hash,
        created_at,
    )
    conn.execute(
        """
        INSERT INTO fact_chain
            (id, tenant_id, chain_seq, fact_id, event_hash, previous_hash, chain_hash, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        entry_values,
    )
    submit_due_fact_chain_checkpoints(conn, tenant_id=tenant_id)

126 

127 

def verify_fact_chain(conn: Any, tenant_id: str = "default") -> FactChainVerification:
    """Verify chain sequence, fact event hashes, and link hashes for one tenant.

    Entries are walked in ascending ``chain_seq`` order and verification stops
    at the first failing entry.  Per entry the checks are, in order:

    1. ``chain_seq`` continues the gap-free sequence starting at 1
       (``chain_seq_gap``).
    2. ``previous_hash`` equals the preceding entry's ``chain_hash``, or is
       ``None`` for the first entry (``previous_hash_mismatch``).
    3. The referenced fact still exists for the same tenant (``fact_missing``).
    4. The event hash recomputed from the stored fact equals the recorded
       ``event_hash`` (``event_hash_mismatch``).  A stored fact that can no
       longer be hashed at all is reported under the same reason.
    5. The link hash recomputed from the entry's own columns equals the
       recorded ``chain_hash`` (``chain_hash_mismatch``).

    Args:
        conn: Database connection; rows must support lookup by column name.
        tenant_id: Tenant whose chain is verified.

    Returns:
        A valid result counting every entry, or an invalid result carrying the
        reason, the offending entry's ``fact_id`` and ``chain_seq``, and the
        number of entries that verified before it.
    """
    rows = conn.execute(
        """
        SELECT
            fc.*,
            f.id AS stored_fact_id,
            f.entity,
            f.relation,
            f.value_type,
            f.value_v,
            f.source,
            f.confidence,
            f.scope,
            f.tenant_id AS fact_tenant_id,
            f.cid
        FROM fact_chain fc
        LEFT JOIN facts f ON f.id = fc.fact_id AND f.tenant_id = fc.tenant_id
        WHERE fc.tenant_id = ?
        ORDER BY fc.chain_seq ASC
        """,
        (tenant_id,),
    ).fetchall()

    def _mismatch(entry: Any, seq: int, verified: int, reason: str) -> FactChainVerification:
        # Build the failure result for *entry*; *verified* entries passed before it.
        return FactChainVerification(
            valid=False,
            checked_entries=verified,
            mismatch_reason=reason,
            fact_id=entry["fact_id"],
            chain_seq=seq,
        )

    previous_hash: str | None = None
    expected_seq = 1
    for row in rows:
        chain_seq = int(row["chain_seq"])
        verified = expected_seq - 1

        if chain_seq != expected_seq:
            return _mismatch(row, chain_seq, verified, "chain_seq_gap")
        if row["previous_hash"] != previous_hash:
            return _mismatch(row, chain_seq, verified, "previous_hash_mismatch")
        # LEFT JOIN leaves the fact columns NULL when the fact row is gone.
        if row["stored_fact_id"] is None:
            return _mismatch(row, chain_seq, verified, "fact_missing")

        fact_row = {
            "cid": row["cid"],
            "confidence": row["confidence"],
            "entity": row["entity"],
            "id": row["fact_id"],
            "relation": row["relation"],
            "scope": row["scope"],
            "source": row["source"],
            "tenant_id": row["fact_tenant_id"],
            "value_type": row["value_type"],
            "value_v": row["value_v"],
        }
        try:
            expected_event_hash = compute_fact_chain_event_hash(fact_row)
        except (TypeError, ValueError):
            # The stored fact can no longer be hashed (e.g. confidence is NULL
            # or non-numeric, or a column holds a value JSON cannot encode).
            # It cannot reproduce the recorded event hash, so report the entry
            # instead of letting verification crash.
            return _mismatch(row, chain_seq, verified, "event_hash_mismatch")
        if row["event_hash"] != expected_event_hash:
            return _mismatch(row, chain_seq, verified, "event_hash_mismatch")

        expected_chain_hash = compute_fact_chain_hash(
            tenant_id=row["tenant_id"],
            chain_seq=chain_seq,
            fact_id=row["fact_id"],
            event_hash=row["event_hash"],
            previous_hash=row["previous_hash"],
            created_at=row["created_at"],
        )
        if row["chain_hash"] != expected_chain_hash:
            return _mismatch(row, chain_seq, verified, "chain_hash_mismatch")

        previous_hash = row["chain_hash"]
        expected_seq += 1

    return FactChainVerification(valid=True, checked_entries=len(rows))

224 

225 

def build_fact_chain_proof(conn: Any, tenant_id: str = "default") -> dict[str, Any]:
    """Return a compact proof over the current tenant-local chain head.

    The chain is verified first; the proof is only produced for a valid chain.

    Returns:
        A dict with ``tenant_id``, ``checked_entries``, ``head_hash`` (``None``
        when the tenant has no chain entries) and ``checkpoint`` (the latest
        checkpoint metadata, or ``None``).

    Raises:
        FactChainIntegrityError: If verification fails.
    """
    outcome = verify_fact_chain(conn, tenant_id=tenant_id)
    if not outcome.valid:
        raise FactChainIntegrityError(outcome)

    head_row = conn.execute(
        "SELECT chain_hash FROM fact_chain WHERE tenant_id = ? ORDER BY chain_seq DESC LIMIT 1",
        (tenant_id,),
    ).fetchone()
    head_hash = head_row["chain_hash"] if head_row is not None else None
    checkpoint = latest_fact_chain_checkpoint(conn, tenant_id=tenant_id)

    return {
        "tenant_id": tenant_id,
        "checked_entries": outcome.checked_entries,
        "head_hash": head_hash,
        "checkpoint": checkpoint,
    }

241 

242 

243def _json_dumps(body: dict[str, Any]) -> str: 

244 return json.dumps(body, sort_keys=True, separators=(",", ":"), ensure_ascii=False) 

245 

246 

247def _parse_iso(value: str | None) -> datetime | None: 

248 if not value: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true

249 return None 

250 parsed = datetime.fromisoformat(value) 

251 if parsed.tzinfo is None: 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true

252 return parsed.replace(tzinfo=UTC) 

253 return parsed 

254 

255 

256def _row_to_checkpoint(row: Any) -> dict[str, Any]: 

257 return { 

258 "id": row["id"], 

259 "tenant_id": row["tenant_id"], 

260 "covered_chain_seq": int(row["covered_chain_seq"]), 

261 "chain_hash": row["chain_hash"], 

262 "status": row["status"], 

263 "attempt_count": int(row["attempt_count"]), 

264 "created_at": row["created_at"], 

265 "submitted_at": row["submitted_at"], 

266 "last_error": row["last_error"], 

267 "tl_backend": row["tl_backend"], 

268 "tl_log_id": row["tl_log_id"], 

269 "tl_leaf_hash": row["tl_leaf_hash"], 

270 "tl_log_index": row["tl_log_index"], 

271 "tl_integrated_time": row["tl_integrated_time"], 

272 "tl_inclusion_proof": json.loads(row["tl_inclusion_proof"] or "{}"), 

273 "tl_raw": json.loads(row["tl_raw"] or "{}"), 

274 } 

275 

276 

def _checkpoint_payload(row: Any, created_at: str) -> dict[str, Any]:
    """Build the checkpoint payload describing chain entry *row*.

    Args:
        row: Chain row providing ``tenant_id``, ``chain_seq`` and
            ``chain_hash``.
        created_at: Timestamp string recorded in the payload.

    Returns:
        The payload dict, tagged with the module's payload kind and version.
    """
    payload: dict[str, Any] = {
        "kind": _CHECKPOINT_PAYLOAD_KIND,
        "version": _CHECKPOINT_PAYLOAD_VERSION,
    }
    payload["tenant_id"] = row["tenant_id"]
    # The row's chain_seq becomes the sequence number the checkpoint covers.
    payload["covered_chain_seq"] = int(row["chain_seq"])
    payload["chain_hash"] = row["chain_hash"]
    payload["created_at"] = created_at
    return payload

286 

287 

288def _latest_checkpoint_row(conn: Any, tenant_id: str) -> Any | None: 

289 return conn.execute( 

290 """ 

291 SELECT * 

292 FROM fact_chain_checkpoints 

293 WHERE tenant_id = ? 

294 ORDER BY covered_chain_seq DESC 

295 LIMIT 1 

296 """, 

297 (tenant_id,), 

298 ).fetchone() 

299 

300 

def latest_fact_chain_checkpoint(conn: Any, tenant_id: str = "default") -> dict[str, Any] | None:
    """Return the latest local L5 checkpoint metadata for a tenant.

    Returns ``None`` when the tenant has no checkpoint; otherwise the newest
    checkpoint row converted to a plain dict.
    """
    newest = _latest_checkpoint_row(conn, tenant_id)
    if newest is None:
        return None
    return _row_to_checkpoint(newest)

305 

306 

307def _next_pending_checkpoint(conn: Any, tenant_id: str, now: datetime) -> Any | None: 

308 return conn.execute( 

309 """ 

310 SELECT * 

311 FROM fact_chain_checkpoints 

312 WHERE tenant_id = ? 

313 AND status = 'pending' 

314 AND (next_retry_at IS NULL OR next_retry_at <= ?) 

315 ORDER BY covered_chain_seq ASC 

316 LIMIT 1 

317 """, 

318 (tenant_id, now.isoformat()), 

319 ).fetchone() 

320 

321 

def _submit_checkpoint_row(conn: Any, row: Any, now: datetime) -> None:
    """Submit one checkpoint to the transparency log and record the outcome.

    On success the row is marked ``submitted`` and the log entry's fields are
    stored.  On any submission error the row stays ``pending``, the error text
    is stored in ``last_error`` and ``next_retry_at`` is pushed into the
    future; the error is not re-raised.

    Args:
        conn: Database connection used for the status update.
        row: Checkpoint row providing ``id``, ``attempt_count`` and the JSON
            ``payload`` to submit.
        now: Time of this attempt; stored as ``submitted_at`` on success and
            used as the base for the retry time on failure.
    """
    from .identity.transparency_log import make_transparency_log
    from .settings import settings

    attempts = int(row["attempt_count"]) + 1
    try:
        receipt = make_transparency_log().submit(json.loads(row["payload"]))
    except Exception as exc:
        # Deliberately broad: a failed submission is recorded for retry rather
        # than propagated to the caller.
        retry_delay_s = max(1, settings.fact_chain_checkpoint_retry_s)
        retry_at = (now + timedelta(seconds=retry_delay_s)).isoformat()
        failure_values = (attempts, retry_at, str(exc), row["id"])
        conn.execute(
            """
            UPDATE fact_chain_checkpoints
            SET attempt_count = ?,
                next_retry_at = ?,
                last_error = ?,
                status = 'pending'
            WHERE id = ?
            """,
            failure_values,
        )
        return

    success_values = (
        attempts,
        now.isoformat(),
        receipt.log_id,
        receipt.leaf_hash,
        receipt.log_index,
        receipt.integrated_time,
        _json_dumps(receipt.inclusion_proof),
        _json_dumps(receipt.raw),
        row["id"],
    )
    conn.execute(
        """
        UPDATE fact_chain_checkpoints
        SET status = 'submitted',
            attempt_count = ?,
            submitted_at = ?,
            next_retry_at = NULL,
            last_error = NULL,
            tl_log_id = ?,
            tl_leaf_hash = ?,
            tl_log_index = ?,
            tl_integrated_time = ?,
            tl_inclusion_proof = ?,
            tl_raw = ?
        WHERE id = ?
        """,
        success_values,
    )

373 

374 

def _checkpoint_due(conn: Any, tenant_id: str, head: Any, now: datetime) -> bool:
    """Decide whether chain head *head* warrants a new checkpoint.

    A checkpoint is due when the head lies beyond the latest checkpoint and
    either (a) at least ``fact_chain_checkpoint_interval`` entries are
    uncovered, or (b) at least ``fact_chain_checkpoint_max_age_s`` seconds
    have passed since the reference time.  The reference time is the latest
    checkpoint's ``created_at``, or the head's ``created_at`` when the tenant
    has no checkpoint yet.  Both settings are clamped to a minimum of 1.
    """
    from .settings import settings

    last_checkpoint = _latest_checkpoint_row(conn, tenant_id)
    if last_checkpoint is None:
        covered_seq = 0
    else:
        covered_seq = int(last_checkpoint["covered_chain_seq"])

    uncovered = int(head["chain_seq"]) - covered_seq
    if uncovered <= 0:
        # The head is already covered by the latest checkpoint.
        return False

    interval = max(1, settings.fact_chain_checkpoint_interval)
    max_age_s = max(1, settings.fact_chain_checkpoint_max_age_s)
    if uncovered >= interval:
        return True

    age_source = head if last_checkpoint is None else last_checkpoint
    reference_time = _parse_iso(age_source["created_at"])
    if reference_time is None:
        return False
    elapsed = now - reference_time
    return elapsed >= timedelta(seconds=max_age_s)

394 

395 

def _create_checkpoint_for_head(conn: Any, tenant_id: str, head: Any, now: datetime) -> Any:
    """Insert a checkpoint row covering chain head *head* and return it.

    The row gets a fresh ``chaincp_<hex>`` id, the head's sequence number and
    hash, the serialised checkpoint payload, *now* as ``created_at`` and the
    configured transparency-log backend.  The stored row is read back and
    returned.

    NOTE(review): columns not listed in the INSERT (e.g. ``status``,
    ``attempt_count``) presumably come from table defaults — confirm against
    the schema.
    """
    from .settings import settings

    created_at = now.isoformat()
    payload = _checkpoint_payload(head, created_at)
    checkpoint_id = f"chaincp_{uuid.uuid4().hex}"

    insert_values = (
        checkpoint_id,
        tenant_id,
        int(head["chain_seq"]),
        head["chain_hash"],
        _json_dumps(payload),
        created_at,
        settings.tl_backend,
    )
    conn.execute(
        """
        INSERT INTO fact_chain_checkpoints
            (id, tenant_id, covered_chain_seq, chain_hash, payload, created_at, tl_backend)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        insert_values,
    )

    stored = conn.execute(
        "SELECT * FROM fact_chain_checkpoints WHERE id = ?",
        (checkpoint_id,),
    )
    return stored.fetchone()

422 

423 

def submit_due_fact_chain_checkpoints(conn: Any, tenant_id: str = "default") -> None:
    """Create and submit due L5 checkpoints without blocking fact writes.

    Two steps, both using the same "now":

    1. Retry the tenant's oldest pending checkpoint that is eligible, if any.
    2. If the current chain head is due for a checkpoint, create one for it
       and submit it immediately.
    """
    now = datetime.now(UTC)

    retry_candidate = _next_pending_checkpoint(conn, tenant_id, now)
    if retry_candidate is not None:
        _submit_checkpoint_row(conn, retry_candidate, now)

    head = conn.execute(
        """
        SELECT tenant_id, chain_seq, chain_hash, created_at
        FROM fact_chain
        WHERE tenant_id = ?
        ORDER BY chain_seq DESC
        LIMIT 1
        """,
        (tenant_id,),
    ).fetchone()
    if head is not None and _checkpoint_due(conn, tenant_id, head, now):
        new_checkpoint = _create_checkpoint_for_head(conn, tenant_id, head, now)
        _submit_checkpoint_row(conn, new_checkpoint, now)
444 _submit_checkpoint_row(conn, checkpoint, now)