Coverage for node / src / stigmem_node / routes / recall / as_of.py: 87%
61 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Time-travel recall implementation."""
3from __future__ import annotations
5from typing import Any
7from ...auth import Identity
8from ...models.facts import row_to_record
9from ...models.recall import RecallWeights, ScoreBreakdown, ScoredFact
10from ...models.tombstones import TombstoneNotice
11from ...plugins import get_registry
12from ...recall.recall_pipeline import apply_recall_pipeline
13from ..cid_integrity import enforce_read_path_cid
14from .common import _estimate_tokens
15from .ranking import _greedy_pack
# Candidate query for time-travel recall. It selects facts in (scope, tenant)
# that meet all three conditions:
#   * they existed at as_of (timestamp <= as_of);
#   * they had not expired by as_of (effective valid_until is NULL or > as_of);
#   * they had no retraction recorded at or before as_of.
# Projected columns prefer the override/side-table value and fall back to the
# base ``facts`` column (COALESCE).
# NOTE(review): timestamps are compared as strings here, which assumes every
# stored timestamp and as_of share one ISO-8601 encoding — confirm.
_VISIBLE_AT_SQL = """
    SELECT f.*,
           COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until,
           COALESCE(fvo.confidence, f.confidence) AS projected_confidence,
           COALESCE(fgm.garden_id, f.garden_id) AS projected_garden_id,
           COALESCE(fqs.quarantine_status, f.quarantine_status)
               AS projected_quarantine_status,
           COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id)
               AS projected_quarantine_garden_id,
           (
               SELECT fca.cid
               FROM fact_cid_aliases fca
               WHERE fca.fact_id = f.id
               ORDER BY fca.cid
               LIMIT 1
           ) AS projected_cid
    FROM facts f
    LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id
    LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id
    LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id
    WHERE f.scope = ?
      AND f.tenant_id = ?
      AND f.timestamp <= ?
      AND (
          COALESCE(fvo.valid_until, f.valid_until) IS NULL
          OR COALESCE(fvo.valid_until, f.valid_until) > ?
      )
      AND NOT EXISTS (
          SELECT 1 FROM fact_retractions fr
          WHERE fr.fact_id = f.id AND fr.retracted_at <= ?
      )
    ORDER BY f.timestamp DESC
    LIMIT ?
