Coverage for node / src / stigmem_node / recall / entity_resolver.py: 88%
96 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Fuzzy entity resolver — spec §2.6.6 (v0.8, Track F).
3Three-layer resolution pipeline:
5 Layer 1 — Canonical normalisation (entity_normalizer.py).
6 Deterministic, idempotent. Collapses case/whitespace/hyphen variants.
8 Layer 2 — Alias table lookup (entity_aliases, Migration 003).
9 Explicit pre-registered mappings for known legacy URIs.
11 Layer 3 — Token-fuzzy scoring over the live fact graph.
12 For informal URIs (type:id) with the same type prefix, scores
13 known entity candidates by token overlap and SequenceMatcher
14 similarity on the id segment. Returns ranked candidates.
15 Threshold: FUZZY_SCORE_THRESHOLD (default 0.5).
17The full Kompl-style resolver (phonetic matching, NLP-based entity linking)
18is deferred to Phase 7. Layer 3 here covers the common cases:
19 - Abbreviated names: user:alice ≡ user:a.smith (partial token match)
20 - Whitespace/hyphen: user:alice-smith ≡ user:alice_smith (Layer 1)
21 - Legacy aliases: user:alice → user:a.smith (Layer 2 explicit mapping)
22"""
24from __future__ import annotations
26import re
27import sqlite3
28from dataclasses import dataclass, field
29from difflib import SequenceMatcher
30from typing import Any
32from ..utility.entity_normalizer import NormalizationError, normalize_entity_uri
34_TOKEN_SPLIT_RE = re.compile(r"[.\-_/\s]+")
35_FORMAL_PREFIX = "stigmem://"
36FUZZY_SCORE_THRESHOLD = 0.5
@dataclass
class ResolveCandidate:
    """One candidate entity URI produced by the resolver, with its score."""

    # Candidate entity URI.
    uri: str
    # Similarity score assigned to this candidate.
    score: float
    # Layer that produced the candidate: 1=canonical, 2=alias, 3=fuzzy.
    layer: int
    # Free-form note describing how the match was obtained.
    match_note: str = ""
@dataclass
class ResolveResult:
    """Layer-by-layer outcome of resolving a single raw entity URI."""

    # The raw input exactly as supplied by the caller.
    query: str
    # Normalised form of the query; None if normalisation failed.
    canonical: str | None = None
    # True when the canonical form was found as-is (Layer 1).
    layer1_match: bool = False
    # Alias target URI if an alias mapping was found (Layer 2).
    layer2_match: str | None = None
    # Ranked fuzzy candidates (Layer 3); empty when none were collected.
    layer3_candidates: list[ResolveCandidate] = field(default_factory=list)

    @property
    def best(self) -> str | None:
        """Return the highest-confidence resolved URI, or None.

        Preference order: the alias target, then the canonical form when it
        was matched in Layer 1, then the first Layer 3 candidate, and finally
        the canonical form itself (which is None if normalisation failed).
        """
        alias_target = self.layer2_match
        if alias_target:
            return alias_target

        confirmed = self.layer1_match and self.canonical
        if not confirmed and self.layer3_candidates:
            return self.layer3_candidates[0].uri

        # Either Layer 1 confirmed the canonical form, or nothing better exists.
        return self.canonical
def _tokenise(id_segment: str) -> list[str]:
    """Lowercase *id_segment* and return its non-empty tokens, in order."""
    pieces = _TOKEN_SPLIT_RE.split(id_segment.lower())
    # Leading/trailing separators produce empty strings; drop them.
    return list(filter(None, pieces))
def _type_prefix(uri: str) -> str | None:
    """Return the 'type' part of an informal 'type:id' URI.

    Returns None when the URI is formal (starts with the formal scheme
    prefix) or contains no ':' at all.
    """
    if uri.startswith(_FORMAL_PREFIX):
        return None
    head, colon, _rest = uri.partition(":")
    return head if colon else None
def _id_segment(uri: str) -> str:
    """Return the text after the first ':' of an informal URI.

    Formal URIs and URIs without a ':' are returned unchanged.
    """
    if uri.startswith(_FORMAL_PREFIX):
        return uri
    _head, colon, tail = uri.partition(":")
    return tail if colon else uri
86def _token_score(a_tokens: list[str], b_tokens: list[str]) -> float:
87 """
88 Similarity score [0, 1] between two token lists.
90 Combines:
91 - Jaccard overlap on full tokens
92 - Initial/prefix match: token in a is a prefix of a token in b (covers a.smith ↔ alice)
93 - Best SequenceMatcher ratio across all pair combinations
94 """
95 if not a_tokens or not b_tokens:
96 return 0.0
98 a_set = set(a_tokens)
99 b_set = set(b_tokens)
101 # Jaccard on exact tokens
102 jaccard = len(a_set & b_set) / len(a_set | b_set)
104 # Prefix/initial match: score +0.3 if any token in a is a prefix of any token in b
105 prefix_bonus = 0.0
106 for ta in a_tokens:
107 for tb in b_tokens:
108 if len(ta) >= 1 and tb.startswith(ta[0]):
109 prefix_bonus = max(prefix_bonus, 0.3)
110 if len(ta) > 1 and tb.startswith(ta[:2]):
111 prefix_bonus = max(prefix_bonus, 0.5)
113 # SequenceMatcher on concatenated id strings
114 seq_score = SequenceMatcher(None, "".join(a_tokens), "".join(b_tokens)).ratio()
116 return min(1.0, max(jaccard, prefix_bonus, seq_score))
def resolve_entity(
    raw: str,
    conn: sqlite3.Connection,
    top_k: int = 5,
    threshold: float = FUZZY_SCORE_THRESHOLD,
) -> ResolveResult:
    """Resolve a raw entity URI through the three resolution layers.

    Layers are tried in order and the function returns as soon as one of
    them gives an authoritative answer:

      1. Normalise ``raw``; if the canonical form has a fact with
         ``confidence > 0.0``, that is an exact hit.
      2. Look the canonical form (then the raw input) up in ``entity_aliases``.
      3. For informal ``type:id`` URIs, score the id segment against the id
         segments of other entities with the same type prefix.

    Args:
        raw: Raw entity URI string (may be non-canonical, aliased, or abbreviated).
        conn: Open SQLite connection; only SELECT statements are issued.
            Rows are read by column name, so the connection needs a
            name-addressable row factory such as ``sqlite3.Row``.
        top_k: Maximum Layer 3 candidates to return. Values <= 0 yield none.
        threshold: Minimum fuzzy score to include in Layer 3 candidates.

    Returns:
        ResolveResult with layer-by-layer findings and a `.best` shorthand.
        If normalisation fails, the result carries only the query.
    """
    result = ResolveResult(query=raw)

    # -------------------------------------------------------------------
    # Layer 1 — canonical normalisation
    # -------------------------------------------------------------------
    try:
        canonical = normalize_entity_uri(raw)
    except NormalizationError:
        return result  # malformed input; no resolution possible

    result.canonical = canonical

    # Check if the canonical form itself appears in the fact graph.
    live_check = conn.execute(
        "SELECT 1 FROM facts WHERE entity = ? AND confidence > 0.0 LIMIT 1",
        (canonical,),
    ).fetchone()
    if live_check:
        result.layer1_match = True
        return result  # exact hit; no further layers needed

    # -------------------------------------------------------------------
    # Layer 2 — alias table lookup
    # -------------------------------------------------------------------
    alias_sql = "SELECT canonical_uri FROM entity_aliases WHERE raw_uri = ?"
    alias_row = conn.execute(alias_sql, (canonical,)).fetchone()
    if alias_row is None and raw != canonical:
        # Also try the raw input (pre-normalisation alias). Skipped when the
        # input was already canonical, since that query would be identical.
        alias_row = conn.execute(alias_sql, (raw,)).fetchone()

    if alias_row:
        result.layer2_match = str(alias_row["canonical_uri"])
        return result  # explicit alias; authoritative

    # -------------------------------------------------------------------
    # Layer 3 — token-fuzzy scoring over same-type entities
    # -------------------------------------------------------------------
    type_prefix = _type_prefix(canonical)
    if type_prefix is None:
        # Formal or bare URIs: skip Layer 3 (no type-scoped candidates).
        return result

    id_seg = _id_segment(canonical)
    query_tokens = _tokenise(id_seg)
    if not query_tokens:
        return result

    # Fetch distinct entity URIs with the same type prefix from the fact graph.
    # '%' and '_' are LIKE wildcards, so escape them (and the escape character
    # itself); otherwise a prefix such as "service_account" would also match
    # entities of other types.
    like_prefix = (
        type_prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    candidate_rows: list[Any] = conn.execute(
        """SELECT DISTINCT entity FROM facts
           WHERE entity LIKE ? ESCAPE '\\' AND confidence > 0.0
           LIMIT 2000""",
        (f"{like_prefix}:%",),
    ).fetchall()

    scored: list[ResolveCandidate] = []
    for row in candidate_rows:
        candidate_uri: str = row["entity"]
        if candidate_uri == canonical:
            continue  # already checked in Layer 1

        cand_id_seg = _id_segment(candidate_uri)
        score = _token_score(query_tokens, _tokenise(cand_id_seg))
        if score < threshold:
            continue

        scored.append(
            ResolveCandidate(
                uri=candidate_uri,
                score=score,
                layer=3,
                match_note=f"token_score={score:.2f} ({id_seg!r} ~ {cand_id_seg!r})",
            )
        )

    # Highest score first. Equal scores are common (the prefix bonus is
    # quantised), and the query has no ORDER BY, so break ties on the URI to
    # keep the ranking — and therefore `.best` — deterministic.
    scored.sort(key=lambda c: (-c.score, c.uri))
    # Clamp so a negative top_k yields no candidates rather than slicing
    # from the end of the list.
    result.layer3_candidates = scored[: max(top_k, 0)]
    return result