Coverage for node / src / stigmem_node / routes / recall / lexical.py: 39%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Lexical recall search stage.""" 

2 

3from __future__ import annotations 

4 

5import re 

6from typing import Any 

7 

8from .common import logger 

9 

# Any run of characters that is neither a word character nor whitespace.
# FTS5 barewords may only contain letters, digits, underscores and non-ASCII
# characters. Everything else is either query syntax (quotes, parentheses,
# ^, *, -, :, +, braces) or an outright syntax error (apostrophes, periods,
# question marks, slashes, ...), so all of it is replaced rather than a
# hand-picked subset.
_FTS5_SPECIAL = re.compile(r"[^\w\s]+")


def _fts_query(q: str) -> str:
    """Sanitise a user query string for SQLite FTS5 MATCH.

    Every run of characters that is not a word character or whitespace is
    replaced by a single space, and the result is stripped. Whitespace inside
    the query is otherwise left as-is, so the remaining terms reach FTS5 as
    space-separated barewords.

    Uppercase ``AND`` / ``OR`` / ``NOT`` / ``NEAR`` consist of word characters
    and are therefore passed through unchanged.

    Args:
        q: Raw query text as typed by the user.

    Returns:
        The sanitised query, or the literal ``'""'`` (an empty FTS5 string)
        when nothing searchable is left, so the MATCH operand is never empty.
    """
    cleaned = _FTS5_SPECIAL.sub(" ", q).strip()
    return cleaned or '""'

16 

17 

def _like_search(
    conn: Any,
    query: str,
    scope: str,
    tenant_id: str,
    k: int,
    min_confidence: float,
    now: str,
) -> dict[str, float]:
    """LIKE-based text scan — fallback when FTS5 is unavailable (libsql, postgres).

    The query is split into at most five distinct terms of two or more
    characters. A fact matches when any term occurs as a literal substring of
    ``value_v`` or ``entity``. Facts are further restricted to the given scope
    and tenant, to an effective confidence of at least ``min_confidence``, and
    to an effective ``valid_until`` that is NULL or later than ``now``.
    "Effective" means the value from ``fact_validity_overrides`` when present,
    otherwise the value on the fact itself.

    Args:
        conn: Connection whose ``execute(sql, params)`` returns an object with
            ``fetchall()``; rows must support access by column name.
        query: Raw user query text.
        scope: Value compared against ``facts.scope``.
        tenant_id: Value compared against ``facts.tenant_id``.
        k: Maximum number of rows to fetch (bound to ``LIMIT``).
        min_confidence: Lower bound for the effective confidence.
        now: Timestamp compared against the effective ``valid_until``.

    Returns:
        ``{fact_id: confidence / highest_confidence_in_result}``. Empty when
        the query has no usable terms, the statement fails (logged as a
        warning), nothing matches, or the highest confidence is not positive.
    """
    # Split on non-word characters, drop one-character fragments, de-duplicate
    # while keeping first-seen order, then cap at five terms so the WHERE
    # clause stays bounded and repeated words do not crowd out distinct ones.
    candidates = (w for w in re.sub(r"[^\w]", " ", query).split() if len(w) >= 2)
    words = list(dict.fromkeys(candidates))[:5]
    if not words:
        return {}

    # "!" is used as the escape character: the split above guarantees it never
    # occurs inside a term, and it needs no quoting of its own in SQL.
    clauses = " OR ".join(
        "(value_v LIKE ? ESCAPE '!' OR entity LIKE ? ESCAPE '!')" for _ in words
    )
    params: list[Any] = []
    for w in words:
        # \w keeps underscores, and "_" is the LIKE single-character wildcard.
        # Escape it so "user_id" matches only a literal underscore. "%" cannot
        # appear in a term, so it needs no escaping.
        pat = "%" + w.replace("_", "!_") + "%"
        params.extend([pat, pat])
    params.extend([scope, tenant_id, min_confidence, now, k])

    sql = (  # noqa: S608
        "SELECT f.id AS fact_id, COALESCE(fvo.confidence, f.confidence) AS rank "
        "FROM facts f LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id"
        f" WHERE ({clauses})"  # nosec B608 — clauses contain only "?" placeholders
        " AND f.scope = ? AND f.tenant_id = ?"
        " AND COALESCE(fvo.confidence, f.confidence) >= ?"
        " AND (COALESCE(fvo.valid_until, f.valid_until) IS NULL"
        " OR COALESCE(fvo.valid_until, f.valid_until) > ?)"
        " ORDER BY COALESCE(fvo.confidence, f.confidence) DESC LIMIT ?"
    )
    try:
        rows = conn.execute(sql, params).fetchall()
    except Exception as exc:
        # Best-effort fallback: a failing scan yields no lexical hits rather
        # than failing the whole recall request.
        logger.warning("LIKE fallback search failed: %s", exc)
        return {}
    if not rows:
        return {}

    # Normalise against the best confidence so the top hit scores 1.0.
    max_conf = max(float(row["rank"]) for row in rows)
    if max_conf <= 0:
        return {}
    return {row["fact_id"]: float(row["rank"]) / max_conf for row in rows}

58 

59 

60# --------------------------------------------------------------------------- 

61# Lexical search (FTS5 / BM25) 

62# --------------------------------------------------------------------------- 

63 

64 

def _lexical_search(
    conn: Any,
    query: str,
    scope: str,
    tenant_id: str,
    k: int,
    min_confidence: float,
    now: str,
) -> dict[str, float]:
    """Return {fact_id: normalised_bm25_score} for an FTS5 MATCH over facts.

    The query is sanitised with ``_fts_query`` and matched against
    ``facts_fts``. Hits are restricted to the given scope and tenant, to an
    effective confidence of at least ``min_confidence``, and to an effective
    ``valid_until`` that is NULL or later than ``now`` (override values from
    ``fact_validity_overrides`` take precedence over the fact's own).

    If any statement of the FTS attempt raises, the failure is logged, the
    savepoint is rolled back and released, and the result of ``_like_search``
    with the same arguments is returned instead.

    Args:
        conn: Connection whose ``execute(sql, params)`` returns an object with
            ``fetchall()``; rows must support access by column name.
        query: Raw user query text.
        scope: Value compared against ``facts.scope``.
        tenant_id: Value compared against ``facts.tenant_id``.
        k: Maximum number of rows to fetch (bound to ``LIMIT``).
        min_confidence: Lower bound for the effective confidence.
        now: Timestamp compared against the effective ``valid_until``.

    Returns:
        Scores divided by the best score, so the best hit maps to 1.0. Empty
        when FTS returns no rows or the best negated score is not positive.
    """
    fts_q = _fts_query(query)
    try:
        # SAVEPOINT protects the surrounding transaction on Postgres: a failed
        # FTS5 MATCH query aborts the psycopg2 transaction, making all
        # subsequent SQL on the same connection fail. Rolling back to the
        # savepoint restores the connection to a usable state. SQLite/libsql
        # support the same syntax.
        conn.execute("SAVEPOINT _fts_search")
        rows = conn.execute(
            """
            SELECT ff.fact_id, bm25(facts_fts) AS rank
            FROM facts_fts ff
            JOIN facts f ON f.id = ff.fact_id
            LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id
            WHERE facts_fts MATCH ?
              AND f.scope = ?
              AND f.tenant_id = ?
              AND COALESCE(fvo.confidence, f.confidence) >= ?
              AND (COALESCE(fvo.valid_until, f.valid_until) IS NULL
                   OR COALESCE(fvo.valid_until, f.valid_until) > ?)
            ORDER BY rank
            LIMIT ?
            """,
            (fts_q, scope, tenant_id, min_confidence, now, k),
        ).fetchall()
        conn.execute("RELEASE SAVEPOINT _fts_search")
    except Exception as exc:
        logger.warning("FTS5 lexical search failed: %s", exc)
        try:
            conn.execute("ROLLBACK TO SAVEPOINT _fts_search")
            # ROLLBACK TO only rewinds to the savepoint; the savepoint itself
            # (and, on SQLite, a transaction it implicitly opened) stays
            # active until released, so pop it before running the fallback.
            conn.execute("RELEASE SAVEPOINT _fts_search")
        except Exception as rollback_exc:
            # Reached when the SAVEPOINT statement itself never succeeded;
            # there is nothing to unwind, so just record it.
            logger.warning("FTS5 lexical search rollback failed: %s", rollback_exc)
        return _like_search(conn, query, scope, tenant_id, k, min_confidence, now)

    if not rows:
        return {}

    # bm25() yields negative scores where a MORE negative value is a better
    # match (hence the ascending ORDER BY above). Negate so that larger means
    # more relevant, then normalise against the best hit.
    abs_scores = [(-row["rank"], row["fact_id"]) for row in rows]
    max_score = max(s for s, _ in abs_scores)
    if max_score <= 0:
        return {}
    return {fid: score / max_score for score, fid in abs_scores}