Coverage for node / src / stigmem_node / fact_chain.py: 88%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""ADR-016 L4 local hash chain for fact insert sequence integrity."""
3from __future__ import annotations
5import hashlib
6import json
7import uuid
8from dataclasses import dataclass
9from datetime import UTC, datetime, timedelta
10from typing import Any
# Algorithm tag prepended to every hex digest produced by ``_sha256_json``.
_HASH_PREFIX = "sha256:"

# ``kind`` / ``version`` fields stamped into each checkpoint payload.
_CHECKPOINT_PAYLOAD_KIND = "stigmem.fact_chain_checkpoint"
_CHECKPOINT_PAYLOAD_VERSION = 1
@dataclass(frozen=True)
class FactChainVerification:
    """Immutable outcome of checking one tenant's local fact chain.

    A successful check sets only ``valid`` and ``checked_entries``; the
    remaining fields default to ``None`` and are filled in to describe the
    first entry that failed.
    """

    # True when every chain entry passed every check.
    valid: bool
    # Number of entries verified (on failure: entries verified before the bad one).
    checked_entries: int
    # Short reason code for the failure, e.g. "chain_seq_gap".
    mismatch_reason: str | None = None
    # Fact id recorded on the failing chain entry.
    fact_id: str | None = None
    # Sequence number recorded on the failing chain entry.
    chain_seq: int | None = None
class FactChainIntegrityError(ValueError):
    """Signals that a fact chain failed verification.

    The exception message is the verification's ``mismatch_reason`` (or
    ``"fact_chain_invalid"`` when no reason is set); the full verification
    result is kept on ``self.result``.
    """

    def __init__(self, result: FactChainVerification) -> None:
        message = result.mismatch_reason or "fact_chain_invalid"
        super().__init__(message)
        self.result = result
def _sha256_json(body: dict[str, Any]) -> str:
    """Return the ``sha256:``-prefixed hex digest of *body* serialized as JSON.

    Keys are sorted and separators are compact so that equal mappings always
    serialize, and therefore hash, identically. Non-ASCII characters are
    emitted as-is and encoded as UTF-8 before hashing.
    """
    serialized = json.dumps(
        body,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
    return f"{_HASH_PREFIX}{digest}"
def compute_fact_chain_event_hash(row: Any) -> str:
    """Return the hash of the fact columns that identify one insert event.

    Args:
        row: Mapping-like fact row indexable by column name. ``confidence`` is
            coerced to ``float`` and a falsy ``value_v`` is hashed as ``""``;
            the fact's ``id`` column is hashed under the key ``fact_id``.

    Returns:
        The prefixed digest produced by ``_sha256_json`` over those fields.
    """
    event_fields = dict(
        cid=row["cid"],
        confidence=float(row["confidence"]),
        entity=row["entity"],
        fact_id=row["id"],
        relation=row["relation"],
        scope=row["scope"],
        source=row["source"],
        tenant_id=row["tenant_id"],
        value_type=row["value_type"],
        value_v=row["value_v"] or "",
    )
    return _sha256_json(event_fields)
def compute_fact_chain_hash(
    *,
    tenant_id: str,
    chain_seq: int,
    fact_id: str,
    event_hash: str,
    previous_hash: str | None,
    created_at: str,
) -> str:
    """Return the hash that seals one chain link.

    The link hash covers the entry's own identity (tenant, sequence number,
    fact id, creation timestamp), its event hash, and the hash of the entry
    before it, which is what ties consecutive entries together.

    Args:
        tenant_id: Tenant owning the chain.
        chain_seq: Sequence number of this entry.
        fact_id: Id of the fact the entry records.
        event_hash: Hash of the fact's insert event.
        previous_hash: ``chain_hash`` of the preceding entry, or ``None``.
        created_at: Timestamp string stored on the entry.

    Returns:
        The prefixed digest produced by ``_sha256_json`` over those fields.
    """
    link_fields = dict(
        chain_seq=chain_seq,
        created_at=created_at,
        event_hash=event_hash,
        fact_id=fact_id,
        previous_hash=previous_hash,
        tenant_id=tenant_id,
    )
    return _sha256_json(link_fields)
def append_fact_chain_entry(conn: Any, row: Any) -> None:
    """Record a newly inserted fact as the next link of its tenant's chain.

    Does nothing when the fact already has a chain entry for that tenant.
    Otherwise the new entry takes the sequence number after the current tail
    (1 for an empty chain), links to the tail's ``chain_hash``, and is
    inserted; afterwards any due checkpoints for the tenant are processed.

    Args:
        conn: Database connection whose rows support lookup by column name.
        row: The inserted fact row, indexable by column name.
    """
    tenant_id = row["tenant_id"]

    already_chained = conn.execute(
        "SELECT 1 FROM fact_chain WHERE tenant_id = ? AND fact_id = ?",
        (tenant_id, row["id"]),
    ).fetchone()
    if already_chained is not None:
        return

    tail = conn.execute(
        "SELECT chain_seq, chain_hash FROM fact_chain "
        "WHERE tenant_id = ? ORDER BY chain_seq DESC LIMIT 1",
        (tenant_id,),
    ).fetchone()
    if tail is None:
        # First entry for this tenant: nothing to link back to.
        chain_seq = 1
        previous_hash = None
    else:
        chain_seq = int(tail["chain_seq"]) + 1
        previous_hash = tail["chain_hash"]

    event_hash = compute_fact_chain_event_hash(row)
    # The same timestamp string is both hashed and stored, so the link hash
    # can later be recomputed from the stored row.
    created_at = datetime.now(UTC).isoformat()
    chain_hash = compute_fact_chain_hash(
        tenant_id=tenant_id,
        chain_seq=chain_seq,
        fact_id=row["id"],
        event_hash=event_hash,
        previous_hash=previous_hash,
        created_at=created_at,
    )

    new_entry = (
        f"chain_{uuid.uuid4().hex}",
        tenant_id,
        chain_seq,
        row["id"],
        event_hash,
        previous_hash,
        chain_hash,
        created_at,
    )
    conn.execute(
        """
        INSERT INTO fact_chain
            (id, tenant_id, chain_seq, fact_id, event_hash, previous_hash, chain_hash, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        new_entry,
    )
    submit_due_fact_chain_checkpoints(conn, tenant_id=tenant_id)
