Coverage for node / src / stigmem_node / routes / _facts_assert.py: 92%
156 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Implementation of POST /v1/facts (assert_fact) extracted from routes/facts.py.
3Imported back into ``routes.facts``; the route stub there delegates to this
4function inside its tracing span. Helper symbols are imported lazily inside
5the function to keep the module-level import graph acyclic.
6No behavioural changes — code was moved verbatim from facts.py.
7"""
9from __future__ import annotations
11import logging
12import sys
13import threading
14import uuid
15from datetime import UTC, datetime
16from typing import Any
18from fastapi import HTTPException, status
20from ..auth import Identity
21from ..billing import BillingEvent, get_hook_bus
22from ..cid import compute_cid
23from ..db import db
24from ..entity_normalizer import NormalizationError, is_informal, normalize_entity_uri
25from ..garden_acl import (
26 get_garden_by_garden_uri,
27 require_garden_write,
28)
29from ..hlc import node_hlc
30from ..lifecycle.immutability import set_embedding_status, write_fact_journal
31from ..metrics import CONTRADICTION, FACT_WRITE
32from ..models.facts import AssertRequest, FactRecord, row_to_record
33from ..plugins import TenantContext, get_registry
34from ..recall.fuzzy_resolver import resolve_entity
35from ..session_graph import encode_derived_from, ensure_write_allowed, record_write_scope
36from ..settings import settings as _settings # noqa: F401 — kept for parity
39def _live_settings() -> Any:
40 """Return the live Settings singleton.
42 Uses sys.modules directly because some test fixtures replace
43 `stigmem_node.settings` (the module attribute on the parent package) with
44 a Settings instance. `from .. import settings` and `import x.y` both go
45 through that patched attribute and would return the instance instead of
46 the module — sys.modules['stigmem_node.settings'] is the only path that
47 reliably reaches the original module so we can read its `.settings`
48 singleton (which IS what tests intend to swap).
49 """
50 return sys.modules["stigmem_node.settings"].settings
# Module logger; shares the "stigmem.facts" channel with routes/facts.py.
logger: logging.Logger = logging.getLogger(name="stigmem.facts")
def _verify_or_require_attestation(req: AssertRequest, identity: Identity) -> str | None:
    """C1: check the caller's attestation token, or fail closed when one is mandatory.

    When ``req.attestation`` is present, its signature is checked by
    ``agent_keys.verify_attestation`` against a newline-joined canonical
    message of entity, relation, value type, encoded value and source. The
    result of that call is returned unchanged.

    When no attestation is supplied and the live ``attestation_required``
    setting is on, HTTP 400 is raised. Otherwise ``None`` is returned.
    """
    from .facts import _encode_v

    token = req.attestation
    if token is None:
        if _live_settings().attestation_required:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="attestation required; register an agent key at POST /v1/auth/agent-keys",
            )
        return None

    from .agent_keys import verify_attestation

    encoded_value = _encode_v(req.value.type, req.value.v)
    # f-string (not str.join) so non-str parts, e.g. a None encoded value,
    # are formatted exactly as the signer is expected to format them.
    canonical_message = (
        f"{req.entity}\n{req.relation}\n{req.value.type}\n{encoded_value}\n{req.source}"
    ).encode()
    return verify_attestation(
        key_id=token.key_id,
        signature_b64=token.signature,
        canonical_message=canonical_message,
        caller_entity_uri=identity.entity_uri,
    )
def _normalise_and_alias_uris(req: AssertRequest) -> tuple[str, str]:
    """Canonicalise the entity/source URIs and apply user-defined aliases.

    Layer 1 runs strict normalisation on both URIs. A ``NormalizationError``
    from either becomes HTTP 400 with detail ``invalid_entity_uri: <error>``.
    Informal inputs are still accepted but produce a deprecation warning on
    stderr (spec §2.5).

    Layer 2 maps each normalised URI through ``resolve_entity`` (spec §2.6.6)
    inside a ``db()`` context and returns ``(entity, source)``.
    """
    try:
        canonical_entity = normalize_entity_uri(req.entity)
        canonical_source = normalize_entity_uri(req.source)
    except NormalizationError as err:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"invalid_entity_uri: {err}",
        ) from err

    # Spec §2.5: warn about informal URIs, checked on the raw (pre-normalised) input.
    for label, raw_uri in (("entity", req.entity), ("source", req.source)):
        if not is_informal(raw_uri):
            continue
        print(
            f"[stigmem] DEPRECATED: informal {label} URI {raw_uri!r} — "
            f"use stigmem://authority/type/id format (spec §2.5)",
            file=sys.stderr,
        )

    # Spec §2.6.6: alias lookup operates on the canonical forms.
    with db() as alias_conn:
        resolved_entity = resolve_entity(alias_conn, canonical_entity)
        resolved_source = resolve_entity(alias_conn, canonical_source)
    return resolved_entity, resolved_source
111def _resolve_garden_for_assert(req: AssertRequest, identity: Identity) -> Any:
112 """Spec §17.3: resolve garden_id, enforce scope match + write ACL. Returns row or None."""
113 if req.garden_id is None:
114 return None
115 garden = get_garden_by_garden_uri(req.garden_id, tenant_id=identity.tenant_id)
116 if garden is None:
117 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found")
118 if garden["scope"] != req.scope:
119 raise HTTPException(
120 status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
121 detail=(
122 f"scope mismatch: garden scope is '{garden['scope']}' "
123 f"but fact scope is '{req.scope}'"
124 ),
125 )
126 require_garden_write(garden, identity)
127 return garden
130def _existing_record_for_cid(
131 conn: Any,
132 fact_cid: str,
133 tenant_id: str,
134) -> FactRecord | None:
135 """Return the existing record for ``fact_cid``, or None when no alias exists yet."""
136 existing_alias = conn.execute(
137 "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?", (fact_cid,)
138 ).fetchone()
139 if existing_alias is None:
140 return None
141 existing_row = conn.execute(
142 "SELECT * FROM facts WHERE id = ? AND tenant_id = ?",
143 (existing_alias["fact_id"], tenant_id),
144 ).fetchone()
145 return row_to_record(existing_row, contradicted=False) if existing_row is not None else None
148def _require_interpretation_write(identity: Identity, interpret_as: str) -> None:
149 if interpret_as != "instruction":
150 return
151 if identity.can_write_instruction():
152 return
153 raise HTTPException(
154 status_code=status.HTTP_403_FORBIDDEN,
155 detail={
156 "code": "instruction_write_required",
157 "message": "writing instruction-typed facts requires instruction:write permission",
158 },
159 )
def _detect_and_record_contradictions(
    conn: Any,
    fact_id: str,
    entity: str,
    req: AssertRequest,
    identity: Identity,
) -> bool:
    """Spec §9.1: record contradictions between a new fact and its live siblings.

    A fact is a system fact, and is exempt, when its entity or its relation
    starts with ``_SYSTEM_RELATION_PREFIX`` without also starting with
    ``stigmem://``.

    Otherwise, siblings are the other facts in the tenant that share
    (entity, relation, scope) and have ``confidence > 0.0``. Siblings are
    handed to ``_record_contradictions`` and a collision warning is printed
    to stderr.

    Returns:
        True only when at least one sibling was found and recorded.
    """
    from .facts import _SYSTEM_RELATION_PREFIX, _record_contradictions

    def _looks_system(uri: str) -> bool:
        return uri.startswith(_SYSTEM_RELATION_PREFIX) and not uri.startswith("stigmem://")

    if _looks_system(entity) or _looks_system(req.relation):
        return False

    tenant_id = identity.tenant_id
    sibling_rows = conn.execute(
        """SELECT id FROM facts
        WHERE entity=? AND relation=? AND scope=? AND id!=? AND confidence>0.0
        AND tenant_id=?""",
        (entity, req.relation, req.scope, fact_id, tenant_id),
    ).fetchall()
    if not sibling_rows:
        return False

    _record_contradictions(
        conn,
        fact_id,
        entity,
        req.relation,
        req.scope,
        sibling_rows,
        tenant_id,
    )
    print(
        f"[stigmem] WARN: collision — entity={entity!r} relation={req.relation!r} "
        f"scope={req.scope!r}: fact {fact_id!r} contradicts {len(sibling_rows)} existing "
        f"fact(s); verify relation namespacing (see relation-convention.md)",
        file=sys.stderr,
    )
    return True
def _emit_post_write_hooks(
    *,
    fact_id: str,
    entity: str,
    source: str,
    req: AssertRequest,
    identity: Identity,
    value_v: str | None,
    garden_uuid: str | None,
    now: str,
    contradicted: bool,
    _span: object,
) -> None:
    """Run the side effects that follow a successful local fact write.

    Steps, in order:

    1. Mark the entity's memory card stale (ACM-214); a failure is logged as
       a warning.
    2. Start a daemon embedding thread when ``embed_enabled`` is set.
    3. Emit a ``fact_written`` billing event.
    4. Fan the fact out to subscribers (§20); a failure is printed to stderr.
    5. Increment the write counter, and the contradiction counter when
       ``contradicted``.
    6. Tag the tracing span with the fact id and the contradiction flag; this
       is skipped if the span lacks ``set_attribute``.
    """
    from .facts import _embed_fact_background

    # Phase 9 (ACM-214): every write invalidates the entity's memory card.
    try:
        from ..card_materializer import mark_entity_stale

        mark_entity_stale(entity, req.scope, identity.tenant_id)
    except Exception as stale_err:
        logger.warning("card mark_stale failed for %r: %s", entity, stale_err)

    # Phase 9 §2: embedding runs off the request path on a daemon thread.
    if _live_settings().embed_enabled:
        embed_args = (fact_id, entity, req.relation, req.value.type, value_v or "")
        embedder = threading.Thread(
            target=_embed_fact_background,
            args=embed_args,
            daemon=True,
        )
        embedder.start()

    hook_bus = get_hook_bus()
    hook_bus.emit(
        BillingEvent(
            event_type="fact_written",
            tenant_id=identity.tenant_id,
            entity_uri=identity.entity_uri,
            fact_id=fact_id,
        )
    )

    # §20: only enqueue delivery rows here; actual delivery happens in the sweep loop.
    try:
        import json

        from ..subscription_delivery import fan_out

        payload = {
            "id": fact_id,
            "entity": entity,
            "relation": req.relation,
            "value_type": req.value.type,
            "value_v": value_v,
            "source": source,
            "timestamp": now,
            "scope": req.scope,
            "confidence": req.confidence,
            "garden_id": garden_uuid,
        }
        fan_out(
            fact_id=fact_id,
            entity=entity,
            scope=req.scope,
            garden_id=garden_uuid,
            tenant_id=identity.tenant_id,
            fact_payload_json=json.dumps(payload),
        )
    except Exception as fan_out_err:
        print(f"[stigmem] WARN: subscription fan_out failed: {fan_out_err}", file=sys.stderr)

    FACT_WRITE.labels(principal=identity.entity_uri, tenant=identity.tenant_id).inc()
    if contradicted:
        CONTRADICTION.labels(tenant=identity.tenant_id).inc()

    span_attrs = (("stigmem.fact_id", fact_id), ("stigmem.contradicted", contradicted))
    try:
        for attr_name, attr_value in span_attrs:
            _span.set_attribute(attr_name, attr_value)  # type: ignore[attr-defined]
    except AttributeError as span_err:
        logger.debug("span attribute set skipped: %s", span_err)
def assert_fact_impl(
    req: AssertRequest,
    identity: Identity,
    _span: object,
    *,
    request_id: str,
    tenant: TenantContext,
    session_id: str | None = None,
) -> FactRecord:
    """Persist a new fact for POST /v1/facts and return its record.

    Flow:

    1. Permission and attestation checks.
    2. URI normalisation and aliasing.
    3. Garden resolution.
    4. Relation-name lint and interpretation permission.
    5. CID computation.
    6. Idempotent CID pre-check, which returns the existing record for a
       known CID.
    7. One ``db()`` block that inserts the fact, its journal entry, CID
       alias, write-scope record, audit entry, optional graph edge and
       fact-chain entry, fires the ``post_assert_persist`` plugin hook, and
       records contradictions.
    8. Post-write hooks (see ``_emit_post_write_hooks``).

    Args:
        req: Parsed ``AssertRequest`` body.
        identity: Authenticated caller.
        _span: Tracing span from the route stub. It is only forwarded to
            ``_emit_post_write_hooks``, which calls ``set_attribute`` on it.
        request_id: Forwarded to the ``post_assert_persist`` plugin hook.
        tenant: Plugin tenant context, forwarded to the same hook.
        session_id: Optional session passed to the session-graph write checks.

    Returns:
        The persisted ``FactRecord`` (with ``contradicted`` and relation-naming
        ``warnings``), or an existing record when the CID is already known.

    Raises:
        HTTPException: 403 when the caller lacks write permission. Other
            exceptions may propagate from the helper checks (attestation, URI
            normalisation, garden lookup/ACL, instruction permission, session
            write policy).
    """
    # Lazy imports of sibling helpers to avoid circular import with .facts
    from .facts import (
        _encode_v,
        _is_valid_entity_uri,
        _validate_relation,
    )

    if not identity.can_write():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="write permission required",
        )

    # C1: result of attestation verification (None when no attestation was
    # supplied); stored in the attested_key_id column below.
    attested_key_id = _verify_or_require_attestation(req, identity)
    entity, source = _normalise_and_alias_uris(req)

    # Source-attestation policy is plugin-owned. Core preserves the field but
    # does not evaluate source/identity binding in default installs.
    attested = None
    garden = _resolve_garden_for_assert(req, identity)

    garden_uuid = garden["id"] if garden is not None else None
    # Maps `attested` to None / 1 / 0 for the `attested` column. It is always
    # None here because `attested` is hard-wired to None above.
    attested_int = None if attested is None else (1 if attested else 0)

    # Relation namespacing convention check (see relation-convention.md)
    relation_warnings = _validate_relation(req.relation)
    for w in relation_warnings:
        print(f"[stigmem] WARN: relation naming: {w}", file=sys.stderr)

    _require_interpretation_write(identity, req.value.interpret_as)

    fact_id = str(uuid.uuid4())
    now = datetime.now(UTC).isoformat()
    hlc = node_hlc.tick()
    value_v = _encode_v(req.value.type, req.value.v)

    # §25.7.3: compute CID before write; persisted in the same transaction
    fact_cid = compute_cid(
        entity=entity,
        relation=req.relation,
        value_type=req.value.type,
        value_v=value_v or "",
        source=source,
        scope=req.scope,
        confidence=req.confidence,
    )

    _embed_enabled = _live_settings().embed_enabled
    # Mark the row as awaiting an embedding only when embedding is enabled;
    # otherwise the embedding_missing column is left NULL.
    embedding_missing_val = 1 if _embed_enabled else None

    derived_from_json = encode_derived_from(req.derived_from)

    # F-10 §25.7.3: idempotent CID pre-check — if CID already exists, return existing record
    with db() as _precheck_conn:
        ensure_write_allowed(
            _precheck_conn,
            identity=identity,
            session_id=session_id,
            target_scope=req.scope,
            write_mode=req.write_mode,
            derived_from=req.derived_from,
        )
        existing_record = _existing_record_for_cid(_precheck_conn, fact_cid, identity.tenant_id)
        if existing_record is not None:
            return existing_record

    with db() as conn:
        # Session write policy is checked again on the connection that performs
        # the write (the pre-check above ran in a separate db() context);
        # presumably this narrows the gap between check and write.
        ensure_write_allowed(
            conn,
            identity=identity,
            session_id=session_id,
            target_scope=req.scope,
            write_mode=req.write_mode,
            derived_from=req.derived_from,
        )
        conn.execute(
            """INSERT INTO facts
            (id, entity, relation, value_type, value_v, source, timestamp,
            valid_until, confidence, scope, hlc, received_from, attested_key_id,
            garden_id, attested, tenant_id, embedding_missing, cid, derived_from,
            interpret_as)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                fact_id,
                entity,  # normalized (spec §2.6)
                req.relation,
                req.value.type,
                value_v,
                source,  # normalized (spec §2.6)
                now,
                req.valid_until,
                req.confidence,
                req.scope,
                hlc,
                None,  # local write; not received from a peer
                attested_key_id,
                garden_uuid,
                attested_int,
                identity.tenant_id,
                embedding_missing_val,
                fact_cid,
                derived_from_json,
                req.value.interpret_as,
            ),
        )
        write_fact_journal(
            conn,
            fact_id=fact_id,
            event_type="fact_insert",
            tenant_id=identity.tenant_id,
            actor_uri=identity.entity_uri,
            source=source,
            scope=req.scope,
            cid=fact_cid,
            body={
                "entity": entity,
                "relation": req.relation,
                "value_type": req.value.type,
                "value_v": value_v,
                "source": source,
                "timestamp": now,
                "valid_until": req.valid_until,
                "confidence": req.confidence,
                "scope": req.scope,
                "interpret_as": req.value.interpret_as,
            },
        )
        if embedding_missing_val is not None:
            set_embedding_status(
                conn,
                fact_id=fact_id,
                embedding_missing=bool(embedding_missing_val),
                updated_by="fact_assert",
            )

        # F-10 §25.7.3: alias table row — idempotent upsert on CID collision
        alias_result = conn.execute(
            "INSERT OR IGNORE INTO fact_cid_aliases (fact_id, cid) VALUES (?, ?)",
            (fact_id, fact_cid),
        )
        if alias_result.rowcount == 0:
            # Concurrent same-CID write race: return existing record
            # NOTE(review): the fact row, journal entry and embedding status
            # inserted above are still part of this db() block when we return
            # here. Whether they are rolled back depends on db()'s exit
            # behaviour — TODO confirm, otherwise a duplicate fact row may
            # persist.
            # NOTE(review): if no row matches this tenant (alias owned by
            # another tenant), execution falls through and the fact is kept
            # without its own alias row.
            existing = conn.execute(
                "SELECT f.* FROM facts f JOIN fact_cid_aliases a ON a.fact_id = f.id"
                " WHERE a.cid = ? AND f.tenant_id = ?",
                (fact_cid, identity.tenant_id),
            ).fetchone()
            if existing is not None:
                return row_to_record(existing, contradicted=False)

        record_write_scope(
            conn,
            identity=identity,
            session_id=session_id,
            scope=req.scope,
        )

        # C3 / §22.3: write-ahead audit entry for fact_write event (same transaction)
        from ..audit_event import emit as _emit_audit

        _emit_audit(
            "fact_write",
            entity_uri=identity.entity_uri,
            tenant_id=identity.tenant_id,
            oidc_sub=identity.oidc_sub,
            fact_id=fact_id,
            source=source,
            attested_key_id=attested_key_id,
            scope=req.scope,
            conn=conn,
        )

        # Graph adjacency index (§20.1.1): materialize edge for ref-typed facts
        if req.value.type == "ref" and value_v and _is_valid_entity_uri(value_v):
            from ..recall.graph_index import upsert_edge as _upsert_edge

            _upsert_edge(
                conn,
                fact_id=fact_id,
                subject=entity,
                relation=req.relation,
                object_uri=value_v,
                scope=req.scope,
                confidence=req.confidence,
                garden_id=garden_uuid,
                tenant_id=identity.tenant_id,
                received_from=None,
                source_trust=None,
                valid_until=req.valid_until,
            )

        # Re-read the inserted row so the chain entry and returned record
        # reflect the stored columns.
        row = conn.execute("SELECT * FROM facts WHERE id=?", (fact_id,)).fetchone()
        from ..fact_chain import append_fact_chain_entry

        append_fact_chain_entry(conn, row)
        # Built before contradiction detection, so the plugin hook always
        # receives contradicted=False.
        persisted_record = row_to_record(row, contradicted=False)
        get_registry().fire_fire_and_forget(
            "post_assert_persist",
            fact=persisted_record,
            identity=identity,
            tenant=tenant,
            request_id=request_id,
        )
        contradicted = _detect_and_record_contradictions(conn, fact_id, entity, req, identity)

    # NOTE(review): placement assumed to be after the `with db() as conn:`
    # block, i.e. once the write transaction has been closed — confirm.
    _emit_post_write_hooks(
        fact_id=fact_id,
        entity=entity,
        source=source,
        req=req,
        identity=identity,
        value_v=value_v,
        garden_uuid=garden_uuid,
        now=now,
        contradicted=contradicted,
        _span=_span,
    )

    return row_to_record(row, contradicted=contradicted, warnings=relation_warnings)