Coverage for node / src / stigmem_node / routes / recall / as_of.py: 87%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Time-travel recall implementation.""" 

2 

3from __future__ import annotations 

4 

5from typing import Any 

6 

7from ...auth import Identity 

8from ...models.facts import row_to_record 

9from ...models.recall import RecallWeights, ScoreBreakdown, ScoredFact 

10from ...models.tombstones import TombstoneNotice 

11from ...plugins import get_registry 

12from ...recall.recall_pipeline import apply_recall_pipeline 

13from ..cid_integrity import enforce_read_path_cid 

14from .common import _estimate_tokens 

15from .ranking import _greedy_pack 

16 

17 

def _recall_as_of_impl(
    conn: Any,
    *,
    query: str,
    scope: str,
    as_of: str,
    is_admin_caller: bool,
    tenant_id: str,
    max_chunks: int,
    include_graph: bool,
    identity: Identity,
    weights: RecallWeights,
    depth: int,
) -> tuple[list[ScoredFact], list[TombstoneNotice], bool]:
    """Run a time-travel recall at ``as_of`` (§24.4).

    Fetches candidate facts for ``scope``/``tenant_id`` that were visible at
    ``as_of`` (created at or before it, not expired, not retracted by then),
    runs them through the recall pipeline, scores them with a lexical +
    recency blend scaled by confidence, applies plugin score deltas and the
    tombstone filter (§24.3), and greedily packs the top results.

    Args:
        conn: Database connection exposing ``execute(sql, params).fetchall()``.
        query: Free-text query; matched case-insensitively, term by term.
        scope: Scope the candidate facts must belong to.
        as_of: ISO-8601 timestamp of the point in time to recall at.
        is_admin_caller: Forwarded to the tombstone filter.
        tenant_id: Tenant the candidate facts must belong to.
        max_chunks: Maximum number of scored facts to return.
        include_graph: Accepted for interface parity; not used by this path.
        identity: Caller identity, passed to the recall pipeline and plugins.
        weights: Recall weights; only ``lexical`` and ``recency`` are used here.
        depth: Accepted for interface parity; not used by this path.

    Returns:
        ``(scored_facts, tombstone_notices, tombstone_filtered)`` where
        ``tombstone_filtered`` is True when the tombstone filter reported at
        least one excluded entity.
    """
    from datetime import datetime, timezone

    from ..facts import _get_tombstone_filter

    # Candidate fetch: facts visible at as_of. Over-fetch (5x) so that the
    # later pipeline / quarantine / tombstone filtering still leaves enough
    # candidates to fill max_chunks.
    rows = conn.execute(
        """
        SELECT f.*,
               COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until,
               COALESCE(fvo.confidence, f.confidence) AS projected_confidence,
               COALESCE(fgm.garden_id, f.garden_id) AS projected_garden_id,
               COALESCE(fqs.quarantine_status, f.quarantine_status)
                   AS projected_quarantine_status,
               COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id)
                   AS projected_quarantine_garden_id,
               (
                   SELECT fca.cid
                   FROM fact_cid_aliases fca
                   WHERE fca.fact_id = f.id
                   ORDER BY fca.cid
                   LIMIT 1
               ) AS projected_cid
        FROM facts f
        LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id
        LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id
        LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id
        WHERE f.scope = ?
          AND f.tenant_id = ?
          AND f.timestamp <= ?
          AND (
              COALESCE(fvo.valid_until, f.valid_until) IS NULL
              OR COALESCE(fvo.valid_until, f.valid_until) > ?
          )
          AND NOT EXISTS (
              SELECT 1 FROM fact_retractions fr
              WHERE fr.fact_id = f.id AND fr.retracted_at <= ?
          )
        ORDER BY f.timestamp DESC
        LIMIT ?
        """,
        (scope, tenant_id, as_of, as_of, as_of, max_chunks * 5),
    ).fetchall()

    if not rows:
        return [], [], False

    for row in rows:
        enforce_read_path_cid(row)
    all_facts_raw = {row["id"]: row_to_record(row) for row in rows}

    # Apply recall pipeline (trust, sanitizer)
    pipeline_out = apply_recall_pipeline(list(all_facts_raw.values()), identity)
    all_facts = {r.id: r for r in pipeline_out}

    def _parse_utc(stamp: str) -> datetime | None:
        """Parse an ISO-8601 string; naive values are treated as UTC.

        Returns None when the value cannot be parsed (best-effort scoring).
        """
        try:
            parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    # as_of is loop-invariant: parse it once instead of once per fact.
    as_of_dt = _parse_utc(as_of)

    def _recency_as_of(ts_str: str) -> float:
        """Recency in [0, 1] relative to as_of: 1.0 when fresh, 0.0 at >= 1 year.

        Falls back to a neutral 0.5 when either timestamp is unparseable.
        """
        ts = _parse_utc(ts_str)
        if ts is None or as_of_dt is None:
            return 0.5
        try:
            days_old = (as_of_dt - ts).days
        except OverflowError:
            return 0.5
        # Clamp on both sides. NOTE(review): the SQL filter compares
        # f.timestamp with as_of in the database; if those are stored as text
        # with differing UTC offsets, a fact that is chronologically later
        # than as_of can still be selected, which would give a negative age
        # and a recency above 1.0 without the upper clamp.
        return max(0.0, min(1.0, 1.0 - days_old / 365.0))

    # Score using lexical signal against query (recency computed relative to as_of)
    query_terms = query.lower().split()
    term_count = max(1, len(query_terms))  # avoid division by zero on empty query
    total_weight = weights.lexical + weights.recency
    if total_weight <= 0:
        total_weight = 1.0

    scored: list[ScoredFact] = []
    for record in all_facts.values():
        if record.quarantine_status == "pending":
            continue
        text = f"{record.entity} {record.relation} {record.value.v or ''}".lower()
        # Fraction of query terms that occur (as substrings) in the fact text.
        lex = sum(1.0 for term in query_terms if term in text) / term_count
        rec_s = _recency_as_of(record.timestamp)
        raw = (weights.lexical * lex + weights.recency * rec_s) / total_weight
        final_score = raw * max(0.0, record.confidence)
        scored.append(
            ScoredFact(
                fact=record,
                score=round(final_score, 6),
                score_breakdown=ScoreBreakdown(
                    lexical=round(lex, 4),
                    recency=round(rec_s, 4),
                    # Source trust is not computed on this path; reported as 0.
                    source_trust=0.0,
                    weighted_total=round(final_score, 6),
                ),
                hop_distance=0,
                token_estimate=_estimate_tokens(record),
            )
        )

    source_deltas = get_registry().fire_score_delta(
        "recall_rank",
        scored,
        identity=identity,
        weights=weights,
        as_of=as_of,
    )
    if source_deltas:
        adjusted: list[ScoredFact] = []
        for sf in scored:
            delta = source_deltas.get(sf.fact.id, 0.0)
            if delta == 0.0:
                adjusted.append(sf)
                continue
            # Scores never go negative, and the breakdown total mirrors the
            # adjusted score.
            new_score = round(max(0.0, sf.score + delta), 6)
            adjusted.append(
                sf.model_copy(
                    update={
                        "score": new_score,
                        "score_breakdown": sf.score_breakdown.model_copy(
                            update={"weighted_total": new_score}
                        ),
                    }
                )
            )
        scored = adjusted

    scored.sort(key=lambda c: c.score, reverse=True)

    # Tombstone filter (§24.3)
    entity_uris = list({sf.fact.entity for sf in scored})
    excluded, notices = _get_tombstone_filter(conn, entity_uris, scope, is_admin_caller)
    tombstone_filtered = False
    if excluded:
        scored = [sf for sf in scored if sf.fact.entity not in excluded]
        tombstone_filtered = True

    packed, _, _ = _greedy_pack(scored[:max_chunks], max_chunks * 20)
    return packed[:max_chunks], notices, tombstone_filtered