Coverage for node / src / stigmem_node / source_trust.py: 79%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Source-trust score computation — spec §19.4. 

2 

3Exported surface: 

4 compute_source_trust(source_uri, scope, identity) -> float 

5 get_cached_trust(source_uri) -> float | None 

6 bust_trust_cache(source_uri) -> None 

7 is_blocklisted(source_uri) -> bool 

8 

9The score is in [0.0, 1.0]. A source with no computable score defaults to 0.5. 

10Admin-blocklisted sources always return 0.0. 

11 

12Per §19.4.4, implementations SHOULD cache per-source trust scores with a TTL of 

13at least 60 seconds. We use a simple time-based in-process cache; each worker 

14process maintains its own cache (acceptable for single-process deployments). 

15""" 

16 

17from __future__ import annotations 

18 

19import logging 

20import os 

21import time 

22from functools import lru_cache 

23from typing import TYPE_CHECKING 

24 

25if TYPE_CHECKING: 

26 from .auth import Identity 

27 

# Module logger; the name is fixed rather than derived from __name__.
logger = logging.getLogger("stigmem.source_trust")

# Cache: { source_uri: (score: float, expires_at: float) }
# expires_at is a time.monotonic() deadline, not a wall-clock timestamp.
_TRUST_CACHE: dict[str, tuple[float, float]] = {}
# Seconds a freshly stored score remains valid.
_CACHE_TTL_S: float = 60.0

# Cached cutoff for peer-history lookups. Refreshing this once per cache window
# keeps the rolling 30-day window accurate enough for source-trust scoring
# without recalculating the timestamp on every fresh trust computation.
_PEER_HISTORY_CUTOFF_REFRESH_S: float = 60.0

38 

39 

40@lru_cache(maxsize=2) 

41def _peer_history_cutoff_iso_for_bucket(_bucket: int) -> str: 

42 from datetime import UTC, datetime, timedelta 

43 

44 return (datetime.now(UTC) - timedelta(days=30)).isoformat() 

45 

46 

def _peer_history_cutoff_iso() -> str:
    """Return the peer-history cutoff timestamp for the current refresh bucket.

    The monotonic clock is divided into buckets of
    ``_PEER_HISTORY_CUTOFF_REFRESH_S`` seconds; the bucket index is the cache
    key handed to ``_peer_history_cutoff_iso_for_bucket``.
    """
    elapsed = time.monotonic()
    bucket_index = int(elapsed // _PEER_HISTORY_CUTOFF_REFRESH_S)
    return _peer_history_cutoff_iso_for_bucket(bucket_index)

50 

51 

def get_cached_trust(source_uri: str) -> float | None:
    """Return the cached trust score for *source_uri*, or None if absent/expired.

    An entry whose monotonic deadline has passed is evicted as a side effect
    and reported as a miss.
    """
    entry = _TRUST_CACHE.get(source_uri)
    if entry is None:
        return None
    score, expires_at = entry
    if time.monotonic() > expires_at:
        # pop() instead of del: the entry may already have been removed between
        # the get() above and this point (e.g. by bust_trust_cache), and an
        # eviction must never surface as a KeyError to the caller.
        _TRUST_CACHE.pop(source_uri, None)
        return None
    return score

61 

62 

def _set_cache(source_uri: str, score: float) -> None:
    """Store *score* for *source_uri* with a deadline ``_CACHE_TTL_S`` seconds ahead."""
    deadline = time.monotonic() + _CACHE_TTL_S
    _TRUST_CACHE[source_uri] = (score, deadline)

65 

66 

def bust_trust_cache(source_uri: str) -> None:
    """Drop any cached trust score for *source_uri*; a missing entry is not an error."""
    _TRUST_CACHE.pop(source_uri, None)

69 

70 

def compute_source_trust(
    source_uri: str,
    scope: str,
    identity: Identity | None = None,
    *,
    identity_strength_override: float | None = None,
) -> float:
    """Return the source-trust score for *source_uri* asserting a fact at *scope*.

    Without *identity_strength_override* the cached score is returned when one
    is present, otherwise a fresh score is computed and stored in the cache.
    With an override the cache is neither read nor written, so the override is
    always honoured (spec §19.4.2 capability-token boost).

    NOTE(review): the cache is keyed by *source_uri* only, so a cached score is
    returned whatever *scope* / *identity* the later call passes — confirm this
    is the intended "per-source" caching of §19.4.4.
    """
    cacheable = identity_strength_override is None

    if cacheable:
        hit = get_cached_trust(source_uri)
        if hit is not None:
            return hit

    fresh = _compute_fresh(
        source_uri,
        scope,
        identity,
        identity_strength_override=identity_strength_override,
    )
    if cacheable:
        _set_cache(source_uri, fresh)
    return fresh

95 

96 

def _compute_fresh(
    source_uri: str,
    scope: str,
    identity: Identity | None,
    *,
    identity_strength_override: float | None = None,
) -> float:
    """Compute an uncached trust score for *source_uri* at *scope*.

    Checks are applied in this order:

    1. ``settings.trust_mode == "off"`` returns the neutral 0.5.
    2. A blocklisted source returns 0.0 (§19.4.5).
    3. A non-None result from ``evaluate_auto_rules`` is returned as-is.
    4. Otherwise the weighted sum of the four components is clamped to [0, 1].

    Note that step 1 runs before the blocklist lookup, so with trust mode
    "off" a blocklisted source also gets 0.5.
    """
    from .settings import settings

    if settings.trust_mode == "off":
        return 0.5  # not computed; return neutral default

    # Blocklist beats every other rule and component.
    if is_blocklisted(source_uri):
        return 0.0

    # Operator auto-rules (always_trust / never_trust).
    from .trust_rules import evaluate_auto_rules

    rule_score = evaluate_auto_rules(source_uri, scope)
    if rule_score is not None:
        return rule_score

    weight_identity = settings.trust_weight_identity
    weight_history = settings.trust_weight_peer_history
    weight_scope = settings.trust_weight_scope_authority
    weight_attestation = settings.trust_weight_attestation_mode

    # An explicit override replaces the identity lookup entirely.
    if identity_strength_override is None:
        identity_component = _identity_strength(source_uri)
    else:
        identity_component = identity_strength_override
    history_component = _peer_history(source_uri)
    scope_component = _scope_authority(source_uri, scope, identity)
    attestation_component = _attestation_mode_factor()

    weighted = (
        weight_identity * identity_component
        + weight_history * history_component
        + weight_scope * scope_component
        + weight_attestation * attestation_component
    )
    return max(0.0, min(1.0, weighted))

137 

138 

139# --------------------------------------------------------------------------- 

140# Component functions (§19.4.2) 

141# --------------------------------------------------------------------------- 

142 

143 

144def _identity_strength(source_uri: str) -> float: 

145 """Score [0,1] measuring how strongly the source is identified.""" 

146 if not source_uri: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true

147 return 0.0 

148 

149 from .db import db 

150 

151 with db() as conn: 

152 # Source has a valid org manifest (§19.1)? 

153 manifest_row = conn.execute( 

154 "SELECT log_entry_json FROM federation_manifests WHERE entity_uri = ?", 

155 (source_uri,), 

156 ).fetchone() 

157 if manifest_row is not None: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true

158 has_log_proof = manifest_row["log_entry_json"] is not None 

159 return 1.0 if has_log_proof else 0.7 

160 

161 # Source is a registered local API key? 

162 key_row = conn.execute( 

163 "SELECT id FROM agent_keys WHERE entity_uri = ? AND status = 'active'", 

164 (source_uri,), 

165 ).fetchone() 

166 if key_row is not None: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true

167 return 0.4 

168 

169 # Syntactically valid URI (has a scheme)? 

170 if "://" in source_uri or source_uri.startswith("stigmem:"): 

171 return 0.1 

172 

173 return 0.0 

174 

175 

def _peer_history(source_uri: str) -> float:
    """Score [0,1] derived from interaction history over the past 30 days.

    Counts the ``facts`` rows for the source at or after the cutoff and how
    many of them have ``attested = 0`` (failures):

    * fewer than 10 facts (or no row at all) — 0.5, neutral (§19.4.2)
    * failure rate of 5% or more — 0.3
    * 100 or more facts and no failures — 1.0
    * otherwise — 0.7
    """
    from .db import db

    window_start = _peer_history_cutoff_iso()

    with db() as conn:
        stats = conn.execute(
            """SELECT
                 COUNT(*) AS total,
                 SUM(CASE WHEN attested = 0 THEN 1 ELSE 0 END) AS failures
               FROM facts
               WHERE source = ? AND timestamp >= ?""",
            (source_uri, window_start),
        ).fetchone()

    if stats is None:
        return 0.5  # no history → neutral (§19.4.2)

    # SUM() yields NULL when no rows match, hence the "or 0" fallbacks.
    seen = stats["total"] or 0
    bad = stats["failures"] or 0

    if seen < 10:
        return 0.5  # new source

    bad_ratio = bad / seen
    if bad_ratio >= 0.05:
        return 0.3
    if seen >= 100 and bad_ratio == 0.0:
        return 1.0
    return 0.7  # ≥ 10 facts, < 5% failures

206 

207 

208def _scope_authority(source_uri: str, scope: str, identity: Identity | None) -> float: 

209 """Score [0,1] measuring whether source has authority to write at this scope.""" 

210 if identity is not None and identity.entity_uri == source_uri and identity.can_write(): 

211 return 0.9 

212 

213 from .settings import settings 

214 

215 # Source entity prefix matches node authority 

216 try: 

217 from urllib.parse import urlparse 

218 

219 parsed = urlparse(settings.node_url) 

220 authority = parsed.netloc or parsed.path 

221 if source_uri.startswith(f"stigmem://{authority}"): 

222 return 0.7 

223 except Exception: 

224 logger.exception("Failed to parse node_url while scoring source authority") 

225 

226 # External entity without explicit scope authority 

227 if scope in ("public", "team"): 

228 return 0.5 # treat as external with federate-level access 

229 

230 return 0.2 

231 

232 

def _attestation_mode_factor() -> float:
    """Score based on source-attestation mode only when the plugin is enabled.

    Returns 1.0 for mode ``"enforce"`` and 0.6 for ``"warn"``. Any other mode,
    or a disabled plugin, yields 0.2.
    """
    from .settings import settings

    if _source_attestation_plugin_enabled():
        configured = settings.source_attestation_mode
        if configured == "enforce":
            return 1.0
        if configured == "warn":
            return 0.6
    # Plugin disabled, or mode is "off" / unrecognised.
    return 0.2

246 

247 

def _source_attestation_plugin_enabled() -> bool:
    """Return True when the source-attestation plugin is registered and switched on.

    Both conditions must hold: ``stigmem-plugin-source-attestation`` is among
    the registry's registered plugins, and the environment variable
    ``STIGMEM_SOURCE_ATTESTATION_ENABLED`` is one of ``1``, ``true``, ``yes``
    or ``on`` (case-insensitive, surrounding whitespace ignored).
    """
    from .plugins import get_registry

    registered = get_registry().registered_plugins()
    if "stigmem-plugin-source-attestation" in registered:
        flag = os.environ.get("STIGMEM_SOURCE_ATTESTATION_ENABLED", "")
        return flag.strip().lower() in {"1", "true", "yes", "on"}
    return False

255 

256 

def is_blocklisted(source_uri: str) -> bool:
    """Return True if source_uri has been admin-blocklisted.

    A source counts as blocklisted when ``quarantine_rules`` holds a row with
    ``rule_type = 'never_trust'`` whose ``org_uri`` equals *source_uri*.
    """
    from .db import db

    sql = "SELECT id FROM quarantine_rules WHERE rule_type = 'never_trust' AND org_uri = ?"
    with db() as conn:
        match = conn.execute(sql, (source_uri,)).fetchone()
    return match is not None