Coverage for node / src / stigmem_node / routes / facts / common.py: 78%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Shared helpers for fact route modules.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import uuid 

7from datetime import UTC, datetime 

8from typing import Any 

9 

10from fastapi import APIRouter 

11 

12from ...hlc import node_hlc 

13from ...models.tombstones import TombstoneNotice 

14 

logger = logging.getLogger("stigmem.facts")

# Column list for reading a fact row aliased ``f`` with its side tables applied.
# Each ``projected_*`` column takes the side-table value (validity overrides,
# garden membership, quarantine status) when it is non-NULL and otherwise
# falls back to the fact's own column. ``projected_cid`` falls back to the
# first alias from ``fact_cid_aliases`` (by ``ORDER BY cid``) when ``f.cid`` is
# NULL. The fvo / fgm / fqs aliases are bound by FACT_PROJECTION_JOINS, so the
# two constants must be used together.
FACT_PROJECTION_SELECT = ", ".join(
    (
        "f.*",
        "COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until",
        "COALESCE(fvo.confidence, f.confidence) AS projected_confidence",
        "COALESCE(fgm.garden_id, f.garden_id) AS projected_garden_id",
        "COALESCE(fqs.quarantine_status, f.quarantine_status) AS projected_quarantine_status",
        "COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) "
        "AS projected_quarantine_garden_id",
        "COALESCE(f.cid, (SELECT fca.cid FROM fact_cid_aliases fca "
        "WHERE fca.fact_id = f.id ORDER BY fca.cid LIMIT 1)) AS projected_cid",
    )
)

# LEFT JOINs binding the fvo / fgm / fqs aliases used in FACT_PROJECTION_SELECT.
# Each fragment carries its own leading space so the result can be appended
# directly after a ``FROM facts f`` clause.
FACT_PROJECTION_JOINS = "".join(
    (
        " LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id",
        " LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id",
        " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id",
    )
)

34 

# Names exported by ``from <this module> import *``. The underscore-prefixed
# helpers are deliberately not listed; presumably sibling route modules import
# them by name (confirm).
__all__ = ["FACT_PROJECTION_JOINS", "FACT_PROJECTION_SELECT", "router"]

40 

41 

42def _get_tombstone_filter( 

43 conn: Any, 

44 entity_uris: list[str], 

45 scope: str, 

46 is_admin_caller: bool, 

47) -> tuple[set[str], list[TombstoneNotice]]: 

48 """Return (excluded_entity_uris, tombstone_notices) for entity_uris in scope (§23.3, §24.3). 

49 

50 excluded_entity_uris: entities under active (non-legal-hold) tombstones. 

51 tombstone_notices: annotations for legal_hold tombstones visible to admin callers. 

52 """ 

53 from ...lifecycle.tombstone_gate import tombstone_filter_enabled 

54 

55 if not entity_uris or not tombstone_filter_enabled(): 

56 return set(), [] 

57 

58 placeholders = ",".join("?" * len(entity_uris)) 

59 # BEGIN IMMEDIATE for SQLite consistency (§23.3.3 rule 5). 

60 # On postgres this is a syntax error; rollback clears the failed txn state. 

61 try: 

62 conn.execute("BEGIN IMMEDIATE") 

63 except Exception as exc: # nosec B110 

64 logger.debug( 

65 "BEGIN IMMEDIATE not supported or failed for tombstone filter transaction " 

66 "(expected on some backends, e.g. postgres); continuing with default " 

67 "transaction behavior: %s", 

68 exc, 

69 ) 

70 try: # noqa: SIM105 

71 conn.rollback() 

72 except Exception as rollback_exc: # nosec B110 

73 logger.debug( 

74 "Rollback after BEGIN IMMEDIATE failure also failed; continuing because " 

75 "explicit BEGIN IMMEDIATE is optional in this path: %s", 

76 rollback_exc, 

77 ) 

78 rows = conn.execute( 

79 f"""SELECT t.id, t.entity_uri, t.scope, t.created_at, t.legal_hold 

80 FROM tombstones t 

81 WHERE t.entity_uri IN ({placeholders}) 

82 AND NOT EXISTS ( 

83 SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id 

84 )""", # noqa: S608 # nosec B608 - dynamic SQL is generated placeholders only; entity values are bound params. 

85 entity_uris, 

86 ).fetchall() 

87 try: # noqa: SIM105 

88 conn.execute("COMMIT") 

89 except Exception as exc: # nosec B110 

90 logger.debug( 

91 "Tombstone filter COMMIT skipped or failed (can occur when explicit " 

92 "BEGIN IMMEDIATE was unavailable on this backend); continuing: %s", 

93 exc, 

94 ) 

95 

96 excluded: set[str] = set() 

97 notices: list[TombstoneNotice] = [] 

98 

99 for row in rows: 

100 uri = row["entity_uri"] 

101 row_scope = row["scope"] 

102 if row_scope != "*" and row_scope != scope: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true

103 continue 

104 

105 if row["legal_hold"]: 

106 if is_admin_caller: 

107 notices.append( 

108 TombstoneNotice( 

109 entity_uri=uri, 

110 tombstone_id=row["id"], 

111 legal_hold=True, 

112 tombstone_created_at=row["created_at"], 

113 ) 

114 ) 

115 else: 

116 excluded.add(uri) 

117 else: 

118 excluded.add(uri) 

119 

120 return excluded, notices 

121 

122 

# Relations starting with this prefix are reserved for system facts;
# _validate_relation warns when a non-system caller uses it.
_SYSTEM_RELATION_PREFIX = "stigmem:"

# Router for the /v1/facts endpoints, exported via ``__all__``
# (presumably route modules register their handlers on it).
router = APIRouter(tags=["facts"], prefix="/v1/facts")

