Coverage for node / src / stigmem_node / card_materializer.py: 100%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Memory card materializer — spec §20 (Phase 9).
3A memory card is a per-entity, per-scope pre-aggregated summary that is
4refreshed on write. When recall finds a fresh card with high confidence it
5short-circuits raw-fact re-ranking for that entity, reducing per-query work.
7Public surface
8--------------
9mark_entity_stale(entity_uri, scope, tenant_id)
10 Called on every assert; marks the entity's card stale so the next recall
11 (or explicit GET /v1/cards) triggers a refresh.
13refresh_card(entity_uri, scope, tenant_id, conn) -> MemoryCardData | None
14 Regenerates the card from current live facts as a locally built structured-text summary (no summary model is invoked).
15 Returns None when the entity has no live facts.
17get_fresh_card(entity_uri, scope, tenant_id, conn) -> MemoryCardData | None
18 Returns a cached fresh card when available; refreshes if stale.
19 Returns None when the entity has no facts.
20"""
22from __future__ import annotations
24import hashlib
25import json
26import logging
27from datetime import UTC, datetime
28from typing import Any, NamedTuple
# Module logger; card bookkeeping failures are reported under this name.
logger = logging.getLogger("stigmem.cards")

# Threshold on a card's avg_confidence: cards scoring below it must not
# short-circuit recall.
CARD_MIN_CONFIDENCE: float = 0.5

# Upper bound on how many facts are folded into one card summary.
_SUMMARY_MAX_FACTS = 20
class MemoryCardData(NamedTuple):
    """Immutable in-memory view of one row of the ``memory_cards`` table.

    A card is keyed by the (entity_uri, tenant_id, scope) triple and carries
    the pre-aggregated summary plus the bookkeeping flags stored beside it.
    """

    # --- identity -----------------------------------------------------
    entity_uri: str
    scope: str
    tenant_id: str

    # --- aggregated content -------------------------------------------
    # Text summary built from the contributing facts.
    summary: str
    # SHA-256 hex digests identifying the contributing facts.
    fact_hashes: list[str]
    # Source-trust-weighted mean confidence of the contributing facts.
    avg_confidence: float

    # --- bookkeeping --------------------------------------------------
    # ISO-8601 timestamp of the last refresh.
    refreshed_at: str
    # True when the card must be regenerated before it can be trusted.
    is_stale: bool
    # True when two or more contributing facts share the same relation.
    has_contradictions: bool
def mark_entity_stale(entity_uri: str, scope: str, tenant_id: str) -> None:
    """Flag the stored card for *entity_uri* as stale, on a best-effort basis.

    Issues a single UPDATE through a handle obtained from the project's
    ``db()`` context manager. The statement matches nothing when no card row
    exists for the (entity_uri, tenant_id, scope) triple, so the call is then
    a no-op.

    Any exception — including a failure to import the DB helper — is logged
    as a warning and swallowed, so the caller's write path is never blocked
    by card bookkeeping.
    """
    statement = (
        "UPDATE memory_cards SET is_stale = 1"
        " WHERE entity_uri = ? AND tenant_id = ? AND scope = ?"
    )
    card_key = (entity_uri, tenant_id, scope)

    try:
        # Imported lazily and inside the guard so that an import failure is
        # suppressed exactly like a database failure.
        from .db import db

        with db() as handle:
            handle.execute(statement, card_key)
    except Exception as err:
        logger.warning("mark_entity_stale failed for %r: %s", entity_uri, err)
def refresh_card(
    entity_uri: str,
    scope: str,
    tenant_id: str,
    conn: Any,
) -> MemoryCardData | None:
    """Recompute and persist the card for *entity_uri* from current live facts.

    A fact is "live" when its confidence is positive, it has not expired
    (``valid_until`` is NULL or later than now) and it is not pending
    quarantine. At most ``_SUMMARY_MAX_FACTS`` facts contribute, taken in
    order of descending confidence and then descending timestamp.

    Args:
        entity_uri: Entity whose card is rebuilt.
        scope: Scope the card belongs to.
        tenant_id: Tenant the card belongs to.
        conn: Open DB handle exposing ``execute``; fetched rows must support
            access by column name.

    Returns:
        The freshly written ``MemoryCardData`` (``is_stale`` is False), or
        None when the entity has no live facts. In the None case no card row
        is written or modified.

    The card row is upserted in *conn*; committing is left to the caller.
    """
    now = datetime.now(UTC).isoformat()

    # NOTE(review): ``valid_until > ?`` compares against an ISO-8601 UTC
    # string, which presumes valid_until is stored in a comparable format —
    # confirm against the writer of the facts table.
    rows = conn.execute(
        """
        SELECT id, relation, value_type, value_v, confidence, source_trust, timestamp
        FROM facts
        WHERE entity = ? AND scope = ? AND tenant_id = ?
        AND confidence > 0
        AND (valid_until IS NULL OR valid_until > ?)
        AND (quarantine_status IS NULL OR quarantine_status != 'pending')
        ORDER BY confidence DESC, timestamp DESC
        LIMIT ?
        """,
        (entity_uri, scope, tenant_id, now, _SUMMARY_MAX_FACTS),
    ).fetchall()

    if not rows:
        return None

    # Contradiction: two or more of the selected facts share a relation.
    relations = [row["relation"] for row in rows]
    has_contradictions = len(set(relations)) < len(relations)

    # Source-trust-weighted average confidence. A missing trust counts as
    # 1.0, and every weight is floored at 0.01, so the total weight is
    # strictly positive whenever there is at least one row.
    total_weight = 0.0
    weighted_conf = 0.0
    for row in rows:
        trust = row["source_trust"]
        weight = max(0.01, float(1.0 if trust is None else trust))
        weighted_conf += float(row["confidence"]) * weight
        total_weight += weight
    avg_confidence = round(weighted_conf / total_weight, 6)

    # Contributing fact hashes: sha256 of the fact id. The id is passed
    # through str() first so non-string ids (e.g. integer keys) hash instead
    # of raising; string ids hash exactly as before.
    fact_hashes = [
        hashlib.sha256(str(row["id"]).encode()).hexdigest() for row in rows
    ]

    # Local summary: structured text representation. Only a NULL value is
    # rendered as empty; falsy-but-real values such as 0 or False are kept.
    lines = [f"Entity: {entity_uri}", "Facts:"]
    for row in rows:
        value = row["value_v"]
        shown = "" if value is None else value
        lines.append(f" {row['relation']}: {shown} (conf={row['confidence']:.2f})")
    summary = "\n".join(lines)

    # Upsert keyed on (entity_uri, tenant_id, scope); a rewrite always
    # clears the stale flag.
    conn.execute(
        """
        INSERT INTO memory_cards
        (entity_uri, tenant_id, scope, summary, fact_hashes, avg_confidence,
        refreshed_at, is_stale, has_contradictions)
        VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?)
        ON CONFLICT (entity_uri, tenant_id, scope) DO UPDATE SET
        summary = excluded.summary,
        fact_hashes = excluded.fact_hashes,
        avg_confidence = excluded.avg_confidence,
        refreshed_at = excluded.refreshed_at,
        is_stale = 0,
        has_contradictions = excluded.has_contradictions
        """,
        (
            entity_uri,
            tenant_id,
            scope,
            summary,
            json.dumps(fact_hashes),
            avg_confidence,
            now,
            1 if has_contradictions else 0,
        ),
    )

    return MemoryCardData(
        entity_uri=entity_uri,
        scope=scope,
        tenant_id=tenant_id,
        summary=summary,
        fact_hashes=fact_hashes,
        avg_confidence=avg_confidence,
        refreshed_at=now,
        is_stale=False,
        has_contradictions=has_contradictions,
    )
def get_fresh_card(
    entity_uri: str,
    scope: str,
    tenant_id: str,
    conn: Any,
) -> MemoryCardData | None:
    """Return an up-to-date card for *entity_uri*, rebuilding it if needed.

    The stored row for (entity_uri, tenant_id, scope) is served as-is when it
    exists and is not flagged stale. Otherwise the work is delegated to
    ``refresh_card`` and its result is returned unchanged, which is None when
    the entity has no live facts.
    """
    lookup_sql = """
        SELECT entity_uri, tenant_id, scope, summary, fact_hashes,
        avg_confidence, refreshed_at, is_stale, has_contradictions
        FROM memory_cards
        WHERE entity_uri = ? AND tenant_id = ? AND scope = ?
        """
    cached = conn.execute(lookup_sql, (entity_uri, tenant_id, scope)).fetchone()

    # Only a present, non-stale row may be served from the cache.
    servable = cached is not None and not cached["is_stale"]
    if not servable:
        return refresh_card(entity_uri, scope, tenant_id, conn)

    return MemoryCardData(
        entity_uri=cached["entity_uri"],
        scope=cached["scope"],
        tenant_id=cached["tenant_id"],
        summary=cached["summary"],
        # A NULL/empty stored value decodes to an empty hash list.
        fact_hashes=json.loads(cached["fact_hashes"] or "[]"),
        avg_confidence=float(cached["avg_confidence"]),
        refreshed_at=cached["refreshed_at"],
        is_stale=False,
        has_contradictions=bool(cached["has_contradictions"]),
    )