"""

# Number of candidate rows fetched per requested chunk. Presumably this leaves
# headroom for records dropped later by the pipeline, quarantine and tombstone
# filters.
_AS_OF_OVERFETCH = 5
# Neutral recency score used when a timestamp cannot be parsed.
_RECENCY_FALLBACK = 0.5
# Facts this many days (or more) older than as_of get a recency of 0.0.
_RECENCY_HORIZON_DAYS = 365.0


def _parse_iso_utc(value: Any) -> Any:
    """Return *value* as a timezone-aware ``datetime``, or ``None`` if unparseable.

    Accepts ``datetime`` instances or ISO-8601 strings. In strings, every "Z" is
    rewritten to "+00:00", because ``datetime.fromisoformat`` only accepts the
    "Z" suffix from Python 3.11 on. Naive values are treated as UTC.
    """
    from datetime import datetime, timezone

    if isinstance(value, datetime):
        parsed = value
    else:
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # Not a string, or not valid ISO-8601.
            return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _recency_as_of(ts_value: Any, as_of_dt: Any) -> float:
    """Return a linear recency score in [0.0, 1.0] relative to *as_of_dt*.

    A fact stamped at *as_of_dt* scores 1.0. The score decays linearly to 0.0
    over ``_RECENCY_HORIZON_DAYS`` whole days. ``_RECENCY_FALLBACK`` is returned
    when either timestamp is missing or unparseable.
    """
    if as_of_dt is None:
        return _RECENCY_FALLBACK
    ts = _parse_iso_utc(ts_value)
    if ts is None:
        return _RECENCY_FALLBACK
    days_old = (as_of_dt - ts).days
    # Clamp both ends. A fact can parse as newer than as_of even though the SQL
    # string comparison admitted it (e.g. mixed UTC offsets). Without the upper
    # clamp it would then score above 1.0.
    return min(1.0, max(0.0, 1.0 - days_old / _RECENCY_HORIZON_DAYS))


def _score_records(
    records: list[Any],
    *,
    query: str,
    as_of: str,
    weights: RecallWeights,
) -> list[ScoredFact]:
    """Score *records* by lexical overlap with *query* and recency to *as_of*.

    The base score is the weight-normalised blend of the lexical and recency
    signals, multiplied by the record's confidence (floored at 0). Records whose
    ``quarantine_status`` is ``"pending"`` are skipped. Source trust is not
    computed here and is reported as 0.0.
    """
    tokens = query.lower().split()
    token_count = max(1, len(tokens))
    # Parse as_of once; every record is measured against the same instant.
    as_of_dt = _parse_iso_utc(as_of)
    total_weight = weights.lexical + weights.recency
    if total_weight <= 0:
        # Degenerate weights: avoid dividing by zero or flipping the sign.
        total_weight = 1.0

    scored: list[ScoredFact] = []
    for record in records:
        if record.quarantine_status == "pending":
            continue
        value = record.value.v
        # Only None renders as empty; falsy values such as 0 or False must stay
        # matchable.
        value_text = "" if value is None else value
        text = f"{record.entity} {record.relation} {value_text}".lower()
        # Fraction of query tokens (repeats included) found as substrings.
        lex = sum(1.0 for tok in tokens if tok in text) / token_count
        rec_s = _recency_as_of(record.timestamp, as_of_dt)
        raw = (weights.lexical * lex + weights.recency * rec_s) / total_weight
        final_score = raw * max(0.0, record.confidence)
        scored.append(
            ScoredFact(
                fact=record,
                score=round(final_score, 6),
                score_breakdown=ScoreBreakdown(
                    lexical=round(lex, 4),
                    recency=round(rec_s, 4),
                    source_trust=0.0,
                    weighted_total=round(final_score, 6),
                ),
                hop_distance=0,
                token_estimate=_estimate_tokens(record),
            )
        )
    return scored


def _apply_score_deltas(
    scored: list[ScoredFact], deltas: dict[Any, float] | None
) -> list[ScoredFact]:
    """Return *scored* with per-fact plugin *deltas* added to each score.

    Adjusted scores are floored at 0.0 and rounded to 6 places. Both ``score``
    and ``score_breakdown.weighted_total`` are updated. Facts without a non-zero
    delta are passed through as the same objects.
    """
    if not deltas:
        return scored
    adjusted: list[ScoredFact] = []
    for sf in scored:
        delta = deltas.get(sf.fact.id, 0.0)
        if delta == 0.0:
            adjusted.append(sf)
            continue
        new_score = round(max(0.0, sf.score + delta), 6)
        adjusted.append(
            sf.model_copy(
                update={
                    "score": new_score,
                    "score_breakdown": sf.score_breakdown.model_copy(
                        update={"weighted_total": new_score}
                    ),
                }
            )
        )
    return adjusted


def _recall_as_of_impl(
    conn: Any,
    *,
    query: str,
    scope: str,
    as_of: str,
    is_admin_caller: bool,
    tenant_id: str,
    max_chunks: int,
    include_graph: bool,
    identity: Identity,
    weights: RecallWeights,
    depth: int,
) -> tuple[list[ScoredFact], list[TombstoneNotice], bool]:
    """Return (scored_facts, tombstone_notices, tombstone_filtered) for a time-travel
    recall at as_of (§24.4).

    Steps:

    1. Fetch facts visible at as_of: existing, not expired and not retracted
       at that time.
    2. Verify each row's CID and run the recall pipeline.
    3. Score lexically against *query*, with recency measured relative to
       *as_of*.
    4. Apply plugin score deltas and sort by score, highest first.
    5. Drop tombstoned entities (§24.3).
    6. Greedily pack at most *max_chunks* results.

    Args:
        conn: DB connection exposing ``execute(sql, params).fetchall()``.
        query: Free-text query used for lexical scoring.
        scope: Fact scope to search.
        as_of: Point in time (ISO-8601) at which visibility is evaluated.
        is_admin_caller: Forwarded to the tombstone filter.
        tenant_id: Tenant whose facts are searched.
        max_chunks: Maximum number of results. Non-positive values yield no
            results.
        include_graph: Accepted for signature parity; not used by time-travel
            recall.
        identity: Caller identity for the recall pipeline and plugin hooks.
        weights: Lexical/recency weights, also passed to plugin hooks.
        depth: Accepted for signature parity; not used by time-travel recall.

    Returns:
        A tuple of the packed facts, the tombstone notices, and a flag that is
        True when the tombstone filter excluded any entity.
    """
    from ..facts import _get_tombstone_filter

    # Exit before querying: SQLite treats a negative LIMIT as "no limit", so a
    # negative budget would otherwise fetch every visible fact.
    if max_chunks <= 0:
        return [], [], False

    rows = conn.execute(
        _VISIBLE_AT_SQL,
        (scope, tenant_id, as_of, as_of, as_of, max_chunks * _AS_OF_OVERFETCH),
    ).fetchall()
    if not rows:
        return [], [], False

    for row in rows:
        enforce_read_path_cid(row)
    # Keyed by fact id, so repeated rows for one fact collapse (last one wins).
    records_by_id = {row["id"]: row_to_record(row) for row in rows}

    # Recall pipeline (trust, sanitizer); it may drop or transform records.
    pipeline_out = apply_recall_pipeline(list(records_by_id.values()), identity)
    records = list({r.id: r for r in pipeline_out}.values())

    scored = _score_records(records, query=query, as_of=as_of, weights=weights)

    source_deltas = get_registry().fire_score_delta(
        "recall_rank",
        scored,
        identity=identity,
        weights=weights,
        as_of=as_of,
    )
    scored = _apply_score_deltas(scored, source_deltas)
    scored.sort(key=lambda sf: sf.score, reverse=True)

    # Tombstone filter (§24.3).
    entity_uris = list({sf.fact.entity for sf in scored})
    excluded, notices = _get_tombstone_filter(conn, entity_uris, scope, is_admin_caller)
    tombstone_filtered = False
    if excluded:
        scored = [sf for sf in scored if sf.fact.entity not in excluded]
        tombstone_filtered = True

    # NOTE(review): the second argument looks like a token budget of 20 per
    # chunk — confirm against _greedy_pack.
    packed, _, _ = _greedy_pack(scored[:max_chunks], max_chunks * 20)
    return packed[:max_chunks], notices, tombstone_filtered