Coverage for node / src / stigmem_node / source_trust.py: 79%
128 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Source-trust score computation — spec §19.4.
3Exported surface:
4 compute_source_trust(source_uri, scope, identity) -> float
5 get_cached_trust(source_uri) -> float | None
6 bust_trust_cache(source_uri) -> None
7 is_blocklisted(source_uri) -> bool
9The score is in [0.0, 1.0]. A source with no computable score defaults to 0.5.
10Admin-blocklisted sources always return 0.0.
12Per §19.4.4, implementations SHOULD cache per-source trust scores with a TTL of
13at least 60 seconds. We use a simple time-based in-process cache; each worker
14process maintains its own cache (acceptable for single-process deployments).
15"""
17from __future__ import annotations
19import logging
20import os
21import time
22from functools import lru_cache
23from typing import TYPE_CHECKING
25if TYPE_CHECKING:
26 from .auth import Identity
28logger = logging.getLogger("stigmem.source_trust")
30# Cache: { source_uri: (score: float, expires_at: float) }
31_TRUST_CACHE: dict[str, tuple[float, float]] = {}
32_CACHE_TTL_S: float = 60.0
34# Cached cutoff for peer-history lookups. Refreshing this once per cache window
35# keeps the rolling 30-day window accurate enough for source-trust scoring
36# without recalculating the timestamp on every fresh trust computation.
37_PEER_HISTORY_CUTOFF_REFRESH_S: float = 60.0
40@lru_cache(maxsize=2)
41def _peer_history_cutoff_iso_for_bucket(_bucket: int) -> str:
42 from datetime import UTC, datetime, timedelta
44 return (datetime.now(UTC) - timedelta(days=30)).isoformat()
47def _peer_history_cutoff_iso() -> str:
48 refresh_bucket = int(time.monotonic() // _PEER_HISTORY_CUTOFF_REFRESH_S)
49 return _peer_history_cutoff_iso_for_bucket(refresh_bucket)
52def get_cached_trust(source_uri: str) -> float | None:
53 entry = _TRUST_CACHE.get(source_uri)
54 if entry is None:
55 return None
56 score, expires_at = entry
57 if time.monotonic() > expires_at:
58 del _TRUST_CACHE[source_uri]
59 return None
60 return score
63def _set_cache(source_uri: str, score: float) -> None:
64 _TRUST_CACHE[source_uri] = (score, time.monotonic() + _CACHE_TTL_S)
67def bust_trust_cache(source_uri: str) -> None:
68 _TRUST_CACHE.pop(source_uri, None)
71def compute_source_trust(
72 source_uri: str,
73 scope: str,
74 identity: Identity | None = None,
75 *,
76 identity_strength_override: float | None = None,
77) -> float:
78 """Return the source-trust score for *source_uri* asserting a fact at *scope*.
80 Uses cached value if available (TTL 60 s). Caches the freshly computed value.
81 When *identity_strength_override* is set the cache is bypassed so the
82 override is always honoured (spec §19.4.2 capability-token boost).
83 """
84 if identity_strength_override is None:
85 cached = get_cached_trust(source_uri)
86 if cached is not None:
87 return cached
89 score = _compute_fresh(
90 source_uri, scope, identity, identity_strength_override=identity_strength_override
91 )
92 if identity_strength_override is None:
93 _set_cache(source_uri, score)
94 return score
97def _compute_fresh(
98 source_uri: str,
99 scope: str,
100 identity: Identity | None,
101 *,
102 identity_strength_override: float | None = None,
103) -> float:
104 from .settings import settings
106 trust_mode = settings.trust_mode
107 if trust_mode == "off":
108 return 0.5 # not computed; return neutral default
110 # Check blocklist first — always returns 0.0 regardless (§19.4.5)
111 if is_blocklisted(source_uri):
112 return 0.0
114 # Check auto-rules (operator always_trust / never_trust)
115 from .trust_rules import evaluate_auto_rules
117 override = evaluate_auto_rules(source_uri, scope)
118 if override is not None:
119 return override
121 w_i = settings.trust_weight_identity
122 w_p = settings.trust_weight_peer_history
123 w_s = settings.trust_weight_scope_authority
124 w_a = settings.trust_weight_attestation_mode
126 i_s = (
127 identity_strength_override
128 if identity_strength_override is not None
129 else _identity_strength(source_uri)
130 )
131 p_h = _peer_history(source_uri)
132 s_a = _scope_authority(source_uri, scope, identity)
133 a_m = _attestation_mode_factor()
135 raw = w_i * i_s + w_p * p_h + w_s * s_a + w_a * a_m
136 return max(0.0, min(1.0, raw))
139# ---------------------------------------------------------------------------
140# Component functions (§19.4.2)
141# ---------------------------------------------------------------------------
144def _identity_strength(source_uri: str) -> float:
145 """Score [0,1] measuring how strongly the source is identified."""
146 if not source_uri: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true
147 return 0.0
149 from .db import db
151 with db() as conn:
152 # Source has a valid org manifest (§19.1)?
153 manifest_row = conn.execute(
154 "SELECT log_entry_json FROM federation_manifests WHERE entity_uri = ?",
155 (source_uri,),
156 ).fetchone()
157 if manifest_row is not None: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true
158 has_log_proof = manifest_row["log_entry_json"] is not None
159 return 1.0 if has_log_proof else 0.7
161 # Source is a registered local API key?
162 key_row = conn.execute(
163 "SELECT id FROM agent_keys WHERE entity_uri = ? AND status = 'active'",
164 (source_uri,),
165 ).fetchone()
166 if key_row is not None: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true
167 return 0.4
169 # Syntactically valid URI (has a scheme)?
170 if "://" in source_uri or source_uri.startswith("stigmem:"):
171 return 0.1
173 return 0.0
176def _peer_history(source_uri: str) -> float:
177 """Score [0,1] derived from interaction history over the past 30 days."""
178 from .db import db
180 cutoff = _peer_history_cutoff_iso()
182 with db() as conn:
183 row = conn.execute(
184 """SELECT
185 COUNT(*) AS total,
186 SUM(CASE WHEN attested = 0 THEN 1 ELSE 0 END) AS failures
187 FROM facts
188 WHERE source = ? AND timestamp >= ?""",
189 (source_uri, cutoff),
190 ).fetchone()
192 if row is None: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 return 0.5 # no history → neutral (§19.4.2)
195 total = row["total"] or 0
196 failures = row["failures"] or 0
198 if total < 10: 198 ↛ 200line 198 didn't jump to line 200 because the condition on line 198 was always true
199 return 0.5 # new source
200 failure_rate = failures / total
201 if total >= 100 and failure_rate == 0.0:
202 return 1.0
203 if failure_rate >= 0.05:
204 return 0.3
205 return 0.7 # ≥ 10 facts, < 5% failures
208def _scope_authority(source_uri: str, scope: str, identity: Identity | None) -> float:
209 """Score [0,1] measuring whether source has authority to write at this scope."""
210 if identity is not None and identity.entity_uri == source_uri and identity.can_write():
211 return 0.9
213 from .settings import settings
215 # Source entity prefix matches node authority
216 try:
217 from urllib.parse import urlparse
219 parsed = urlparse(settings.node_url)
220 authority = parsed.netloc or parsed.path
221 if source_uri.startswith(f"stigmem://{authority}"):
222 return 0.7
223 except Exception:
224 logger.exception("Failed to parse node_url while scoring source authority")
226 # External entity without explicit scope authority
227 if scope in ("public", "team"):
228 return 0.5 # treat as external with federate-level access
230 return 0.2
233def _attestation_mode_factor() -> float:
234 """Score based on source-attestation mode only when the plugin is enabled."""
235 from .settings import settings
237 if not _source_attestation_plugin_enabled(): 237 ↛ 240line 237 didn't jump to line 240 because the condition on line 237 was always true
238 return 0.2
240 mode = settings.source_attestation_mode
241 if mode == "enforce":
242 return 1.0
243 if mode == "warn":
244 return 0.6
245 return 0.2 # "off"
248def _source_attestation_plugin_enabled() -> bool:
249 from .plugins import get_registry
251 if "stigmem-plugin-source-attestation" not in get_registry().registered_plugins(): 251 ↛ 253line 251 didn't jump to line 253 because the condition on line 251 was always true
252 return False
253 raw = os.environ.get("STIGMEM_SOURCE_ATTESTATION_ENABLED", "")
254 return raw.strip().lower() in {"1", "true", "yes", "on"}
257def is_blocklisted(source_uri: str) -> bool:
258 """Return True if source_uri has been admin-blocklisted (peer_history = 0.0 rule)."""
259 from .db import db
261 with db() as conn:
262 row = conn.execute(
263 "SELECT id FROM quarantine_rules WHERE rule_type = 'never_trust' AND org_uri = ?",
264 (source_uri,),
265 ).fetchone()
266 return row is not None