def verify_fact_chain(conn: Any, tenant_id: str = "default") -> FactChainVerification:
    """Walk one tenant's chain in sequence order and validate every link.

    Each entry is checked, in this order, for: the next consecutive
    ``chain_seq`` (starting at 1), a ``previous_hash`` equal to the prior
    entry's ``chain_hash`` (``None`` for the first entry), the presence of
    its fact row, an ``event_hash`` equal to the one recomputed from that
    fact row, and a ``chain_hash`` equal to the recomputed link hash.

    Args:
        conn: Database connection whose rows support lookup by column name.
        tenant_id: Tenant whose chain is verified.

    Returns:
        A ``FactChainVerification``. It is invalid at the first failing
        entry, carrying the reason code, that entry's fact id and sequence
        number, and the count of entries verified before it. Otherwise it is
        valid with ``checked_entries`` equal to the number of chain rows.
    """
    chain_rows = conn.execute(
        """
        SELECT
            fc.*,
            f.id AS stored_fact_id,
            f.entity,
            f.relation,
            f.value_type,
            f.value_v,
            f.source,
            f.confidence,
            f.scope,
            f.tenant_id AS fact_tenant_id,
            f.cid
        FROM fact_chain fc
        LEFT JOIN facts f ON f.id = fc.fact_id AND f.tenant_id = fc.tenant_id
        WHERE fc.tenant_id = ?
        ORDER BY fc.chain_seq ASC
        """,
        (tenant_id,),
    ).fetchall()

    def rejected(reason: str, entry: Any, seq: int, verified: int) -> FactChainVerification:
        # Build the failure result for the entry that broke the chain.
        return FactChainVerification(
            valid=False,
            checked_entries=verified,
            mismatch_reason=reason,
            fact_id=entry["fact_id"],
            chain_seq=seq,
        )

    # Columns that carry the same name in the joined row and in the mapping
    # the event hasher reads.
    shared_columns = (
        "cid",
        "confidence",
        "entity",
        "relation",
        "scope",
        "source",
        "value_type",
        "value_v",
    )

    link_hash: str | None = None
    for position, entry in enumerate(chain_rows, start=1):
        seq = int(entry["chain_seq"])
        verified = position - 1

        if seq != position:
            return rejected("chain_seq_gap", entry, seq, verified)
        if entry["previous_hash"] != link_hash:
            return rejected("previous_hash_mismatch", entry, seq, verified)
        # The LEFT JOIN leaves the fact columns NULL when the fact row is gone.
        if entry["stored_fact_id"] is None:
            return rejected("fact_missing", entry, seq, verified)

        fact_fields = {column: entry[column] for column in shared_columns}
        fact_fields["id"] = entry["fact_id"]
        fact_fields["tenant_id"] = entry["fact_tenant_id"]
        if entry["event_hash"] != compute_fact_chain_event_hash(fact_fields):
            return rejected("event_hash_mismatch", entry, seq, verified)

        recomputed_link = compute_fact_chain_hash(
            tenant_id=entry["tenant_id"],
            chain_seq=seq,
            fact_id=entry["fact_id"],
            event_hash=entry["event_hash"],
            previous_hash=entry["previous_hash"],
            created_at=entry["created_at"],
        )
        if entry["chain_hash"] != recomputed_link:
            return rejected("chain_hash_mismatch", entry, seq, verified)

        link_hash = entry["chain_hash"]

    return FactChainVerification(valid=True, checked_entries=len(chain_rows))
