Coverage for node / src / stigmem_node / session_graph.py: 83%
64 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Per-session read/write graph controls for R-21 feedback-loop defense."""
3from __future__ import annotations
5import json
6import uuid
7from datetime import UTC, datetime
8from typing import Any
10from fastapi import HTTPException, status
12from .auth import Identity
# Name under which a caller supplies its session id.
# NOTE(review): presumably an HTTP request header read by the route layer;
# no use of this constant is visible in this module - confirm against callers.
SESSION_HEADER = "Stigmem-Session"
# The write_mode value that ensure_write_allowed() compares against before it
# consults the provenance carried in derived_from.
SUMMARIZE_WITH_PROVENANCE = "summarize_with_provenance"
18def normalize_session_id(session_id: str | None) -> str | None:
19 """Return a bounded session id, or None when the caller did not opt in."""
20 if session_id is None:
21 return None
22 normalized = session_id.strip()
23 if not normalized: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true
24 return None
25 if len(normalized) > 128: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true
26 raise HTTPException(
27 status_code=status.HTTP_400_BAD_REQUEST,
28 detail="session_id_too_long",
29 )
30 return normalized
33def _now_iso() -> str:
34 return datetime.now(UTC).isoformat()
def record_read_scopes(
    conn: Any,
    *,
    identity: Identity,
    session_id: str | None,
    scopes: set[str],
) -> None:
    """Insert one ``'read'`` access row per scope for the caller's session.

    Does nothing when ``session_id`` normalizes to ``None``. Every row written
    by a single call carries the same timestamp and a fresh UUID as its id.
    """
    session_key = normalize_session_id(session_id)
    if session_key is None:
        return

    # One timestamp is taken up front and shared by all rows of this call.
    stamp = _now_iso()
    insert_sql = """INSERT OR IGNORE INTO session_scope_access
               (id, session_id, entity_uri, tenant_id, scope, access_type, ts)
               VALUES (?, ?, ?, ?, ?, 'read', ?)"""
    for read_scope in scopes:
        row_values = (
            str(uuid.uuid4()),
            session_key,
            identity.entity_uri,
            identity.tenant_id,
            read_scope,
            stamp,
        )
        conn.execute(insert_sql, row_values)
def record_write_scope(
    conn: Any,
    *,
    identity: Identity,
    session_id: str | None,
    scope: str,
) -> None:
    """Insert a single ``'write'`` access row for the caller's session.

    Does nothing when ``session_id`` normalizes to ``None``.
    """
    session_key = normalize_session_id(session_id)
    if session_key is not None:
        insert_sql = """INSERT OR IGNORE INTO session_scope_access
           (id, session_id, entity_uri, tenant_id, scope, access_type, ts)
           VALUES (?, ?, ?, ?, ?, 'write', ?)"""
        row_values = (
            str(uuid.uuid4()),
            session_key,
            identity.entity_uri,
            identity.tenant_id,
            scope,
            _now_iso(),
        )
        conn.execute(insert_sql, row_values)
def _read_scopes_for_session(conn: Any, *, identity: Identity, session_id: str) -> set[str]:
    """Return the distinct scopes with a ``'read'`` row for this session.

    Rows are matched on session id together with the identity's entity URI
    and tenant id. Each row is indexed by column name (``row["scope"]``).
    """
    query = """SELECT scope FROM session_scope_access
           WHERE session_id = ?
             AND entity_uri = ?
             AND tenant_id = ?
             AND access_type = 'read'"""
    params = (session_id, identity.entity_uri, identity.tenant_id)
    seen: set[str] = set()
    for record in conn.execute(query, params).fetchall():
        seen.add(record["scope"])
    return seen
def _provenance_scopes(conn: Any, derived_from: list[dict[str, Any]]) -> set[str]:
    """Collect the scopes of the facts cited by ``derived_from``.

    Each entry is resolved by its truthy ``fact_id`` when present. Otherwise,
    a string ``hash`` starting with ``sha256:`` is resolved through
    ``fact_cid_aliases``. Entries that resolve to no fact are skipped. An
    entry with a ``fact_id`` that is not found does not fall back to its hash.
    """
    found: set[str] = set()
    fact_sql = "SELECT scope FROM facts WHERE id = ?"
    for source in derived_from:
        fact_id = source.get("fact_id")
        digest = source.get("hash")
        if not fact_id:
            # No direct id: only a sha256-prefixed content hash can resolve.
            if not (isinstance(digest, str) and digest.startswith("sha256:")):
                continue
            alias_row = conn.execute(
                "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?",
                (digest,),
            ).fetchone()
            if alias_row is None:
                continue
            fact_id = alias_row["fact_id"]
        fact_row = conn.execute(fact_sql, (fact_id,)).fetchone()
        if fact_row is not None:
            found.add(fact_row["scope"])
    return found
def ensure_write_allowed(
    conn: Any,
    *,
    identity: Identity,
    session_id: str | None,
    target_scope: str,
    write_mode: str,
    derived_from: list[dict[str, Any]],
) -> None:
    """Block writes into a scope the same session has already read.

    The write is allowed when there is no usable session id, or when the
    session has no recorded read of ``target_scope``. It is also allowed when
    ``write_mode`` equals ``SUMMARIZE_WITH_PROVENANCE`` and ``derived_from``
    cites at least one fact whose scope is ``target_scope``.

    Raises:
        HTTPException: 403 with code ``feedback_loop_provenance_required``
            in every other case.
    """
    session_key = normalize_session_id(session_id)
    if session_key is None:
        return

    previously_read = _read_scopes_for_session(
        conn, identity=identity, session_id=session_key
    )
    if target_scope not in previously_read:
        return

    # Provenance is only looked up when the caller asked for the matching mode.
    if write_mode == SUMMARIZE_WITH_PROVENANCE:
        cited_scopes = _provenance_scopes(conn, derived_from)
        if target_scope in cited_scopes:
            return

    rejection = {
        "code": "feedback_loop_provenance_required",
        "message": (
            "writes into scopes read earlier in the same session require "
            "write_mode='summarize_with_provenance' and source provenance"
        ),
        "session_id": session_key,
        "scope": target_scope,
    }
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=rejection)
163def encode_derived_from(derived_from: list[dict[str, Any]]) -> str | None:
164 if not derived_from:
165 return None
166 return json.dumps(derived_from, sort_keys=True, separators=(",", ":"))