Coverage for node / src / stigmem_node / card_materializer.py: 100%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Memory card materializer — spec §20 (Phase 9). 

2 

3A memory card is a per-entity, per-scope pre-aggregated summary that is 

4refreshed on write. When recall finds a fresh card with high confidence it 

5short-circuits raw-fact re-ranking for that entity, reducing per-query work. 

6 

7Public surface 

8-------------- 

9mark_entity_stale(entity_uri, scope, tenant_id) 

10 Called on every assert; marks the entity's card stale so the next recall 

11 (or explicit GET /v1/cards) triggers a refresh. 

12 

13refresh_card(entity_uri, scope, tenant_id, conn) -> MemoryCardData | None 

14 Regenerates the card from current facts using the configured summary model. 

15 Returns None when the entity has no live facts. 

16 

17get_fresh_card(entity_uri, scope, tenant_id, conn) -> MemoryCardData | None 

18 Returns a cached fresh card when available; refreshes if stale. 

19 Returns None when the entity has no facts. 

20""" 

21 

22from __future__ import annotations 

23 

24import hashlib 

25import json 

26import logging 

27from datetime import UTC, datetime 

28from typing import Any, NamedTuple 

29 

30logger = logging.getLogger("stigmem.cards") 

31 

32# Cards below this avg_confidence threshold do not short-circuit recall. 

33CARD_MIN_CONFIDENCE: float = 0.5 

34 

35# Maximum facts folded into a single card summary. 

36_SUMMARY_MAX_FACTS = 20 

37 

38 

39class MemoryCardData(NamedTuple): 

40 entity_uri: str 

41 scope: str 

42 tenant_id: str 

43 summary: str 

44 fact_hashes: list[str] 

45 avg_confidence: float 

46 refreshed_at: str 

47 is_stale: bool 

48 has_contradictions: bool 

49 

50 

51def mark_entity_stale(entity_uri: str, scope: str, tenant_id: str) -> None: 

52 """Open a short-lived DB connection and mark the entity's card stale. 

53 

54 No-op when no card row exists yet. Errors are logged and suppressed so 

55 the caller's write path is never blocked by card bookkeeping. 

56 """ 

57 try: 

58 from .db import db 

59 

60 with db() as conn: 

61 conn.execute( 

62 "UPDATE memory_cards SET is_stale = 1" 

63 " WHERE entity_uri = ? AND tenant_id = ? AND scope = ?", 

64 (entity_uri, tenant_id, scope), 

65 ) 

66 except Exception as exc: 

67 logger.warning("mark_entity_stale failed for %r: %s", entity_uri, exc) 

68 

69 

70def refresh_card( 

71 entity_uri: str, 

72 scope: str, 

73 tenant_id: str, 

74 conn: Any, 

75) -> MemoryCardData | None: 

76 """Recompute and persist the card for *entity_uri* from current live facts. 

77 

78 Returns the new MemoryCardData, or None when the entity has no live facts. 

79 The card row is upserted atomically in *conn*. 

80 """ 

81 now = datetime.now(UTC).isoformat() 

82 

83 rows = conn.execute( 

84 """ 

85 SELECT id, relation, value_type, value_v, confidence, source_trust, timestamp 

86 FROM facts 

87 WHERE entity = ? AND scope = ? AND tenant_id = ? 

88 AND confidence > 0 

89 AND (valid_until IS NULL OR valid_until > ?) 

90 AND (quarantine_status IS NULL OR quarantine_status != 'pending') 

91 ORDER BY confidence DESC, timestamp DESC 

92 LIMIT ? 

93 """, 

94 (entity_uri, scope, tenant_id, now, _SUMMARY_MAX_FACTS), 

95 ).fetchall() 

96 

97 if not rows: 

98 return None 

99 

100 # Contradiction: multiple facts with the same relation on this entity 

101 relations_seen: set[str] = set() 

102 has_contradictions = False 

103 for row in rows: 

104 rel = row["relation"] 

105 if rel in relations_seen: 

106 has_contradictions = True 

107 break 

108 relations_seen.add(rel) 

109 

110 # Source-trust-weighted average confidence 

111 total_weight = 0.0 

112 weighted_conf = 0.0 

113 for row in rows: 

114 st = row["source_trust"] if row["source_trust"] is not None else 1.0 

115 weight = max(0.01, float(st)) 

116 weighted_conf += float(row["confidence"]) * weight 

117 total_weight += weight 

118 avg_confidence = round(weighted_conf / total_weight if total_weight > 0 else 0.0, 6) 

119 

120 # Contributing fact hashes: sha256 of fact id 

121 fact_hashes = [hashlib.sha256(row["id"].encode()).hexdigest() for row in rows] 

122 

123 # Local summary: structured text representation 

124 lines = [f"Entity: {entity_uri}", "Facts:"] 

125 for row in rows: 

126 vv = row["value_v"] or "" 

127 lines.append(f" {row['relation']}: {vv} (conf={row['confidence']:.2f})") 

128 summary = "\n".join(lines) 

129 

130 conn.execute( 

131 """ 

132 INSERT INTO memory_cards 

133 (entity_uri, tenant_id, scope, summary, fact_hashes, avg_confidence, 

134 refreshed_at, is_stale, has_contradictions) 

135 VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?) 

136 ON CONFLICT (entity_uri, tenant_id, scope) DO UPDATE SET 

137 summary = excluded.summary, 

138 fact_hashes = excluded.fact_hashes, 

139 avg_confidence = excluded.avg_confidence, 

140 refreshed_at = excluded.refreshed_at, 

141 is_stale = 0, 

142 has_contradictions = excluded.has_contradictions 

143 """, 

144 ( 

145 entity_uri, 

146 tenant_id, 

147 scope, 

148 summary, 

149 json.dumps(fact_hashes), 

150 avg_confidence, 

151 now, 

152 1 if has_contradictions else 0, 

153 ), 

154 ) 

155 

156 return MemoryCardData( 

157 entity_uri=entity_uri, 

158 scope=scope, 

159 tenant_id=tenant_id, 

160 summary=summary, 

161 fact_hashes=fact_hashes, 

162 avg_confidence=avg_confidence, 

163 refreshed_at=now, 

164 is_stale=False, 

165 has_contradictions=has_contradictions, 

166 ) 

167 

168 

169def get_fresh_card( 

170 entity_uri: str, 

171 scope: str, 

172 tenant_id: str, 

173 conn: Any, 

174) -> MemoryCardData | None: 

175 """Return a fresh card for *entity_uri*, refreshing it when stale. 

176 

177 Returns None when the entity has no live facts. 

178 """ 

179 row = conn.execute( 

180 """ 

181 SELECT entity_uri, tenant_id, scope, summary, fact_hashes, 

182 avg_confidence, refreshed_at, is_stale, has_contradictions 

183 FROM memory_cards 

184 WHERE entity_uri = ? AND tenant_id = ? AND scope = ? 

185 """, 

186 (entity_uri, tenant_id, scope), 

187 ).fetchone() 

188 

189 if row is None or row["is_stale"]: 

190 return refresh_card(entity_uri, scope, tenant_id, conn) 

191 

192 return MemoryCardData( 

193 entity_uri=row["entity_uri"], 

194 scope=row["scope"], 

195 tenant_id=row["tenant_id"], 

196 summary=row["summary"], 

197 fact_hashes=json.loads(row["fact_hashes"] or "[]"), 

198 avg_confidence=float(row["avg_confidence"]), 

199 refreshed_at=row["refreshed_at"], 

200 is_stale=False, 

201 has_contradictions=bool(row["has_contradictions"]), 

202 )