Coverage for node / src / stigmem_node / routes / facts / common.py: 78%
84 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Shared helpers for fact route modules."""
3from __future__ import annotations
5import logging
6import uuid
7from datetime import UTC, datetime
8from typing import Any
10from fastapi import APIRouter
12from ...hlc import node_hlc
13from ...models.tombstones import TombstoneNotice
15logger = logging.getLogger("stigmem.facts")
17FACT_PROJECTION_SELECT = (
18 "f.*, "
19 "COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until, "
20 "COALESCE(fvo.confidence, f.confidence) AS projected_confidence, "
21 "COALESCE(fgm.garden_id, f.garden_id) AS projected_garden_id, "
22 "COALESCE(fqs.quarantine_status, f.quarantine_status) AS projected_quarantine_status, "
23 "COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) "
24 "AS projected_quarantine_garden_id, "
25 "COALESCE(f.cid, (SELECT fca.cid FROM fact_cid_aliases fca "
26 "WHERE fca.fact_id = f.id ORDER BY fca.cid LIMIT 1)) AS projected_cid"
27)
29FACT_PROJECTION_JOINS = (
30 " LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id"
31 " LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id"
32 " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id"
33)
35__all__ = [
36 "FACT_PROJECTION_JOINS",
37 "FACT_PROJECTION_SELECT",
38 "router",
39]
42def _get_tombstone_filter(
43 conn: Any,
44 entity_uris: list[str],
45 scope: str,
46 is_admin_caller: bool,
47) -> tuple[set[str], list[TombstoneNotice]]:
48 """Return (excluded_entity_uris, tombstone_notices) for entity_uris in scope (§23.3, §24.3).
50 excluded_entity_uris: entities under active (non-legal-hold) tombstones.
51 tombstone_notices: annotations for legal_hold tombstones visible to admin callers.
52 """
53 from ...lifecycle.tombstone_gate import tombstone_filter_enabled
55 if not entity_uris or not tombstone_filter_enabled():
56 return set(), []
58 placeholders = ",".join("?" * len(entity_uris))
59 # BEGIN IMMEDIATE for SQLite consistency (§23.3.3 rule 5).
60 # On postgres this is a syntax error; rollback clears the failed txn state.
61 try:
62 conn.execute("BEGIN IMMEDIATE")
63 except Exception as exc: # nosec B110
64 logger.debug(
65 "BEGIN IMMEDIATE not supported or failed for tombstone filter transaction "
66 "(expected on some backends, e.g. postgres); continuing with default "
67 "transaction behavior: %s",
68 exc,
69 )
70 try: # noqa: SIM105
71 conn.rollback()
72 except Exception as rollback_exc: # nosec B110
73 logger.debug(
74 "Rollback after BEGIN IMMEDIATE failure also failed; continuing because "
75 "explicit BEGIN IMMEDIATE is optional in this path: %s",
76 rollback_exc,
77 )
78 rows = conn.execute(
79 f"""SELECT t.id, t.entity_uri, t.scope, t.created_at, t.legal_hold
80 FROM tombstones t
81 WHERE t.entity_uri IN ({placeholders})
82 AND NOT EXISTS (
83 SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id
84 )""", # noqa: S608 # nosec B608 - dynamic SQL is generated placeholders only; entity values are bound params.
85 entity_uris,
86 ).fetchall()
87 try: # noqa: SIM105
88 conn.execute("COMMIT")
89 except Exception as exc: # nosec B110
90 logger.debug(
91 "Tombstone filter COMMIT skipped or failed (can occur when explicit "
92 "BEGIN IMMEDIATE was unavailable on this backend); continuing: %s",
93 exc,
94 )
96 excluded: set[str] = set()
97 notices: list[TombstoneNotice] = []
99 for row in rows:
100 uri = row["entity_uri"]
101 row_scope = row["scope"]
102 if row_scope != "*" and row_scope != scope: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true
103 continue
105 if row["legal_hold"]:
106 if is_admin_caller:
107 notices.append(
108 TombstoneNotice(
109 entity_uri=uri,
110 tombstone_id=row["id"],
111 legal_hold=True,
112 tombstone_created_at=row["created_at"],
113 )
114 )
115 else:
116 excluded.add(uri)
117 else:
118 excluded.add(uri)
120 return excluded, notices
123router = APIRouter(prefix="/v1/facts", tags=["facts"])
125_SYSTEM_RELATION_PREFIX = "stigmem:"
128def _validate_relation(relation: str) -> list[str]:
129 """Return convention warnings for a relation name (see relation-convention.md)."""
130 if ":" not in relation:
131 return [
132 f"bare relation {relation!r} has no namespace prefix; "
133 f"rename to 'your-prefix:{relation}' to prevent silent collisions "
134 "(see relation-convention.md)"
135 ]
136 if relation.startswith(_SYSTEM_RELATION_PREFIX):
137 return [
138 f"relation {relation!r} uses reserved system prefix 'stigmem:'; "
139 "non-system callers should use a custom namespace prefix (see spec §9.1)"
140 ]
141 return []
144def _is_valid_entity_uri(uri: str) -> bool:
145 """Return whether a ref value is eligible for graph edge derivation."""
146 return "://" in uri or uri.startswith("urn:")
149def _embed_fact_background(
150 fact_id: str,
151 entity: str,
152 relation: str,
153 value_type: str,
154 value_v: str,
155) -> None:
156 """Background thread: embed one fact and persist to vec_facts."""
157 try:
158 from ... import settings as settings_pkg
159 from ...db import db
160 from ...embedding import get_embedding_model
161 from ...recall.vector_search import check_or_register_model, embed_and_store_fact
163 model = get_embedding_model(settings_pkg.settings)
164 with db() as conn:
165 check_or_register_model(conn, model.model_id, model.dimension)
166 embed_and_store_fact(fact_id, entity, relation, value_type, value_v, conn, model)
167 except Exception as exc:
168 logger.warning("Write-time embedding failed for fact %s: %s", fact_id, exc)
171def _record_contradictions(
172 conn: Any,
173 new_fact_id: str,
174 entity: str,
175 relation: str,
176 scope: str,
177 siblings: list[Any],
178 tenant_id: str = "default",
179) -> None:
180 """Write conflict entities and conflicts table rows for new contradictions."""
181 now = datetime.now(UTC).isoformat()
182 for sibling in siblings:
183 sibling_id = sibling["id"]
185 already = conn.execute(
186 """SELECT id FROM conflicts
187 WHERE (fact_a_id=? AND fact_b_id=?) OR (fact_a_id=? AND fact_b_id=?)""",
188 (new_fact_id, sibling_id, sibling_id, new_fact_id),
189 ).fetchone()
190 if already: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true
191 continue
193 conflict_id = f"stigmem:conflict:{uuid.uuid4()}"
194 h_between = node_hlc.tick()
195 conn.execute(
196 """INSERT INTO facts
197 (id, entity, relation, value_type, value_v, source, timestamp,
198 valid_until, confidence, scope, hlc, received_from, tenant_id)
199 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
200 (
201 str(uuid.uuid4()),
202 conflict_id,
203 "stigmem:conflict:between",
204 "text",
205 f"{new_fact_id} {sibling_id}",
206 "system:stigmem",
207 now,
208 None,
209 1.0,
210 scope,
211 h_between,
212 None,
213 tenant_id,
214 ),
215 )
216 h_status = node_hlc.tick()
217 conn.execute(
218 """INSERT INTO facts
219 (id, entity, relation, value_type, value_v, source, timestamp,
220 valid_until, confidence, scope, hlc, received_from, tenant_id)
221 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
222 (
223 str(uuid.uuid4()),
224 conflict_id,
225 "stigmem:conflict:status",
226 "string",
227 "unresolved",
228 "system:stigmem",
229 now,
230 None,
231 1.0,
232 scope,
233 h_status,
234 None,
235 tenant_id,
236 ),
237 )
238 conn.execute(
239 """INSERT OR IGNORE INTO conflicts (id, fact_a_id, fact_b_id, status, detected_at)
240 VALUES (?,?,?,?,?)""",
241 (conflict_id, new_fact_id, sibling_id, "unresolved", now),
242 )
245def _encode_v(vtype: str, v: Any) -> str:
246 if vtype == "null":
247 return "null"
248 if vtype == "boolean":
249 return "true" if v else "false"
250 return str(v)