Coverage for node / src / stigmem_node / routes / _facts_assert.py: 92%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Implementation of POST /v1/facts (assert_fact) extracted from routes/facts.py. 

2 

3Imported back into ``routes.facts``; the route stub there delegates to this 

4function inside its tracing span. Helper symbols are imported lazily inside 

5the function to keep the module-level import graph acyclic. 

6No behavioural changes — code was moved verbatim from facts.py. 

7""" 

8 

9from __future__ import annotations 

10 

11import logging 

12import sys 

13import threading 

14import uuid 

15from datetime import UTC, datetime 

16from typing import Any 

17 

18from fastapi import HTTPException, status 

19 

20from ..auth import Identity 

21from ..billing import BillingEvent, get_hook_bus 

22from ..cid import compute_cid 

23from ..db import db 

24from ..entity_normalizer import NormalizationError, is_informal, normalize_entity_uri 

25from ..garden_acl import ( 

26 get_garden_by_garden_uri, 

27 require_garden_write, 

28) 

29from ..hlc import node_hlc 

30from ..lifecycle.immutability import set_embedding_status, write_fact_journal 

31from ..metrics import CONTRADICTION, FACT_WRITE 

32from ..models.facts import AssertRequest, FactRecord, row_to_record 

33from ..plugins import TenantContext, get_registry 

34from ..recall.fuzzy_resolver import resolve_entity 

35from ..session_graph import encode_derived_from, ensure_write_allowed, record_write_scope 

36from ..settings import settings as _settings # noqa: F401 — kept for parity 

37 

38 

def _live_settings() -> Any:
    """Fetch the current Settings singleton straight from ``sys.modules``.

    Some test fixtures overwrite ``stigmem_node.settings`` (the attribute on
    the parent package) with a Settings *instance*. Both
    ``from .. import settings`` and ``import x.y`` resolve through that patched
    attribute, so they would hand back the instance instead of the module.
    Looking the module up by its ``sys.modules`` key is the only route that
    reliably reaches the original module, whose ``.settings`` attribute is the
    singleton the tests actually intend to swap.
    """
    settings_module = sys.modules["stigmem_node.settings"]
    return settings_module.settings

51 

52 

53logger = logging.getLogger("stigmem.facts") 

54 

55 

def _verify_or_require_attestation(req: AssertRequest, identity: Identity) -> str | None:
    """C1: check a supplied attestation token, or fail closed when one is mandatory.

    Returns:
        The result of ``verify_attestation`` when the request carries an
        attestation (the caller stores it as ``attested_key_id``), otherwise
        ``None``.

    Raises:
        HTTPException: 400 when the request has no attestation and the live
            settings have ``attestation_required`` enabled.
    """
    from .facts import _encode_v

    attestation = req.attestation
    if attestation is None:
        # Nothing to verify: reject only when the deployment demands attestation.
        if _live_settings().attestation_required:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="attestation required; register an agent key at POST /v1/auth/agent-keys",
            )
        return None

    from .agent_keys import verify_attestation

    # The signed message is the newline-joined (entity, relation, value type,
    # encoded value, source) taken from the request as submitted.
    encoded_value = _encode_v(req.value.type, req.value.v)
    signed_parts = (req.entity, req.relation, req.value.type, encoded_value, req.source)
    canonical = "\n".join(f"{part}" for part in signed_parts).encode()
    return verify_attestation(
        key_id=attestation.key_id,
        signature_b64=attestation.signature,
        canonical_message=canonical,
        caller_entity_uri=identity.entity_uri,
    )

79 

80 

def _normalise_and_alias_uris(req: AssertRequest) -> tuple[str, str]:
    """Canonicalise the request's entity and source URIs, then apply aliases.

    Layer 1 runs strict normalisation on both URIs; layer 2 resolves each
    canonical form through ``resolve_entity`` on a ``db()`` connection. A
    deprecation notice is printed to stderr for every informal URI.

    Returns:
        ``(entity, source)`` after normalisation and alias resolution.

    Raises:
        HTTPException: 400 when either URI fails normalisation.
    """
    try:
        canonical_entity = normalize_entity_uri(req.entity)
        canonical_source = normalize_entity_uri(req.source)
    except NormalizationError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"invalid_entity_uri: {exc}",
        ) from exc

    # Deprecation warning for informal URIs (spec §2.5); the check runs on the
    # values as submitted, not on the normalised forms.
    for label, raw_uri in (("entity", req.entity), ("source", req.source)):
        if is_informal(raw_uri):
            print(
                f"[stigmem] DEPRECATED: informal {label} URI {raw_uri!r} — "
                f"use stigmem://authority/type/id format (spec §2.5)",
                file=sys.stderr,
            )

    # Layer 2: resolve user-defined semantic aliases (spec §2.6.6) on canonical forms.
    with db() as _alias_conn:
        resolved_entity = resolve_entity(_alias_conn, canonical_entity)
        resolved_source = resolve_entity(_alias_conn, canonical_source)
        return resolved_entity, resolved_source

109 

110 

def _resolve_garden_for_assert(req: AssertRequest, identity: Identity) -> Any:
    """Spec §17.3: look up the request's garden and check it may be written to.

    Returns:
        ``None`` when the request names no garden, otherwise the garden row
        returned by ``get_garden_by_garden_uri``.

    Raises:
        HTTPException: 404 when the garden is not found for the caller's
            tenant; 422 when the garden's scope differs from the fact's scope.
            ``require_garden_write`` performs the write-ACL check.
    """
    garden_uri = req.garden_id
    if garden_uri is None:
        return None

    garden = get_garden_by_garden_uri(garden_uri, tenant_id=identity.tenant_id)
    if garden is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found")

    garden_scope = garden["scope"]
    if garden_scope != req.scope:
        mismatch = (
            f"scope mismatch: garden scope is '{garden_scope}' "
            f"but fact scope is '{req.scope}'"
        )
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=mismatch,
        )

    require_garden_write(garden, identity)
    return garden

128 

129 

def _existing_record_for_cid(
    conn: Any,
    fact_cid: str,
    tenant_id: str,
) -> FactRecord | None:
    """Look up the fact already registered under ``fact_cid`` for this tenant.

    Returns ``None`` when no alias row exists for the CID, or when the aliased
    fact is not found under ``tenant_id``; otherwise the fact converted with
    ``row_to_record(..., contradicted=False)``.
    """
    alias_row = conn.execute(
        "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?", (fact_cid,)
    ).fetchone()
    if alias_row is None:
        return None

    fact_row = conn.execute(
        "SELECT * FROM facts WHERE id = ? AND tenant_id = ?",
        (alias_row["fact_id"], tenant_id),
    ).fetchone()
    if fact_row is None:
        return None
    return row_to_record(fact_row, contradicted=False)

146 

147 

def _require_interpretation_write(identity: Identity, interpret_as: str) -> None:
    """Reject instruction-typed writes from callers lacking the permission.

    Any ``interpret_as`` other than ``"instruction"`` passes without consulting
    the identity.

    Raises:
        HTTPException: 403 with code ``instruction_write_required`` when the
            fact is instruction-typed and ``identity.can_write_instruction()``
            is false.
    """
    # Short-circuit keeps the permission check from running for other types.
    if interpret_as != "instruction" or identity.can_write_instruction():
        return

    denial = {
        "code": "instruction_write_required",
        "message": "writing instruction-typed facts requires instruction:write permission",
    }
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=denial)

160 

161 

def _detect_and_record_contradictions(
    conn: Any,
    fact_id: str,
    entity: str,
    req: AssertRequest,
    identity: Identity,
) -> bool:
    """Spec §9.1: record contradictions between the new fact and its siblings.

    System facts are skipped. Otherwise siblings are the caller's tenant's
    other facts with the same (entity, relation, scope) and confidence above
    zero. When any exist they are passed to ``_record_contradictions`` and a
    collision warning is printed to stderr.

    Returns:
        ``True`` when at least one sibling was found and recorded, else ``False``.
    """
    from .facts import _SYSTEM_RELATION_PREFIX, _record_contradictions

    def _is_system_name(name: str) -> bool:
        # Carries the system prefix but is not a full stigmem:// URI.
        return name.startswith(_SYSTEM_RELATION_PREFIX) and not name.startswith("stigmem://")

    if _is_system_name(entity) or _is_system_name(req.relation):
        return False

    siblings = conn.execute(
        """SELECT id FROM facts
           WHERE entity=? AND relation=? AND scope=? AND id!=? AND confidence>0.0
             AND tenant_id=?""",
        (entity, req.relation, req.scope, fact_id, identity.tenant_id),
    ).fetchall()
    if not siblings:
        return False

    _record_contradictions(
        conn,
        fact_id,
        entity,
        req.relation,
        req.scope,
        siblings,
        identity.tenant_id,
    )
    sibling_count = len(siblings)
    print(
        f"[stigmem] WARN: collision — entity={entity!r} relation={req.relation!r} "
        f"scope={req.scope!r}: fact {fact_id!r} contradicts {sibling_count} existing "
        f"fact(s); verify relation namespacing (see relation-convention.md)",
        file=sys.stderr,
    )
    return True

204 

205 

def _emit_post_write_hooks(
    *,
    fact_id: str,
    entity: str,
    source: str,
    req: AssertRequest,
    identity: Identity,
    value_v: str | None,
    garden_uuid: str | None,
    now: str,
    contradicted: bool,
    _span: object,
) -> None:
    """Run the side effects that follow a fact write.

    In order: mark the entity's memory card stale, start the background
    embedding thread (only when ``embed_enabled`` is set), emit the
    ``fact_written`` billing event, fan the fact out to subscribers, bump the
    write / contradiction metrics, and tag the tracing span. Failures in the
    card-stale and fan-out steps are reported and swallowed so they do not
    fail the request.
    """
    from .facts import _embed_fact_background

    # Phase 9: mark entity's memory card stale on every write (ACM-214)
    try:
        from ..card_materializer import mark_entity_stale as _mark_stale

        _mark_stale(entity, req.scope, identity.tenant_id)
    except Exception as _card_exc:
        logger.warning("card mark_stale failed for %r: %s", entity, _card_exc)

    # Phase 9 §2: write-time embedding (background thread, graceful fallback)
    if _live_settings().embed_enabled:
        embed_thread = threading.Thread(
            target=_embed_fact_background,
            args=(fact_id, entity, req.relation, req.value.type, value_v or ""),
            daemon=True,
        )
        embed_thread.start()

    billing_event = BillingEvent(
        event_type="fact_written",
        tenant_id=identity.tenant_id,
        entity_uri=identity.entity_uri,
        fact_id=fact_id,
    )
    get_hook_bus().emit(billing_event)

    # §20: fan out to subscribers (fast DB insert only; delivery happens in sweep loop)
    try:
        import json as _json

        from ..subscription_delivery import fan_out as _subscription_fan_out

        # Serialisation stays inside the try so an encoding failure is
        # reported like any other fan-out failure.
        payload = {
            "id": fact_id,
            "entity": entity,
            "relation": req.relation,
            "value_type": req.value.type,
            "value_v": value_v,
            "source": source,
            "timestamp": now,
            "scope": req.scope,
            "confidence": req.confidence,
            "garden_id": garden_uuid,
        }
        _subscription_fan_out(
            fact_id=fact_id,
            entity=entity,
            scope=req.scope,
            garden_id=garden_uuid,
            tenant_id=identity.tenant_id,
            fact_payload_json=_json.dumps(payload),
        )
    except Exception as _sub_exc:
        print(f"[stigmem] WARN: subscription fan_out failed: {_sub_exc}", file=sys.stderr)

    FACT_WRITE.labels(principal=identity.entity_uri, tenant=identity.tenant_id).inc()
    if contradicted:
        CONTRADICTION.labels(tenant=identity.tenant_id).inc()

    # The span is typed as a plain object; one without set_attribute is
    # tolerated and only logged at debug level.
    span_attributes = (
        ("stigmem.fact_id", fact_id),
        ("stigmem.contradicted", contradicted),
    )
    try:
        for attr_name, attr_value in span_attributes:
            _span.set_attribute(attr_name, attr_value)  # type: ignore[attr-defined]
    except AttributeError as _span_exc:
        logger.debug("span attribute set skipped: %s", _span_exc)

285 

286 

def assert_fact_impl(
    req: AssertRequest,
    identity: Identity,
    _span: object,
    *,
    request_id: str,
    tenant: TenantContext,
    session_id: str | None = None,
) -> FactRecord:
    """Persist one asserted fact and return its record (body of POST /v1/facts).

    Flow: permission check, attestation, URI normalisation and aliasing,
    garden resolution, relation-naming warnings, instruction-write check, then
    a CID pre-check that returns the existing record for an identical fact.
    Otherwise a single ``db()`` block inserts the fact, journals it, records
    the CID alias, write scope, audit entry, optional graph edge and fact-chain
    entry, fires the ``post_assert_persist`` plugin hook and records
    contradictions. Post-write hooks run after that block.

    Returns:
        The new fact's record (with contradiction flag and relation warnings),
        or the already-stored record when the same CID exists.

    Raises:
        HTTPException: 403 when ``identity.can_write()`` is false. The helper
            steps raise their own HTTP errors.
    """
    # Lazy imports of sibling helpers to avoid circular import with .facts
    from .facts import _encode_v, _is_valid_entity_uri, _validate_relation

    if not identity.can_write():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="write permission required",
        )

    attested_key_id = _verify_or_require_attestation(req, identity)
    entity, source = _normalise_and_alias_uris(req)

    # Source-attestation policy is plugin-owned. Core preserves the field but
    # does not evaluate source/identity binding in default installs.
    attested = None
    garden = _resolve_garden_for_assert(req, identity)

    garden_uuid = None if garden is None else garden["id"]
    attested_int = None if attested is None else (1 if attested else 0)

    # Relation namespacing convention check (see relation-convention.md)
    relation_warnings = _validate_relation(req.relation)
    for warning_text in relation_warnings:
        print(f"[stigmem] WARN: relation naming: {warning_text}", file=sys.stderr)

    _require_interpretation_write(identity, req.value.interpret_as)

    fact_id = str(uuid.uuid4())
    now = datetime.now(UTC).isoformat()
    hlc = node_hlc.tick()
    value_v = _encode_v(req.value.type, req.value.v)

    # §25.7.3: compute CID before write; persisted in the same transaction
    fact_cid = compute_cid(
        entity=entity,
        relation=req.relation,
        value_type=req.value.type,
        value_v=value_v or "",
        source=source,
        scope=req.scope,
        confidence=req.confidence,
    )

    _embed_enabled = _live_settings().embed_enabled
    embedding_flag = 1 if _embed_enabled else None

    derived_from_json = encode_derived_from(req.derived_from)

    # Same write-guard arguments are used for the pre-check and the write itself.
    write_guard = {
        "identity": identity,
        "session_id": session_id,
        "target_scope": req.scope,
        "write_mode": req.write_mode,
        "derived_from": req.derived_from,
    }

    # F-10 §25.7.3: idempotent CID pre-check — if CID already exists, return existing record
    with db() as _precheck_conn:
        ensure_write_allowed(_precheck_conn, **write_guard)
        duplicate = _existing_record_for_cid(_precheck_conn, fact_cid, identity.tenant_id)
        if duplicate is not None:
            return duplicate

    with db() as conn:
        ensure_write_allowed(conn, **write_guard)

        insert_values = (
            fact_id,
            entity,  # normalized (spec §2.6)
            req.relation,
            req.value.type,
            value_v,
            source,  # normalized (spec §2.6)
            now,
            req.valid_until,
            req.confidence,
            req.scope,
            hlc,
            None,  # local write; not received from a peer
            attested_key_id,
            garden_uuid,
            attested_int,
            identity.tenant_id,
            embedding_flag,
            fact_cid,
            derived_from_json,
            req.value.interpret_as,
        )
        conn.execute(
            """INSERT INTO facts
               (id, entity, relation, value_type, value_v, source, timestamp,
                valid_until, confidence, scope, hlc, received_from, attested_key_id,
                garden_id, attested, tenant_id, embedding_missing, cid, derived_from,
                interpret_as)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            insert_values,
        )

        journal_body = {
            "entity": entity,
            "relation": req.relation,
            "value_type": req.value.type,
            "value_v": value_v,
            "source": source,
            "timestamp": now,
            "valid_until": req.valid_until,
            "confidence": req.confidence,
            "scope": req.scope,
            "interpret_as": req.value.interpret_as,
        }
        write_fact_journal(
            conn,
            fact_id=fact_id,
            event_type="fact_insert",
            tenant_id=identity.tenant_id,
            actor_uri=identity.entity_uri,
            source=source,
            scope=req.scope,
            cid=fact_cid,
            body=journal_body,
        )
        if embedding_flag is not None:
            set_embedding_status(
                conn,
                fact_id=fact_id,
                embedding_missing=bool(embedding_flag),
                updated_by="fact_assert",
            )

        # F-10 §25.7.3: alias table row — idempotent upsert on CID collision
        alias_insert = conn.execute(
            "INSERT OR IGNORE INTO fact_cid_aliases (fact_id, cid) VALUES (?, ?)",
            (fact_id, fact_cid),
        )
        if alias_insert.rowcount == 0:
            # Concurrent same-CID write race: return existing record
            # NOTE(review): the facts row and journal entry written above are
            # still part of this db() block when we return here — confirm how
            # db() finalises on a normal exit and whether that is intended.
            raced_row = conn.execute(
                "SELECT f.* FROM facts f JOIN fact_cid_aliases a ON a.fact_id = f.id"
                " WHERE a.cid = ? AND f.tenant_id = ?",
                (fact_cid, identity.tenant_id),
            ).fetchone()
            if raced_row is not None:
                return row_to_record(raced_row, contradicted=False)

        record_write_scope(
            conn,
            identity=identity,
            session_id=session_id,
            scope=req.scope,
        )

        # C3 / §22.3: write-ahead audit entry for fact_write event (same transaction)
        from ..audit_event import emit as _emit_audit

        _emit_audit(
            "fact_write",
            entity_uri=identity.entity_uri,
            tenant_id=identity.tenant_id,
            oidc_sub=identity.oidc_sub,
            fact_id=fact_id,
            source=source,
            attested_key_id=attested_key_id,
            scope=req.scope,
            conn=conn,
        )

        # Graph adjacency index (§20.1.1): materialize edge for ref-typed facts
        is_ref_edge = req.value.type == "ref" and value_v and _is_valid_entity_uri(value_v)
        if is_ref_edge:
            from ..recall.graph_index import upsert_edge as _upsert_edge

            _upsert_edge(
                conn,
                fact_id=fact_id,
                subject=entity,
                relation=req.relation,
                object_uri=value_v,
                scope=req.scope,
                confidence=req.confidence,
                garden_id=garden_uuid,
                tenant_id=identity.tenant_id,
                received_from=None,
                source_trust=None,
                valid_until=req.valid_until,
            )

        row = conn.execute("SELECT * FROM facts WHERE id=?", (fact_id,)).fetchone()
        from ..fact_chain import append_fact_chain_entry

        append_fact_chain_entry(conn, row)
        persisted_record = row_to_record(row, contradicted=False)
        get_registry().fire_fire_and_forget(
            "post_assert_persist",
            fact=persisted_record,
            identity=identity,
            tenant=tenant,
            request_id=request_id,
        )
        contradicted = _detect_and_record_contradictions(conn, fact_id, entity, req, identity)

    _emit_post_write_hooks(
        fact_id=fact_id,
        entity=entity,
        source=source,
        req=req,
        identity=identity,
        value_v=value_v,
        garden_uuid=garden_uuid,
        now=now,
        contradicted=contradicted,
        _span=_span,
    )

    return row_to_record(row, contradicted=contradicted, warnings=relation_warnings)