def build_fact_chain_proof(conn: Any, tenant_id: str = "default") -> dict[str, Any]:
    """Verify the tenant's chain and summarize its current head.

    Args:
        conn: Database connection whose rows support lookup by column name.
        tenant_id: Tenant whose chain is summarized.

    Returns:
        A dict with ``tenant_id``, ``checked_entries`` (from verification),
        ``head_hash`` (``chain_hash`` of the highest-sequence entry, or
        ``None`` for an empty chain) and ``checkpoint`` (the tenant's latest
        checkpoint metadata, or ``None``).

    Raises:
        FactChainIntegrityError: If verification reports the chain invalid.
    """
    outcome = verify_fact_chain(conn, tenant_id=tenant_id)
    if not outcome.valid:
        raise FactChainIntegrityError(outcome)

    head_row = conn.execute(
        "SELECT chain_hash FROM fact_chain WHERE tenant_id = ? ORDER BY chain_seq DESC LIMIT 1",
        (tenant_id,),
    ).fetchone()
    head_hash = head_row["chain_hash"] if head_row is not None else None

    return {
        "tenant_id": tenant_id,
        "checked_entries": outcome.checked_entries,
        "head_hash": head_hash,
        "checkpoint": latest_fact_chain_checkpoint(conn, tenant_id=tenant_id),
    }
def _json_dumps(body: dict[str, Any]) -> str:
    """Serialize *body* as compact JSON with sorted keys and unescaped Unicode."""
    compact = (",", ":")
    return json.dumps(body, ensure_ascii=False, separators=compact, sort_keys=True)
247def _parse_iso(value: str | None) -> datetime | None:
248 if not value: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true
249 return None
250 parsed = datetime.fromisoformat(value)
251 if parsed.tzinfo is None: 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true
252 return parsed.replace(tzinfo=UTC)
253 return parsed
def _row_to_checkpoint(row: Any) -> dict[str, Any]:
    """Convert a checkpoint table row into a plain metadata dict.

    ``covered_chain_seq`` and ``attempt_count`` are coerced to ``int``; the
    two JSON text columns (``tl_inclusion_proof``, ``tl_raw``) are decoded,
    with a NULL/empty column decoding to ``{}``. All other columns are copied
    through unchanged.
    """

    def decoded(column: str) -> Any:
        # A NULL or empty JSON column is treated as an empty object.
        return json.loads(row[column] or "{}")

    return dict(
        id=row["id"],
        tenant_id=row["tenant_id"],
        covered_chain_seq=int(row["covered_chain_seq"]),
        chain_hash=row["chain_hash"],
        status=row["status"],
        attempt_count=int(row["attempt_count"]),
        created_at=row["created_at"],
        submitted_at=row["submitted_at"],
        last_error=row["last_error"],
        tl_backend=row["tl_backend"],
        tl_log_id=row["tl_log_id"],
        tl_leaf_hash=row["tl_leaf_hash"],
        tl_log_index=row["tl_log_index"],
        tl_integrated_time=row["tl_integrated_time"],
        tl_inclusion_proof=decoded("tl_inclusion_proof"),
        tl_raw=decoded("tl_raw"),
    )
