Coverage for node / src / stigmem_node / recall / recall_pipeline.py: 80%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Recall-time processing pipeline — spec §19.4.4, §19.7. 

2 

3Pipeline order (§19.7.5): 

4 Storage layer 

5 → Scope/garden ACL (done in facts.py) 

6 → Source-trust multiplier applied to effective_confidence ← here 

7 → Content sanitizer ← here 

8 → API response serializer 

9 

10Exported surface: 

11 apply_recall_pipeline(records, identity, include_low_trust) -> list[FactRecord] 

12 _run_sanitizer(record, mode, effective_confidence) -> FactRecord (exposed for tests) 

13""" 

14 

15from __future__ import annotations 

16 

17import logging 

18import re 

19import unicodedata 

20from typing import TYPE_CHECKING 

21 

22if TYPE_CHECKING: 

23 from ..auth import Identity 

24 from ..models.facts import FactRecord 

25 

26logger = logging.getLogger("stigmem.recall") 

27 

# ---------------------------------------------------------------------------
# Sentinel patterns (§19.7.2)
#
# Stored as raw regex source strings. _load_compiled_patterns() compiles them
# with re.IGNORECASE, and _get_patterns() caches the result on first use.
# ---------------------------------------------------------------------------

_DEFAULT_PATTERNS: list[str] = [
    # Instruction-override phrasing
    r"\bignore\s+(all\s+)?previous\s+instructions?\b",
    r"\bdisregard\s+(all\s+)?previous\s+(prompt|instructions?)\b",
    r"\byou\s+are\s+now\s+(?:in\s+)?(?:a\s+)?(?:different|new)\s+mode\b",
    r"\bact\s+as\s+(?:an?\s+)?(?:evil|unfiltered|uncensored|dan\b)",
    r"\bsystem\s+prompt\s*:\s*",
    # Chat-template control tokens and role markers
    r"<\|im_start\|>",
    r"<\|im_end\|>",
    r"\[INST\]",
    r"\[\/INST\]",
    r"\bHuman:\s*",
    r"\bAssistant:\s*",
    # JSON object keys "__proto__" / "constructor"
    r'\{\s*"__proto__"\s*:',
    r'\{\s*"constructor"\s*:',
]

47 

# Invisible / bidi control characters to strip (§19.7.2).
# Each pair is an inclusive code-point range.
_BIDI_CONTROLS = frozenset(
    chr(code_point)
    for first, last in (
        (0x200B, 0x200F),  # zero-width space / non-joiner / joiner, LRM, RLM
        (0x202A, 0x202E),  # directional embeddings, overrides and pop
        (0x2066, 0x2069),  # directional isolates and pop
        (0xFEFF, 0xFEFF),  # zero-width no-break space (BOM)
    )
    for code_point in range(first, last + 1)
)

# Control characters forbidden in string/text output (§19.7.3).
# Tab (\x09), line feed (\x0A) and carriage return (\x0D) are not in the class.
_CONTROL_CHARS_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F]")

63 

64 

def _load_compiled_patterns() -> list[re.Pattern[str]]:
    """Compile the default sentinel patterns plus any operator-supplied extras.

    Extra patterns are read from ``settings.sanitizer_extra_patterns_file`` when
    that setting is truthy: one regex per line, with blank lines and lines
    starting with ``#`` ignored. The file is read as UTF-8.

    Loading is best-effort. A missing or unreadable file is logged and the
    patterns gathered so far are still used. A pattern that is not a valid
    regex is logged and skipped, so one bad line cannot make every recall fail.

    Returns:
        The compiled patterns, all using ``re.IGNORECASE``.
    """
    from ..settings import settings

    sources = list(_DEFAULT_PATTERNS)

    extra_file = settings.sanitizer_extra_patterns_file
    if extra_file:
        try:
            # Explicit encoding so decoding does not depend on the host locale.
            with open(extra_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith("#"):
                        sources.append(line)
        except FileNotFoundError:
            logger.warning("sanitizer_extra_patterns_file not found: %s", extra_file)
        except Exception as exc:
            logger.error("Failed to read sanitizer_extra_patterns_file: %s", exc)

    compiled: list[re.Pattern[str]] = []
    for source in sources:
        try:
            compiled.append(re.compile(source, re.IGNORECASE))
        except re.error as exc:
            # Drop only the offending pattern; the rest stay active.
            logger.error("Skipping invalid sanitizer pattern %r: %s", source, exc)
    return compiled

84 

85 

# Module-level cache of the compiled sentinel patterns. None means "not loaded
# yet"; reset_pattern_cache() puts it back to None so the next lookup reloads.
_compiled_patterns: list[re.Pattern[str]] | None = None


def _get_patterns() -> list[re.Pattern[str]]:
    """Return the cached compiled patterns, loading them on first use."""
    global _compiled_patterns
    cached = _compiled_patterns
    if cached is None:
        cached = _compiled_patterns = _load_compiled_patterns()
    return cached


def reset_pattern_cache() -> None:
    """Clear the cached patterns so the next _get_patterns() call reloads them."""
    global _compiled_patterns
    _compiled_patterns = None

100 

101 

102# --------------------------------------------------------------------------- 

103# Public API 

104# --------------------------------------------------------------------------- 

105 

106 

def apply_recall_pipeline(
    records: list[FactRecord],
    identity: Identity | None = None,
    include_low_trust: bool = False,
) -> list[FactRecord]:
    """Run the recall-time trust and sanitizer stages over ``records``.

    For each record, in order:

    * Records whose ``quarantine_status`` is ``"pending"`` are dropped.
    * Unless ``settings.trust_mode`` is ``"off"``, the source-trust multiplier
      is computed and the record is copied with ``effective_confidence``
      (confidence * trust) and ``source_trust`` set.
    * In ``"strict"`` trust mode, records with effective confidence below 0.3
      are dropped unless ``include_low_trust`` is true.
    * Unless the sanitizer is off, the record is passed through
      ``_run_sanitizer``. A ``trust_mode`` of ``"off"`` also turns the
      sanitizer off, whatever ``settings.sanitizer_mode`` says.

    Returns:
        A new list of the surviving (possibly copied/annotated) records, in
        input order.
    """
    from ..settings import settings

    trust_mode = settings.trust_mode
    sanitizer_mode = settings.sanitizer_mode

    trust_enabled = trust_mode != "off"
    if not trust_enabled:
        sanitizer_mode = "off"
    sanitizer_enabled = sanitizer_mode != "off"

    processed: list[FactRecord] = []

    for fact in records:
        # Quarantined facts stay out of normal recall results (§19.5.2).
        # (Rejected facts have confidence=0 and are caught by the
        # min_confidence filter.)
        if fact.quarantine_status == "pending":
            continue

        # 1. Source-trust multiplier (§19.4.4)
        effective = fact.confidence
        if trust_enabled:
            from ..source_trust import compute_source_trust

            trust = compute_source_trust(fact.source, fact.scope, identity)
            effective = fact.confidence * trust
            fact = fact.model_copy(
                update={"effective_confidence": effective, "source_trust": trust}
            )

            # Strict mode drops facts below 0.3 effective confidence unless
            # the caller opts in (§19.4.4). Relaxed mode MUST NOT filter
            # solely on a low score (§19.4.3).
            if trust_mode == "strict" and effective < 0.3 and not include_low_trust:
                continue

        # 2. Content sanitizer (§19.7)
        if sanitizer_enabled:
            fact = _run_sanitizer(fact, sanitizer_mode, effective)

        processed.append(fact)

    return processed

155 

156 

def _run_sanitizer(record: FactRecord, mode: str, effective_confidence: float) -> FactRecord:
    """Sanitize one fact record and return it, possibly as a modified copy.

    Non-string values (type other than ``"string"``/``"text"``) are handed to
    ``_enforce_schema_types``. String values are NFKC-normalized, stripped of
    the characters in ``_BIDI_CONTROLS`` and of those matched by
    ``_CONTROL_CHARS_RE``, then checked against the sentinel patterns.

    With no sentinel match, the record is returned unchanged unless cleaning
    altered the text, in which case a copy carrying the cleaned value is
    returned. With a match, ``mode`` decides:

    * ``"warn"``: copy with the cleaned value and ``sanitizer_warnings``.
    * ``"block"``: copy with value ``None``, ``sanitizer_redacted=True`` and
      ``sanitizer_warnings``.
    * ``"quarantine"``: same redacted copy as ``"block"``, after calling
      ``_quarantine_via_sanitizer`` with the first matched pattern.
    * any other mode: the record is returned unchanged.

    ``effective_confidence`` is accepted but not used by this function.
    """
    from ..models.facts import FactValue

    fact_value = record.value
    value_type = fact_value.type
    if value_type not in ("string", "text"):
        # Schema type enforcement for non-string types (§19.7.3)
        return _enforce_schema_types(record)

    # NFKC-normalize, then drop bidi/invisible characters (§19.7.2) and
    # forbidden control characters (§19.7.3).
    original_text = "" if fact_value.v is None else str(fact_value.v)
    text = unicodedata.normalize("NFKC", original_text)
    text = "".join(ch for ch in text if ch not in _BIDI_CONTROLS)
    text = _CONTROL_CHARS_RE.sub("", text)

    # Source strings of every sentinel pattern found in the cleaned text.
    hits: list[str] = [
        pattern.pattern for pattern in _get_patterns() if pattern.search(text)
    ]

    if not hits:
        # No sentinel match; only copy the record if cleaning changed the text.
        if text == original_text:
            return record
        return record.model_copy(update={"value": FactValue(type=value_type, v=text)})

    # Sentinel matched — apply the enforcement mode.
    if mode == "warn":
        return record.model_copy(
            update={
                "sanitizer_warnings": hits,
                "value": FactValue(type=value_type, v=text),
            }
        )

    if mode not in ("block", "quarantine"):
        return record

    if mode == "block":
        logger.debug("Sanitizer blocked fact %s (matched: %s)", record.id, hits[0])
    else:
        _quarantine_via_sanitizer(record.id, hits[0])
        logger.info("Sanitizer quarantined fact %s", record.id)

    # Both block and quarantine return a redacted record for this response.
    return record.model_copy(
        update={
            "sanitizer_redacted": True,
            "sanitizer_warnings": hits,
            "value": FactValue(type=value_type, v=None),
        }
    )

217 

218 

def _enforce_schema_types(record: FactRecord) -> FactRecord:
    """Redact non-string fact values that fail basic schema checks (§19.7.3).

    * ``number``: redacted when the value is ``None`` or a float that is NaN
      or infinite.
    * ``ref``: redacted when the stringified value neither contains ``"://"``
      nor starts with ``"urn:"`` (a ``None`` value is treated as ``""``).

    A redacted record is a copy whose value has ``v=None`` (same type) and
    ``sanitizer_redacted=True``. Anything else is returned unchanged.
    """
    import math

    from ..models.facts import FactValue

    value = record.value
    kind = value.type
    payload = value.v

    def _redacted(type_name: str) -> FactRecord:
        return record.model_copy(
            update={"value": FactValue(type=type_name, v=None), "sanitizer_redacted": True}
        )

    if kind == "number":
        non_finite = isinstance(payload, float) and not math.isfinite(payload)
        if payload is None or non_finite:
            return _redacted("number")

    if kind == "ref":
        target = "" if payload is None else str(payload)
        if not ("://" in target or target.startswith("urn:")):
            return _redacted("ref")

    return record

242 

243 

def _quarantine_via_sanitizer(fact_id: str, matched_pattern: str) -> None:
    """Mark a fact as pending quarantine after a sanitizer match (quarantine mode).

    Looks up the garden named by ``settings.quarantine_garden_id`` (matched by
    id or slug, and required to have ``quarantine = 1``), calls
    ``set_fact_quarantine_status`` with status ``"pending"`` and a reason that
    embeds ``matched_pattern``, inserts a ``sanitizer_quarantine`` row into
    ``fact_audit_log``, and calls ``emit_instruction_event_if_applicable``.

    Best-effort: if the setting is empty or the garden is not found, a warning
    is logged and nothing is written. Any exception from the database work is
    logged and not re-raised.
    """
    from datetime import UTC, datetime

    from ..db import db
    from ..lifecycle.immutability import set_fact_quarantine_status
    from ..observability.audit_event import (
        INSTRUCTION_QUARANTINED,
        emit_instruction_event_if_applicable,
    )
    from ..settings import settings

    garden_ref = settings.quarantine_garden_id
    if not garden_ref:
        logger.warning(
            "Sanitizer quarantine mode set but quarantine_garden_id not configured; "
            "fact %s will not be quarantined",
            fact_id,
        )
        return

    timestamp = datetime.now(UTC).isoformat()

    try:
        with db() as conn:
            # The configured value may be either the garden id or its slug.
            garden_row = conn.execute(
                "SELECT id FROM gardens WHERE (id = ? OR slug = ?) AND quarantine = 1",
                (garden_ref, garden_ref),
            ).fetchone()
            if garden_row is None:
                logger.warning(
                    "Quarantine garden %s not found; skipping sanitizer quarantine", garden_ref
                )
                return

            # Entity/relation are only used for the instruction event below.
            fact_row = conn.execute(
                "SELECT entity, relation FROM facts WHERE id = ?",
                (fact_id,),
            ).fetchone()

            set_fact_quarantine_status(
                conn,
                fact_id=fact_id,
                quarantine_garden_id=garden_row["id"],
                quarantine_status="pending",
                quarantine_reason=f"sanitizer: {matched_pattern}",
            )

            # Audit (§19.7.6)
            import uuid

            audit_values = (
                str(uuid.uuid4()),  # id
                fact_id,  # fact_id
                "sanitizer_quarantine",  # event_type
                "system:stigmem",  # entity_uri
                None,  # oidc_sub
                "system:stigmem",  # source
                None,  # attested_key_id
                timestamp,  # ts
            )
            conn.execute(
                """INSERT INTO fact_audit_log
                   (id, fact_id, event_type, entity_uri, oidc_sub, source, attested_key_id, ts)
                   VALUES (?,?,?,?,?,?,?,?)""",
                audit_values,
            )

            if fact_row is None:
                entity, relation = None, None
            else:
                entity, relation = fact_row["entity"], fact_row["relation"]

            emit_instruction_event_if_applicable(
                INSTRUCTION_QUARANTINED,
                fact_id=fact_id,
                fact_entity=entity,
                fact_relation=relation,
                actor_uri="system:stigmem",
                source="system:stigmem",
                detail={
                    "reason": "sanitizer_quarantine",
                    "matched_pattern": matched_pattern,
                },
                conn=conn,
            )
    except Exception as exc:
        logger.error("Failed to quarantine fact %s via sanitizer: %s", fact_id, exc)