Coverage for node / src / stigmem_node / routes / facts / provenance.py: 76%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Fact provenance route and helpers.""" 

2 

3from __future__ import annotations 

4 

5from typing import Annotated, Any 

6 

7from fastapi import Depends, HTTPException, status 

8 

9from ...auth import Identity, resolve_identity 

10from ...db import db 

11from ...models.provenance import ProvenanceEntry, ProvenanceResponse 

12from .common import _get_tombstone_filter, logger, router 

13 

14 

15def _resolve_provenance_entry(entry: Any, tenant_id: str) -> tuple[str, Any] | None: 

16 """Resolve a derived_from entry to (hash_val, ref_row | None); skip non-dict entries.""" 

17 if not isinstance(entry, dict): 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true

18 return None 

19 hash_val: str = entry.get("hash", "") 

20 entry_fact_id: str | None = entry.get("fact_id") 

21 

22 ref_row = None 

23 with db() as conn: 

24 if entry_fact_id: 24 ↛ 29line 24 didn't jump to line 29 because the condition on line 24 was always true

25 ref_row = conn.execute( 

26 "SELECT * FROM facts WHERE id = ? AND tenant_id = ?", 

27 (entry_fact_id, tenant_id), 

28 ).fetchone() 

29 elif hash_val.startswith("sha256:"): 

30 alias = conn.execute( 

31 "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?", 

32 (hash_val,), 

33 ).fetchone() 

34 if alias: 

35 ref_row = conn.execute( 

36 "SELECT * FROM facts WHERE id = ? AND tenant_id = ?", 

37 (alias["fact_id"], tenant_id), 

38 ).fetchone() 

39 return hash_val, ref_row 

40 

41 

def _format_provenance_entry(hash_val: str, ref_row: Any, excluded: set[str]) -> ProvenanceEntry:
    """Build the ProvenanceEntry for one resolved ``derived_from`` reference.

    A reference with no row, or whose row's entity is in ``excluded``, is
    rendered as the same redacted shape (hash plus ``exists=False``). Any other
    row is rendered with its fact id and entity.
    """
    redacted = ref_row is None or ref_row["entity"] in excluded
    if redacted:
        return ProvenanceEntry(hash=hash_val, exists=False)

    fact_id = ref_row["id"]
    entity = ref_row["entity"]
    return ProvenanceEntry(hash=hash_val, fact_id=fact_id, entity=entity, exists=True)

54 

55 

@router.get("/{fact_id}/provenance", response_model=ProvenanceResponse)
def get_provenance(
    fact_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> ProvenanceResponse:
    """Provenance walk with tombstone suppression.

    Returns the derived_from chain for a fact. Any entry whose referenced entity is
    tombstoned — or whose fact is otherwise inaccessible — is redacted to
    {"hash": "...", "exists": false}, indistinguishable from unauthorized
    cross-scope references to prevent existence leakage. Covered by
    Spec-X2-RTBF-Tombstones and Spec-X11-Recall-Graph.

    A stored ``derived_from`` value that is not valid JSON, or that decodes to
    anything other than a JSON array, is logged and treated as an empty chain.

    Args:
        fact_id: Id of the fact whose provenance is requested.
        identity: Caller identity resolved by the ``resolve_identity`` dependency.

    Returns:
        The fact id, its ``cid`` (if the row has one) and the rendered
        ``derived_from`` entries.

    Raises:
        HTTPException: 403 when the identity cannot read; 404 when no fact with
            this id exists for the caller's tenant.
    """
    import json as _prov_json

    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        row = conn.execute(
            "SELECT * FROM facts WHERE id = ? AND tenant_id = ?",
            (fact_id, identity.tenant_id),
        ).fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="fact not found")

    # Both columns are read defensively: the row may not carry them.
    derived_from_raw = row["derived_from"] if "derived_from" in row.keys() else None  # noqa: SIM118
    cid_val = row["cid"] if "cid" in row.keys() else None  # noqa: SIM118
    root_scope: str = row["scope"] or "local"

    if not derived_from_raw:
        return ProvenanceResponse(fact_id=fact_id, cid=cid_val, derived_from=[])

    try:
        entries_raw: Any = _prov_json.loads(derived_from_raw)
    except (TypeError, ValueError, RecursionError) as exc:
        # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logger.warning("ignoring malformed provenance for fact %s: %s", fact_id, exc)
        entries_raw = []

    # Valid JSON is not necessarily a list: "null" or "5" would crash the loop
    # below, and an object would be iterated by key. Treat those as malformed.
    if not isinstance(entries_raw, list):
        logger.warning(
            "ignoring malformed provenance for fact %s: %s",
            fact_id,
            f"expected a JSON array, got {type(entries_raw).__name__}",
        )
        entries_raw = []

    # Resolve each derived_from entry to its referenced fact row; the resolver
    # returns None for entries it skips.
    candidates = (
        _resolve_provenance_entry(entry, identity.tenant_id) for entry in entries_raw
    )
    resolved: list[tuple[str, Any]] = [pair for pair in candidates if pair is not None]

    # Single tombstone filter call across all resolved entity URIs (§23.3.2 r.4)
    accessible_entities = [ref_row["entity"] for _, ref_row in resolved if ref_row is not None]
    excluded: set[str] = set()
    if accessible_entities:
        with db() as _tc_conn:
            is_admin = identity.is_admin()
            excluded, _ = _get_tombstone_filter(_tc_conn, accessible_entities, root_scope, is_admin)

    # Build response — §23.3.2 r.4 tombstone and §20.6.2 unauthorized share identical shape
    result: list[ProvenanceEntry] = [
        _format_provenance_entry(hash_val, ref_row, excluded) for hash_val, ref_row in resolved
    ]

    return ProvenanceResponse(fact_id=fact_id, cid=cid_val, derived_from=result)