def _checkpoint_payload(row: Any, created_at: str) -> dict[str, Any]:
    """Build the checkpoint payload describing a chain entry.

    Args:
        row: Chain row (indexable by column name) the checkpoint covers; its
            ``chain_seq`` is stored as ``covered_chain_seq``.
        created_at: Timestamp string recorded in the payload.

    Returns:
        A dict carrying the payload kind and version constants together with
        the tenant id, covered sequence number, chain hash and timestamp.
    """
    return dict(
        kind=_CHECKPOINT_PAYLOAD_KIND,
        version=_CHECKPOINT_PAYLOAD_VERSION,
        tenant_id=row["tenant_id"],
        covered_chain_seq=int(row["chain_seq"]),
        chain_hash=row["chain_hash"],
        created_at=created_at,
    )
def _latest_checkpoint_row(conn: Any, tenant_id: str) -> Any | None:
    """Fetch the tenant's checkpoint row with the highest ``covered_chain_seq``.

    Rows of any status are considered. Returns ``None`` when the tenant has
    no checkpoints.
    """
    cursor = conn.execute(
        """
        SELECT *
        FROM fact_chain_checkpoints
        WHERE tenant_id = ?
        ORDER BY covered_chain_seq DESC
        LIMIT 1
        """,
        (tenant_id,),
    )
    return cursor.fetchone()
def latest_fact_chain_checkpoint(conn: Any, tenant_id: str = "default") -> dict[str, Any] | None:
    """Return metadata for the tenant's most recent checkpoint, if any.

    The most recent checkpoint is the one covering the highest chain sequence
    number; ``None`` is returned when the tenant has no checkpoints.
    """
    newest = _latest_checkpoint_row(conn, tenant_id)
    if newest is None:
        return None
    return _row_to_checkpoint(newest)
def _next_pending_checkpoint(conn: Any, tenant_id: str, now: datetime) -> Any | None:
    """Fetch the oldest pending checkpoint that may be (re)submitted at *now*.

    A pending row qualifies when it has no ``next_retry_at`` or when that
    value is not later than ``now.isoformat()`` (compared as stored text).
    Among qualifying rows the lowest ``covered_chain_seq`` wins. Returns
    ``None`` when nothing qualifies.
    """
    cutoff = now.isoformat()
    cursor = conn.execute(
        """
        SELECT *
        FROM fact_chain_checkpoints
        WHERE tenant_id = ?
          AND status = 'pending'
          AND (next_retry_at IS NULL OR next_retry_at <= ?)
        ORDER BY covered_chain_seq ASC
        LIMIT 1
        """,
        (tenant_id, cutoff),
    )
    return cursor.fetchone()
def _submit_checkpoint_row(conn: Any, row: Any, now: datetime) -> None:
    """Submit one checkpoint's payload to the transparency log and record the result.

    On any exception from the submission attempt the row stays ``pending``:
    its attempt count is bumped, the error text is stored, and a retry time
    is scheduled (the configured retry delay, floored at one second). On
    success the row becomes ``submitted`` and the log entry's details are
    stored.

    Args:
        conn: Database connection used for the UPDATE.
        row: Checkpoint row (indexable by column name) to submit.
        now: Current time; used for ``submitted_at`` and the retry schedule.
    """
    from .identity.transparency_log import make_transparency_log
    from .settings import settings

    attempts = int(row["attempt_count"]) + 1
    try:
        transparency_log = make_transparency_log()
        log_entry = transparency_log.submit(json.loads(row["payload"]))
    except Exception as exc:
        # Deliberately broad: a failed submission must not propagate to the
        # caller; it is recorded on the row and retried later.
        delay_s = max(1, settings.fact_chain_checkpoint_retry_s)
        retry_at = (now + timedelta(seconds=delay_s)).isoformat()
        conn.execute(
            """
            UPDATE fact_chain_checkpoints
            SET attempt_count = ?,
                next_retry_at = ?,
                last_error = ?,
                status = 'pending'
            WHERE id = ?
            """,
            (attempts, retry_at, str(exc), row["id"]),
        )
    else:
        submitted_values = (
            attempts,
            now.isoformat(),
            log_entry.log_id,
            log_entry.leaf_hash,
            log_entry.log_index,
            log_entry.integrated_time,
            _json_dumps(log_entry.inclusion_proof),
            _json_dumps(log_entry.raw),
            row["id"],
        )
        conn.execute(
            """
            UPDATE fact_chain_checkpoints
            SET status = 'submitted',
                attempt_count = ?,
                submitted_at = ?,
                next_retry_at = NULL,
                last_error = NULL,
                tl_log_id = ?,
                tl_leaf_hash = ?,
                tl_log_index = ?,
                tl_integrated_time = ?,
                tl_inclusion_proof = ?,
                tl_raw = ?
            WHERE id = ?
            """,
            submitted_values,
        )
