Coverage for node / src / stigmem_node / recall / recall_pipeline.py: 80%
117 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Recall-time processing pipeline — spec §19.4.4, §19.7.
3Pipeline order (§19.7.5):
4 Storage layer
5 → Scope/garden ACL (done in facts.py)
6 → Source-trust multiplier applied to effective_confidence ← here
7 → Content sanitizer ← here
8 → API response serializer
10Exported surface:
11 apply_recall_pipeline(records, identity, include_low_trust) -> list[FactRecord]
12 _run_sanitizer(record, mode, effective_confidence) -> FactRecord (exposed for tests)
13"""
15from __future__ import annotations
17import logging
18import re
19import unicodedata
20from typing import TYPE_CHECKING
22if TYPE_CHECKING:
23 from ..auth import Identity
24 from ..models.facts import FactRecord
26logger = logging.getLogger("stigmem.recall")
# ---------------------------------------------------------------------------
# Sentinel patterns (§19.7.2) — compiled lazily on first use, then cached
# ---------------------------------------------------------------------------

# Regex sources for the built-in sentinels. They are stored as plain strings
# here and compiled (case-insensitively) by _load_compiled_patterns(), which
# may append operator-supplied patterns after these defaults.
_DEFAULT_PATTERNS: list[str] = [
    # Phrases telling the reader to ignore / disregard earlier instructions.
    r"\bignore\s+(all\s+)?previous\s+instructions?\b",
    r"\bdisregard\s+(all\s+)?previous\s+(prompt|instructions?)\b",
    # Phrases attempting to switch the reader into another mode or persona.
    r"\byou\s+are\s+now\s+(?:in\s+)?(?:a\s+)?(?:different|new)\s+mode\b",
    r"\bact\s+as\s+(?:an?\s+)?(?:evil|unfiltered|uncensored|dan\b)",
    # A literal "system prompt:" header.
    r"\bsystem\s+prompt\s*:\s*",
    # Literal chat-template delimiter tokens and conversational role markers.
    r"<\|im_start\|>",
    r"<\|im_end\|>",
    r"\[INST\]",
    r"\[\/INST\]",
    r"\bHuman:\s*",
    r"\bAssistant:\s*",
    # JSON-looking objects that open with a "__proto__" or "constructor" key.
    r'\{\s*"__proto__"\s*:',
    r'\{\s*"constructor"\s*:',
]
# Invisible / bidi control characters to strip (§19.7.2)
_BIDI_CONTROLS = frozenset(
    map(
        chr,
        (
            # U+200B–U+200D zero-width characters, U+200E/U+200F direction marks
            *range(0x200B, 0x2010),
            # U+202A–U+202E directional embeddings and overrides
            *range(0x202A, 0x202F),
            # U+2066–U+2069 directional isolates
            *range(0x2066, 0x206A),
            # U+FEFF zero-width no-break space / byte-order mark
            0xFEFF,
        ),
    )
)

# Control characters forbidden in string/text output (§19.7.3).
# Tab (\x09), line feed (\x0A) and carriage return (\x0D) are deliberately
# outside the class and therefore survive.
_CONTROL_CHARS_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F]")
def _load_compiled_patterns() -> list[re.Pattern[str]]:
    """Compile the built-in sentinel patterns plus any operator-supplied extras.

    Extra patterns are read from ``settings.sanitizer_extra_patterns_file``
    when that setting is truthy: one regex per line, with blank lines and
    lines starting with ``#`` ignored. Loading is best-effort — a missing or
    unreadable file is logged and the patterns gathered so far are still used.

    Every pattern is compiled with ``re.IGNORECASE``. A pattern that is not a
    valid regular expression is logged and skipped rather than raised, so one
    malformed line in the extras file cannot break every recall request.

    Returns:
        The compiled patterns, defaults first, extras in file order.
    """
    from ..settings import settings

    patterns = list(_DEFAULT_PATTERNS)

    extra_file = settings.sanitizer_extra_patterns_file
    if extra_file:
        try:
            # Explicit encoding so parsing does not depend on the host locale.
            with open(extra_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith("#"):
                        patterns.append(line)
        except FileNotFoundError:
            logger.warning("sanitizer_extra_patterns_file not found: %s", extra_file)
        except Exception as exc:
            logger.error("Failed to read sanitizer_extra_patterns_file: %s", exc)

    compiled: list[re.Pattern[str]] = []
    for source in patterns:
        try:
            compiled.append(re.compile(source, re.IGNORECASE))
        except re.error as exc:
            # Keep the remaining sentinels active instead of failing the load.
            logger.error("Skipping invalid sanitizer pattern %r: %s", source, exc)
    return compiled
# Module-level cache of the compiled sentinel patterns. It stays None until
# the first lookup; reset_pattern_cache() clears it again (e.g. after a
# settings change) so the next lookup recompiles.
_compiled_patterns: list[re.Pattern[str]] | None = None


def _get_patterns() -> list[re.Pattern[str]]:
    """Return the compiled sentinel patterns, compiling them on first use."""
    global _compiled_patterns
    cached = _compiled_patterns
    if cached is None:
        cached = _compiled_patterns = _load_compiled_patterns()
    return cached
def reset_pattern_cache() -> None:
    """Discard the cached compiled patterns so the next lookup reloads them."""
    global _compiled_patterns
    _compiled_patterns = None
102# ---------------------------------------------------------------------------
103# Public API
104# ---------------------------------------------------------------------------
def apply_recall_pipeline(
    records: list[FactRecord],
    identity: Identity | None = None,
    include_low_trust: bool = False,
) -> list[FactRecord]:
    """Apply trust multiplier and content sanitizer to a list of FactRecords.

    For each record, in order:

    * records whose ``quarantine_status`` is ``"pending"`` are dropped;
    * unless ``settings.trust_mode`` is ``"off"``, the source-trust multiplier
      is computed and the record is copied with ``effective_confidence`` and
      ``source_trust`` set;
    * in ``"strict"`` trust mode, records whose effective confidence is below
      0.3 are dropped unless ``include_low_trust`` is true;
    * unless the sanitizer is off, the record is passed through the content
      sanitizer, which may return a cleaned, annotated or redacted copy.

    Setting ``trust_mode`` to ``"off"`` also switches the sanitizer off.

    Args:
        records: Facts produced by the storage / ACL layers.
        identity: Caller identity forwarded to the source-trust computation.
        include_low_trust: Keep low-trust facts even in strict mode.

    Returns:
        A new list with the surviving (possibly copied/modified) records, in
        input order. Records matched by the sanitizer in block or quarantine
        mode are kept as copies with ``sanitizer_redacted=True`` rather than
        removed.
    """
    from ..settings import settings

    trust_mode = settings.trust_mode
    sanitizer_mode = settings.sanitizer_mode

    if trust_mode == "off":
        sanitizer_mode = "off"
    else:
        # Loop-invariant: resolve the helper once per call, not once per record.
        from ..source_trust import compute_source_trust

    result: list[FactRecord] = []

    for record in records:
        # Skip quarantined facts from normal recall results (§19.5.2)
        # (rejected facts have confidence=0 and are caught by min_confidence filter)
        if record.quarantine_status == "pending":
            continue

        # 1. Source-trust multiplier (§19.4.4)
        eff_conf = record.confidence
        if trust_mode != "off":
            t = compute_source_trust(record.source, record.scope, identity)
            eff_conf = record.confidence * t
            record = record.model_copy(update={"effective_confidence": eff_conf, "source_trust": t})

        # In strict mode, filter facts below 0.3 effective confidence unless
        # caller opts in (§19.4.4). In relaxed mode, MUST NOT filter based
        # solely on a low score (§19.4.3).
        if trust_mode == "strict" and eff_conf < 0.3 and not include_low_trust:
            continue

        # 2. Content sanitizer (§19.7)
        if sanitizer_mode != "off":
            record = _run_sanitizer(record, sanitizer_mode, eff_conf)

        result.append(record)

    return result
def _run_sanitizer(record: FactRecord, mode: str, effective_confidence: float) -> FactRecord:
    """Sanitize one fact record and apply the enforcement *mode* on a match.

    Non-string values are delegated to the schema-type check. String/text
    values are NFKC-normalized, stripped of bidi/invisible characters and
    forbidden control characters, then scanned against the sentinel patterns.

    * No match: the record is returned unchanged, or as a copy carrying the
      cleaned text when cleaning altered it.
    * ``"warn"``: copy with the cleaned text and ``sanitizer_warnings`` set.
    * ``"block"``: copy with the value nulled and ``sanitizer_redacted=True``.
    * ``"quarantine"``: same redacted copy, after asking for the stored fact
      to be quarantined.
    * Any other mode: the record is returned unchanged.

    ``effective_confidence`` is accepted for the caller's signature but is not
    consulted here.
    """
    from ..models.facts import FactValue

    fact_value = record.value
    if fact_value.type not in ("string", "text"):
        # Schema type enforcement for non-string types (§19.7.3)
        return _enforce_schema_types(record)

    # NFKC normalization, then removal of bidi controls (§19.7.2) and of
    # forbidden control characters (§19.7.3).
    original_text = "" if fact_value.v is None else str(fact_value.v)
    nfkc_text = unicodedata.normalize("NFKC", original_text)
    visible_text = "".join(ch for ch in nfkc_text if ch not in _BIDI_CONTROLS)
    cleaned = _CONTROL_CHARS_RE.sub("", visible_text)

    # Source strings of every sentinel that fires on the cleaned text.
    hits: list[str] = [
        pattern.pattern for pattern in _get_patterns() if pattern.search(cleaned)
    ]

    if not hits:
        if cleaned == original_text:
            return record
        # Nothing suspicious, but hand back the cleaned text.
        return record.model_copy(update={"value": FactValue(type=fact_value.type, v=cleaned)})

    if mode == "warn":
        return record.model_copy(
            update={
                "sanitizer_warnings": hits,
                "value": FactValue(type=fact_value.type, v=cleaned),
            }
        )

    if mode == "block":
        logger.debug("Sanitizer blocked fact %s (matched: %s)", record.id, hits[0])
    elif mode == "quarantine":
        _quarantine_via_sanitizer(record.id, hits[0])
        logger.info("Sanitizer quarantined fact %s", record.id)
    else:
        return record

    # Block and quarantine both answer this request with a redacted record.
    return record.model_copy(
        update={
            "sanitizer_redacted": True,
            "sanitizer_warnings": hits,
            "value": FactValue(type=fact_value.type, v=None),
        }
    )
def _enforce_schema_types(record: FactRecord) -> FactRecord:
    """Redact non-string values that violate their declared type (§19.7.3).

    * ``number``: redacted when the payload is ``None`` or a float that is
      NaN or infinite.
    * ``ref``: redacted when the stringified payload neither contains
      ``"://"`` nor starts with ``"urn:"``.

    A redacted record is a copy whose value payload is ``None`` and whose
    ``sanitizer_redacted`` flag is set; every other record is returned as-is.
    """
    import math

    from ..models.facts import FactValue

    fact_value = record.value
    kind = fact_value.type
    payload = fact_value.v

    if kind == "number":
        unusable = payload is None or (
            isinstance(payload, float) and not math.isfinite(payload)
        )
        if unusable:
            return record.model_copy(
                update={"value": FactValue(type="number", v=None), "sanitizer_redacted": True}
            )
        return record

    if kind == "ref":
        target = "" if payload is None else str(payload)
        looks_like_uri = "://" in target or target.startswith("urn:")
        if not looks_like_uri:
            return record.model_copy(
                update={"value": FactValue(type="ref", v=None), "sanitizer_redacted": True}
            )

    return record
def _quarantine_via_sanitizer(fact_id: str, matched_pattern: str) -> None:
    """Move a fact into the configured quarantine garden (sanitizer quarantine mode).

    Best-effort: if ``settings.quarantine_garden_id`` is unset, or no garden
    flagged ``quarantine = 1`` matches it by id or slug, a warning is logged
    and nothing is written. Otherwise the fact is marked ``pending`` in that
    garden, a ``sanitizer_quarantine`` row is added to ``fact_audit_log`` and
    an instruction-quarantined event is emitted on the same connection
    (§19.7.6). Any exception raised along the way is logged, not propagated.

    Args:
        fact_id: Id of the fact to quarantine.
        matched_pattern: Source of the sentinel pattern that triggered this;
            recorded in the quarantine reason and the event detail.
    """
    import uuid
    from datetime import UTC, datetime

    from ..db import db
    from ..lifecycle.immutability import set_fact_quarantine_status
    from ..observability.audit_event import (
        INSTRUCTION_QUARANTINED,
        emit_instruction_event_if_applicable,
    )
    from ..settings import settings

    garden_ref = settings.quarantine_garden_id
    if not garden_ref:
        logger.warning(
            "Sanitizer quarantine mode set but quarantine_garden_id not configured; "
            "fact %s will not be quarantined",
            fact_id,
        )
        return

    timestamp = datetime.now(UTC).isoformat()
    system_actor = "system:stigmem"

    try:
        with db() as conn:
            # The setting may hold either the garden's id or its slug.
            garden_row = conn.execute(
                "SELECT id FROM gardens WHERE (id = ? OR slug = ?) AND quarantine = 1",
                (garden_ref, garden_ref),
            ).fetchone()
            if garden_row is None:
                logger.warning(
                    "Quarantine garden %s not found; skipping sanitizer quarantine", garden_ref
                )
                return

            fact_row = conn.execute(
                "SELECT entity, relation FROM facts WHERE id = ?",
                (fact_id,),
            ).fetchone()

            set_fact_quarantine_status(
                conn,
                fact_id=fact_id,
                quarantine_garden_id=garden_row["id"],
                quarantine_status="pending",
                quarantine_reason=f"sanitizer: {matched_pattern}",
            )

            # Audit (§19.7.6)
            audit_row = (
                str(uuid.uuid4()),
                fact_id,
                "sanitizer_quarantine",
                system_actor,
                None,
                system_actor,
                None,
                timestamp,
            )
            conn.execute(
                """INSERT INTO fact_audit_log
                       (id, fact_id, event_type, entity_uri, oidc_sub, source, attested_key_id, ts)
                       VALUES (?,?,?,?,?,?,?,?)""",
                audit_row,
            )

            # The fact row may be absent; the event then carries no entity/relation.
            if fact_row is None:
                entity = relation = None
            else:
                entity, relation = fact_row["entity"], fact_row["relation"]

            emit_instruction_event_if_applicable(
                INSTRUCTION_QUARANTINED,
                fact_id=fact_id,
                fact_entity=entity,
                fact_relation=relation,
                actor_uri=system_actor,
                source=system_actor,
                detail={
                    "reason": "sanitizer_quarantine",
                    "matched_pattern": matched_pattern,
                },
                conn=conn,
            )
    except Exception as exc:
        logger.error("Failed to quarantine fact %s via sanitizer: %s", fact_id, exc)