Coverage for node/src/stigmem_node/routes/recall/lexical.py: 39%
50 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Lexical recall search stage."""
3from __future__ import annotations
5import re
6from typing import Any
8from .common import logger
# Runs of these punctuation characters are blanked out of user queries before
# the text is handed to an FTS5 MATCH expression.
_FTS5_SPECIAL = re.compile(r'["()\^*\-:,]+')


def _fts_query(q: str) -> str:
    """Turn raw user text into a string usable as an SQLite FTS5 MATCH argument.

    Each run of characters matched by ``_FTS5_SPECIAL`` is replaced with a
    single space and the result is stripped of surrounding whitespace.  When
    nothing is left, the two-character string ``""`` is returned instead of
    an empty string.
    """
    stripped = re.sub(_FTS5_SPECIAL, " ", q).strip()
    if not stripped:
        return '""'
    return stripped
def _like_search(
    conn: Any,
    query: str,
    scope: str,
    tenant_id: str,
    k: int,
    min_confidence: float,
    now: str,
) -> dict[str, float]:
    """LIKE-based text scan — fallback when FTS5 is unavailable (libsql, postgres).

    The query is split into word tokens (non-word characters act as
    separators); tokens shorter than two characters are dropped and at most
    the first five are used.  A fact matches when any token occurs as a
    substring of ``value_v`` or ``entity``.

    Args:
        conn: Connection object exposing ``execute(sql, params).fetchall()``
            whose rows support lookup by column name.
        query: Raw user query text.
        scope: Only facts with this ``scope`` are considered.
        tenant_id: Only facts with this ``tenant_id`` are considered.
        k: Maximum number of rows to fetch.
        min_confidence: Lower bound on the effective confidence (the override
            value from ``fact_validity_overrides`` when present, otherwise
            the fact's own).
        now: Facts whose effective ``valid_until`` is not NULL and not
            greater than this value are excluded.

    Returns:
        ``{fact_id: confidence / max_confidence}`` for the matching rows, or
        ``{}`` when there are no usable words, no rows, the query raises, or
        the highest confidence is not positive.
    """
    words = [w for w in re.sub(r"[^\w]", " ", query).split() if len(w) >= 2][:5]
    if not words:
        return {}

    # ``\w`` keeps "_" which LIKE treats as a single-character wildcard, so it
    # must be escaped to be matched literally.  "!" is used as the escape
    # character because it is a non-word character and therefore can never
    # appear inside a word.
    clauses = " OR ".join(
        "(value_v LIKE ? ESCAPE '!' OR entity LIKE ? ESCAPE '!')" for _ in words
    )
    params: list[Any] = []
    for w in words:
        pat = "%" + w.replace("_", "!_") + "%"
        params.extend([pat, pat])
    params.extend([scope, tenant_id, min_confidence, now, k])

    # Only "?" placeholders are interpolated into the statement text; every
    # user-supplied value travels through ``params``.
    sql = (  # noqa: S608
        "SELECT f.id AS fact_id, COALESCE(fvo.confidence, f.confidence) AS rank "
        "FROM facts f LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id"
        f" WHERE ({clauses})"  # nosec B608 — clauses built from "?"
        " AND f.scope = ? AND f.tenant_id = ?"
        " AND COALESCE(fvo.confidence, f.confidence) >= ?"
        " AND (COALESCE(fvo.valid_until, f.valid_until) IS NULL"
        " OR COALESCE(fvo.valid_until, f.valid_until) > ?)"
        " ORDER BY COALESCE(fvo.confidence, f.confidence) DESC LIMIT ?"
    )
    try:
        rows = conn.execute(sql, params).fetchall()
    except Exception as exc:
        # Best-effort fallback: a failing scan yields no lexical hits.
        logger.warning("LIKE fallback search failed: %s", exc)
        return {}
    if not rows:
        return {}

    # Normalise so the most confident hit scores 1.0.
    max_conf = max(float(row["rank"]) for row in rows)
    if max_conf <= 0:
        return {}
    return {row["fact_id"]: float(row["rank"]) / max_conf for row in rows}
60# ---------------------------------------------------------------------------
61# Lexical search (FTS5 / BM25)
62# ---------------------------------------------------------------------------
def _lexical_search(
    conn: Any,
    query: str,
    scope: str,
    tenant_id: str,
    k: int,
    min_confidence: float,
    now: str,
) -> dict[str, float]:
    """Return ``{fact_id: normalised_bm25_score}`` for facts matching *query*.

    The query is sanitised with ``_fts_query`` and run as an FTS5 ``MATCH``
    against ``facts_fts``, restricted to the given ``scope`` / ``tenant_id``,
    to facts whose effective confidence is at least ``min_confidence`` and
    whose effective ``valid_until`` is NULL or greater than ``now``.  At most
    ``k`` rows are fetched.

    If any statement in the FTS attempt raises, the failure is logged, the
    savepoint is rolled back and released, and the result of
    ``_like_search`` with the same arguments is returned instead.

    Returns:
        Scores scaled so the best hit is 1.0, or ``{}`` when no rows match
        or no row has a positive (negated) bm25 score.
    """
    fts_q = _fts_query(query)
    try:
        # SAVEPOINT protects the surrounding transaction on Postgres: a failed
        # FTS5 MATCH query aborts the psycopg2 transaction, making all subsequent
        # SQL on the same connection fail. Rolling back to the savepoint restores
        # the connection to a usable state. SAVEPOINT is a no-op cost on
        # SQLite/libsql, which also support the syntax.
        conn.execute("SAVEPOINT _fts_search")
        rows = conn.execute(
            """
            SELECT ff.fact_id, bm25(facts_fts) AS rank
            FROM facts_fts ff
            JOIN facts f ON f.id = ff.fact_id
            LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id
            WHERE facts_fts MATCH ?
              AND f.scope = ?
              AND f.tenant_id = ?
              AND COALESCE(fvo.confidence, f.confidence) >= ?
              AND (COALESCE(fvo.valid_until, f.valid_until) IS NULL
                   OR COALESCE(fvo.valid_until, f.valid_until) > ?)
            ORDER BY rank
            LIMIT ?
            """,
            (fts_q, scope, tenant_id, min_confidence, now, k),
        ).fetchall()
        conn.execute("RELEASE SAVEPOINT _fts_search")
    except Exception as exc:
        logger.warning("FTS5 lexical search failed: %s", exc)
        try:
            conn.execute("ROLLBACK TO SAVEPOINT _fts_search")
            # ROLLBACK TO leaves the savepoint (and, on SQLite, the
            # transaction it implicitly opened) active; release it so the
            # failure path cleans up the same way the success path does.
            conn.execute("RELEASE SAVEPOINT _fts_search")
        except Exception as rollback_exc:
            logger.warning("FTS5 lexical search rollback failed: %s", rollback_exc)
        return _like_search(conn, query, scope, tenant_id, k, min_confidence, now)

    if not rows:
        return {}

    # bm25() returns negative values; the more negative, the better the match
    # (hence the ascending ORDER BY above).  Negate so larger means better.
    abs_scores = [(-row["rank"], row["fact_id"]) for row in rows]
    max_score = max(s for s, _ in abs_scores)
    if max_score <= 0:
        return {}
    return {fid: score / max_score for score, fid in abs_scores}