def _checkpoint_due(conn: Any, tenant_id: str, head: Any, now: datetime) -> bool:
    """Decide whether the chain head warrants a new checkpoint.

    A checkpoint is due when the head lies beyond the latest checkpoint and
    either (a) at least the configured number of entries are uncovered, or
    (b) the reference time is at least the configured maximum age in the
    past. The reference time is the latest checkpoint's ``created_at``, or
    the head's ``created_at`` when no checkpoint exists. Both settings are
    floored at 1.

    Args:
        conn: Database connection whose rows support lookup by column name.
        tenant_id: Tenant being evaluated.
        head: Highest-sequence chain row for the tenant.
        now: Current time used for the age comparison.
    """
    from .settings import settings

    newest = _latest_checkpoint_row(conn, tenant_id)
    covered = int(newest["covered_chain_seq"]) if newest is not None else 0
    uncovered = int(head["chain_seq"]) - covered
    if uncovered <= 0:
        # Everything up to the head is already covered by a checkpoint.
        return False

    interval = max(1, settings.fact_chain_checkpoint_interval)
    max_age_s = max(1, settings.fact_chain_checkpoint_max_age_s)
    if uncovered >= interval:
        return True

    age_source = head if newest is None else newest
    reference_time = _parse_iso(age_source["created_at"])
    if reference_time is None:
        return False
    return now - reference_time >= timedelta(seconds=max_age_s)
def _create_checkpoint_for_head(conn: Any, tenant_id: str, head: Any, now: datetime) -> Any:
    """Insert a checkpoint row covering the chain head and return the stored row.

    Args:
        conn: Database connection whose rows support lookup by column name.
        tenant_id: Tenant the checkpoint belongs to.
        head: Chain row being checkpointed (provides ``chain_seq`` and
            ``chain_hash``).
        now: Current time; its ISO form is stored as ``created_at`` and
            embedded in the payload.

    Returns:
        The freshly inserted row, re-read from the table by id.
    """
    from .settings import settings

    stamp = now.isoformat()
    payload_json = _json_dumps(_checkpoint_payload(head, stamp))
    new_id = f"chaincp_{uuid.uuid4().hex}"
    # NOTE(review): status / attempt_count are not set here, presumably they
    # come from column defaults in the schema; confirm.
    values = (
        new_id,
        tenant_id,
        int(head["chain_seq"]),
        head["chain_hash"],
        payload_json,
        stamp,
        settings.tl_backend,
    )
    conn.execute(
        """
        INSERT INTO fact_chain_checkpoints
            (id, tenant_id, covered_chain_seq, chain_hash, payload, created_at, tl_backend)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        values,
    )
    cursor = conn.execute(
        "SELECT * FROM fact_chain_checkpoints WHERE id = ?",
        (new_id,),
    )
    return cursor.fetchone()
def submit_due_fact_chain_checkpoints(conn: Any, tenant_id: str = "default") -> None:
    """Retry one pending checkpoint, then checkpoint the chain head if due.

    First, the oldest pending checkpoint that is eligible for (re)submission
    is submitted. Then, if the tenant has a chain head and a checkpoint is
    due for it, a new checkpoint row is created and submitted. Submission
    errors are recorded on the checkpoint row for a later retry rather than
    raised.

    Args:
        conn: Database connection whose rows support lookup by column name.
        tenant_id: Tenant whose checkpoints are processed.
    """
    now = datetime.now(UTC)

    retry_candidate = _next_pending_checkpoint(conn, tenant_id, now)
    if retry_candidate is not None:
        _submit_checkpoint_row(conn, retry_candidate, now)

    head = conn.execute(
        """
        SELECT tenant_id, chain_seq, chain_hash, created_at
        FROM fact_chain
        WHERE tenant_id = ?
        ORDER BY chain_seq DESC
        LIMIT 1
        """,
        (tenant_id,),
    ).fetchone()
    if head is not None and _checkpoint_due(conn, tenant_id, head, now):
        fresh = _create_checkpoint_for_head(conn, tenant_id, head, now)
        _submit_checkpoint_row(conn, fresh, now)