Coverage for node / src / stigmem_node / session_graph.py: 83%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Per-session read/write graph controls for R-21 feedback-loop defense.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import uuid 

7from datetime import UTC, datetime 

8from typing import Any 

9 

10from fastapi import HTTPException, status 

11 

12from .auth import Identity 

13 

14SESSION_HEADER = "Stigmem-Session" 

15SUMMARIZE_WITH_PROVENANCE = "summarize_with_provenance" 

16 

17 

18def normalize_session_id(session_id: str | None) -> str | None: 

19 """Return a bounded session id, or None when the caller did not opt in.""" 

20 if session_id is None: 

21 return None 

22 normalized = session_id.strip() 

23 if not normalized: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true

24 return None 

25 if len(normalized) > 128: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true

26 raise HTTPException( 

27 status_code=status.HTTP_400_BAD_REQUEST, 

28 detail="session_id_too_long", 

29 ) 

30 return normalized 

31 

32 

33def _now_iso() -> str: 

34 return datetime.now(UTC).isoformat() 

35 

36 

37def record_read_scopes( 

38 conn: Any, 

39 *, 

40 identity: Identity, 

41 session_id: str | None, 

42 scopes: set[str], 

43) -> None: 

44 """Record scopes read by a caller in a session.""" 

45 normalized = normalize_session_id(session_id) 

46 if normalized is None: 

47 return 

48 now = _now_iso() 

49 for scope in scopes: 

50 conn.execute( 

51 """INSERT OR IGNORE INTO session_scope_access 

52 (id, session_id, entity_uri, tenant_id, scope, access_type, ts) 

53 VALUES (?, ?, ?, ?, ?, 'read', ?)""", 

54 ( 

55 str(uuid.uuid4()), 

56 normalized, 

57 identity.entity_uri, 

58 identity.tenant_id, 

59 scope, 

60 now, 

61 ), 

62 ) 

63 

64 

65def record_write_scope( 

66 conn: Any, 

67 *, 

68 identity: Identity, 

69 session_id: str | None, 

70 scope: str, 

71) -> None: 

72 """Record a scope written by a caller in a session.""" 

73 normalized = normalize_session_id(session_id) 

74 if normalized is None: 

75 return 

76 conn.execute( 

77 """INSERT OR IGNORE INTO session_scope_access 

78 (id, session_id, entity_uri, tenant_id, scope, access_type, ts) 

79 VALUES (?, ?, ?, ?, ?, 'write', ?)""", 

80 ( 

81 str(uuid.uuid4()), 

82 normalized, 

83 identity.entity_uri, 

84 identity.tenant_id, 

85 scope, 

86 _now_iso(), 

87 ), 

88 ) 

89 

90 

91def _read_scopes_for_session(conn: Any, *, identity: Identity, session_id: str) -> set[str]: 

92 rows = conn.execute( 

93 """SELECT scope FROM session_scope_access 

94 WHERE session_id = ? 

95 AND entity_uri = ? 

96 AND tenant_id = ? 

97 AND access_type = 'read'""", 

98 (session_id, identity.entity_uri, identity.tenant_id), 

99 ).fetchall() 

100 return {row["scope"] for row in rows} 

101 

102 

103def _provenance_scopes(conn: Any, derived_from: list[dict[str, Any]]) -> set[str]: 

104 scopes: set[str] = set() 

105 for entry in derived_from: 

106 fact_id = entry.get("fact_id") 

107 hash_val = entry.get("hash") 

108 row = None 

109 if fact_id: 109 ↛ 111line 109 didn't jump to line 111 because the condition on line 109 was always true

110 row = conn.execute("SELECT scope FROM facts WHERE id = ?", (fact_id,)).fetchone() 

111 elif isinstance(hash_val, str) and hash_val.startswith("sha256:"): 

112 alias = conn.execute( 

113 "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?", 

114 (hash_val,), 

115 ).fetchone() 

116 if alias is not None: 

117 row = conn.execute( 

118 "SELECT scope FROM facts WHERE id = ?", 

119 (alias["fact_id"],), 

120 ).fetchone() 

121 if row is not None: 121 ↛ 105line 121 didn't jump to line 105 because the condition on line 121 was always true

122 scopes.add(row["scope"]) 

123 return scopes 

124 

125 

126def ensure_write_allowed( 

127 conn: Any, 

128 *, 

129 identity: Identity, 

130 session_id: str | None, 

131 target_scope: str, 

132 write_mode: str, 

133 derived_from: list[dict[str, Any]], 

134) -> None: 

135 """Reject read-then-write same-scope loops unless provenance is carried forward.""" 

136 normalized = normalize_session_id(session_id) 

137 if normalized is None: 

138 return 

139 

140 read_scopes = _read_scopes_for_session(conn, identity=identity, session_id=normalized) 

141 if target_scope not in read_scopes: 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true

142 return 

143 

144 if write_mode == SUMMARIZE_WITH_PROVENANCE and target_scope in _provenance_scopes( 

145 conn, derived_from 

146 ): 

147 return 

148 

149 raise HTTPException( 

150 status_code=status.HTTP_403_FORBIDDEN, 

151 detail={ 

152 "code": "feedback_loop_provenance_required", 

153 "message": ( 

154 "writes into scopes read earlier in the same session require " 

155 "write_mode='summarize_with_provenance' and source provenance" 

156 ), 

157 "session_id": normalized, 

158 "scope": target_scope, 

159 }, 

160 ) 

161 

162 

163def encode_derived_from(derived_from: list[dict[str, Any]]) -> str | None: 

164 if not derived_from: 

165 return None 

166 return json.dumps(derived_from, sort_keys=True, separators=(",", ":"))