126 

127 

128def _validate_relation(relation: str) -> list[str]: 

129 """Return convention warnings for a relation name (see relation-convention.md).""" 

130 if ":" not in relation: 

131 return [ 

132 f"bare relation {relation!r} has no namespace prefix; " 

133 f"rename to 'your-prefix:{relation}' to prevent silent collisions " 

134 "(see relation-convention.md)" 

135 ] 

136 if relation.startswith(_SYSTEM_RELATION_PREFIX): 

137 return [ 

138 f"relation {relation!r} uses reserved system prefix 'stigmem:'; " 

139 "non-system callers should use a custom namespace prefix (see spec §9.1)" 

140 ] 

141 return [] 

142 

143 

144def _is_valid_entity_uri(uri: str) -> bool: 

145 """Return whether a ref value is eligible for graph edge derivation.""" 

146 return "://" in uri or uri.startswith("urn:") 

147 

148 

def _embed_fact_background(
    fact_id: str,
    entity: str,
    relation: str,
    value_type: str,
    value_v: str,
) -> None:
    """Background-thread worker: embed a single fact and persist it to vec_facts.

    Loads the configured embedding model, registers/checks it via
    ``check_or_register_model`` on a fresh ``db()`` connection, then hands the
    fact to ``embed_and_store_fact``. Any ``Exception`` (including a failing
    lazy import) is logged at WARNING level and swallowed.
    """
    try:
        # Imports are deferred into the try so that an import failure is logged
        # like any other embedding error (presumably also to avoid import
        # cycles / optional dependencies at module load — confirm).
        from ... import settings as settings_module
        from ...db import db
        from ...embedding import get_embedding_model
        from ...recall.vector_search import check_or_register_model, embed_and_store_fact

        embedder = get_embedding_model(settings_module.settings)
        with db() as conn:
            check_or_register_model(conn, embedder.model_id, embedder.dimension)
            embed_and_store_fact(
                fact_id,
                entity,
                relation,
                value_type,
                value_v,
                conn,
                embedder,
            )
    except Exception as err:
        logger.warning("Write-time embedding failed for fact %s: %s", fact_id, err)

169 

170 

# Shared INSERT for the system-sourced facts that describe a conflict entity.
_CONFLICT_FACT_INSERT_SQL = """INSERT INTO facts
    (id, entity, relation, value_type, value_v, source, timestamp,
     valid_until, confidence, scope, hlc, received_from, tenant_id)
    VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"""


def _insert_conflict_fact(
    conn: Any,
    conflict_id: str,
    relation: str,
    value_type: str,
    value_v: str,
    now: str,
    scope: str,
    tenant_id: str,
) -> None:
    """Insert one ``system:stigmem`` fact on *conflict_id*, stamped with a fresh node HLC tick."""
    hlc = node_hlc.tick()
    conn.execute(
        _CONFLICT_FACT_INSERT_SQL,
        (
            str(uuid.uuid4()),
            conflict_id,
            relation,
            value_type,
            value_v,
            "system:stigmem",
            now,
            None,  # valid_until
            1.0,  # confidence
            scope,
            hlc,
            None,  # received_from
            tenant_id,
        ),
    )


def _record_contradictions(
    conn: Any,
    new_fact_id: str,
    entity: str,
    relation: str,
    scope: str,
    siblings: list[Any],
    tenant_id: str = "default",
) -> None:
    """Write conflict entities and conflicts table rows for new contradictions.

    For each sibling that is not already paired with *new_fact_id* in the
    ``conflicts`` table (in either column order), this creates a fresh
    ``stigmem:conflict:<uuid>`` entity with two facts:
    ``stigmem:conflict:between`` (the two fact ids, space-separated) and
    ``stigmem:conflict:status`` (``"unresolved"``). It then inserts the matching
    ``conflicts`` row with ``INSERT OR IGNORE``. All rows written by one call
    share a single UTC ISO-8601 timestamp.

    Args:
        conn: Open DB connection; statements are executed but not committed here.
        new_fact_id: Id of the fact that was just written.
        entity: Currently unused; kept for interface compatibility.
        relation: Currently unused; kept for interface compatibility.
        scope: Scope stamped on the generated conflict facts.
        siblings: Rows (indexable by ``"id"``) that contradict the new fact.
        tenant_id: Tenant stamped on the generated conflict facts.
    """
    now = datetime.now(UTC).isoformat()
    for sibling in siblings:
        sibling_id = sibling["id"]

        already = conn.execute(
            """SELECT id FROM conflicts
               WHERE (fact_a_id=? AND fact_b_id=?) OR (fact_a_id=? AND fact_b_id=?)""",
            (new_fact_id, sibling_id, sibling_id, new_fact_id),
        ).fetchone()
        if already:
            continue

        conflict_id = f"stigmem:conflict:{uuid.uuid4()}"
        _insert_conflict_fact(
            conn,
            conflict_id,
            "stigmem:conflict:between",
            "text",
            f"{new_fact_id} {sibling_id}",
            now,
            scope,
            tenant_id,
        )
        _insert_conflict_fact(
            conn,
            conflict_id,
            "stigmem:conflict:status",
            "string",
            "unresolved",
            now,
            scope,
            tenant_id,
        )
        conn.execute(
            """INSERT OR IGNORE INTO conflicts (id, fact_a_id, fact_b_id, status, detected_at)
               VALUES (?,?,?,?,?)""",
            (conflict_id, new_fact_id, sibling_id, "unresolved", now),
        )

242 ) 

243 

244 

245def _encode_v(vtype: str, v: Any) -> str: 

246 if vtype == "null": 

247 return "null" 

248 if vtype == "boolean": 

249 return "true" if v else "false" 

250 return str(v)