Coverage for node / src / stigmem_node / routes / facts / provenance.py: 76%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Fact provenance route and helpers."""
3from __future__ import annotations
5from typing import Annotated, Any
7from fastapi import Depends, HTTPException, status
9from ...auth import Identity, resolve_identity
10from ...db import db
11from ...models.provenance import ProvenanceEntry, ProvenanceResponse
12from .common import _get_tombstone_filter, logger, router
15def _resolve_provenance_entry(entry: Any, tenant_id: str) -> tuple[str, Any] | None:
16 """Resolve a derived_from entry to (hash_val, ref_row | None); skip non-dict entries."""
17 if not isinstance(entry, dict): 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true
18 return None
19 hash_val: str = entry.get("hash", "")
20 entry_fact_id: str | None = entry.get("fact_id")
22 ref_row = None
23 with db() as conn:
24 if entry_fact_id: 24 ↛ 29line 24 didn't jump to line 29 because the condition on line 24 was always true
25 ref_row = conn.execute(
26 "SELECT * FROM facts WHERE id = ? AND tenant_id = ?",
27 (entry_fact_id, tenant_id),
28 ).fetchone()
29 elif hash_val.startswith("sha256:"):
30 alias = conn.execute(
31 "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?",
32 (hash_val,),
33 ).fetchone()
34 if alias:
35 ref_row = conn.execute(
36 "SELECT * FROM facts WHERE id = ? AND tenant_id = ?",
37 (alias["fact_id"], tenant_id),
38 ).fetchone()
39 return hash_val, ref_row
42def _format_provenance_entry(hash_val: str, ref_row: Any, excluded: set[str]) -> ProvenanceEntry:
43 """Render a resolved entry into a ProvenanceEntry, redacting tombstoned/missing rows."""
44 if ref_row is None: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true
45 return ProvenanceEntry(hash=hash_val, exists=False)
46 if ref_row["entity"] in excluded:
47 return ProvenanceEntry(hash=hash_val, exists=False)
48 return ProvenanceEntry(
49 hash=hash_val,
50 fact_id=ref_row["id"],
51 entity=ref_row["entity"],
52 exists=True,
53 )
56@router.get("/{fact_id}/provenance", response_model=ProvenanceResponse)
57def get_provenance(
58 fact_id: str,
59 identity: Annotated[Identity, Depends(resolve_identity)],
60) -> ProvenanceResponse:
61 """Provenance walk with tombstone suppression.
63 Returns the derived_from chain for a fact. Any entry whose referenced entity is
64 tombstoned — or whose fact is otherwise inaccessible — is redacted to
65 {"hash": "...", "exists": false}, indistinguishable from unauthorized
66 cross-scope references to prevent existence leakage. Covered by
67 Spec-X2-RTBF-Tombstones and Spec-X11-Recall-Graph.
68 """
69 import json as _prov_json
71 if not identity.can_read(): 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true
72 raise HTTPException(
73 status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
74 )
76 with db() as conn:
77 row = conn.execute(
78 "SELECT * FROM facts WHERE id = ? AND tenant_id = ?",
79 (fact_id, identity.tenant_id),
80 ).fetchone()
81 if row is None:
82 raise HTTPException(status_code=404, detail="fact not found")
84 derived_from_raw = row["derived_from"] if "derived_from" in row.keys() else None # noqa: SIM118
85 cid_val = row["cid"] if "cid" in row.keys() else None # noqa: SIM118
86 root_scope: str = row["scope"] or "local"
88 if not derived_from_raw:
89 return ProvenanceResponse(fact_id=fact_id, cid=cid_val, derived_from=[])
91 try:
92 entries_raw: list[Any] = _prov_json.loads(derived_from_raw)
93 except Exception as exc:
94 logger.warning("ignoring malformed provenance for fact %s: %s", fact_id, exc)
95 entries_raw = []
97 # Resolve each derived_from entry to its referenced fact row
98 resolved: list[tuple[str, Any]] = [] # (hash_val, ref_row | None)
99 for entry in entries_raw:
100 resolved_entry = _resolve_provenance_entry(entry, identity.tenant_id)
101 if resolved_entry is not None: 101 ↛ 99line 101 didn't jump to line 99 because the condition on line 101 was always true
102 resolved.append(resolved_entry)
104 # Single tombstone filter call across all resolved entity URIs (§23.3.2 r.4)
105 accessible_entities = [ref_row["entity"] for _, ref_row in resolved if ref_row is not None]
106 excluded: set[str] = set()
107 if accessible_entities: 107 ↛ 113line 107 didn't jump to line 113 because the condition on line 107 was always true
108 with db() as _tc_conn:
109 is_admin = identity.is_admin()
110 excluded, _ = _get_tombstone_filter(_tc_conn, accessible_entities, root_scope, is_admin)
112 # Build response — §23.3.2 r.4 tombstone and §20.6.2 unauthorized share identical shape
113 result: list[ProvenanceEntry] = [
114 _format_provenance_entry(hash_val, ref_row, excluded) for hash_val, ref_row in resolved
115 ]
117 return ProvenanceResponse(fact_id=fact_id, cid=cid_val, derived_from=result)