Coverage for node / src / stigmem_node / recall / entity_resolver.py: 88%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Fuzzy entity resolver — spec §2.6.6 (v0.8, Track F). 

2 

3Three-layer resolution pipeline: 

4 

5 Layer 1 — Canonical normalisation (entity_normalizer.py). 

6 Deterministic, idempotent. Collapses case/whitespace/hyphen variants. 

7 

8 Layer 2 — Alias table lookup (entity_aliases, Migration 003). 

9 Explicit pre-registered mappings for known legacy URIs. 

10 

11 Layer 3 — Token-fuzzy scoring over the live fact graph. 

12 For informal URIs (type:id) with the same type prefix, scores 

13 known entity candidates by token overlap and SequenceMatcher 

14 similarity on the id segment. Returns ranked candidates. 

15 Threshold: FUZZY_SCORE_THRESHOLD (default 0.5). 

16 

17The full Kompl-style resolver (phonetic matching, NLP-based entity linking) 

18is deferred to Phase 7. Layer 3 here covers the common cases: 

19 - Abbreviated names: user:alice ≡ user:a.smith (partial token match) 

20 - Whitespace/hyphen: user:alice-smith ≡ user:alice_smith (Layer 1) 

21 - Legacy aliases: user:alice → user:a.smith (Layer 2 explicit mapping) 

22""" 

23 

24from __future__ import annotations 

25 

26import re 

27import sqlite3 

28from dataclasses import dataclass, field 

29from difflib import SequenceMatcher 

30from typing import Any 

31 

32from ..utility.entity_normalizer import NormalizationError, normalize_entity_uri 

33 

# Separator run used to cut an id segment into tokens: any mix of
# dot, hyphen, underscore, slash, or whitespace.
_TOKEN_SPLIT_RE = re.compile(r"[.\-_/\s]+")

# URIs starting with this scheme are treated as "formal"; the helpers below
# never extract a type prefix or id segment from them.
_FORMAL_PREFIX = "stigmem://"

# Default minimum Layer 3 score for a candidate to be reported.
FUZZY_SCORE_THRESHOLD = 0.5

37 

38 

@dataclass
class ResolveCandidate:
    """One scored resolution candidate for a queried entity URI."""

    # Candidate entity URI.
    uri: str
    # Similarity score assigned to this candidate.
    score: float
    # Pipeline layer that produced the candidate: 1=canonical, 2=alias, 3=fuzzy.
    layer: int
    # Free-form, human-readable explanation of the match (may be empty).
    match_note: str = ""

45 

46 

@dataclass
class ResolveResult:
    """Layer-by-layer outcome of resolving a single raw entity URI."""

    # The raw string that was submitted for resolution.
    query: str
    # Normalised form of the query; None if normalisation failed.
    canonical: str | None = None
    # True when the canonical form was matched directly (Layer 1).
    layer1_match: bool = False
    # Alias target if an alias mapping was found (Layer 2), else None.
    layer2_match: str | None = None
    # Fuzzy candidates (Layer 3); `best` treats the first entry as the top one.
    layer3_candidates: list[ResolveCandidate] = field(default_factory=list)

    @property
    def best(self) -> str | None:
        """Pick the single preferred URI from the recorded findings.

        Precedence: alias target, then a matched canonical form, then the
        first fuzzy candidate. If none of those apply, the canonical form is
        returned as-is (which is None when normalisation failed).
        """
        if self.layer2_match:
            return self.layer2_match

        exact_hit = self.layer1_match and self.canonical
        if not exact_hit and self.layer3_candidates:
            return self.layer3_candidates[0].uri

        # Covers both the exact-hit case and the "nothing matched" fallback.
        return self.canonical

65 

66 

def _tokenise(id_segment: str) -> list[str]:
    """Lowercase *id_segment* and split it on separator runs, dropping empty pieces."""
    pieces = _TOKEN_SPLIT_RE.split(id_segment.lower())
    # Leading/trailing separators produce empty strings; filter(None, ...) removes them.
    return list(filter(None, pieces))

70 

71 

def _type_prefix(uri: str) -> str | None:
    """Return the 'type' part of an informal 'type:id' URI.

    Formal URIs (those starting with the formal scheme prefix) and strings
    without any colon yield None.
    """
    if uri.startswith(_FORMAL_PREFIX):
        return None
    head, colon, _rest = uri.partition(":")
    # partition() reports an empty separator when no colon is present.
    return head if colon else None

77 

78 

def _id_segment(uri: str) -> str:
    """Return the part after the first colon of an informal 'type:id' URI.

    Formal URIs and strings without a colon are returned unchanged.
    """
    if uri.startswith(_FORMAL_PREFIX):
        return uri
    _head, colon, tail = uri.partition(":")
    # No colon means there is no type prefix to strip.
    return tail if colon else uri

84 

85 

def _token_score(a_tokens: list[str], b_tokens: list[str]) -> float:
    """Score the similarity of two token lists on a 0.0-1.0 scale.

    The result is the largest of three signals, capped at 1.0:

    - Jaccard overlap of the two token sets (exact token equality).
    - A leading-character bonus: 0.5 if some token in ``b_tokens`` starts
      with the first two characters of a token in ``a_tokens`` (tokens of
      length >= 2 only), otherwise 0.3 if some token in ``b_tokens`` starts
      with the first character of a token in ``a_tokens``.
    - ``SequenceMatcher.ratio()`` computed once over the two token lists
      each concatenated into a single string.

    Returns 0.0 when either list is empty.
    """
    if not (a_tokens and b_tokens):
        return 0.0

    # Signal 1: exact-token Jaccard overlap.
    left, right = set(a_tokens), set(b_tokens)
    overlap = len(left & right) / len(left | right)

    # Signal 2: shared leading characters; the two-character match outranks
    # the single-character one.
    if any(tb.startswith(ta[:2]) for ta in a_tokens if len(ta) > 1 for tb in b_tokens):
        lead_bonus = 0.5
    elif any(tb.startswith(ta[0]) for ta in a_tokens if ta for tb in b_tokens):
        lead_bonus = 0.3
    else:
        lead_bonus = 0.0

    # Signal 3: character-level similarity of the separator-free id strings.
    joined_a = "".join(a_tokens)
    joined_b = "".join(b_tokens)
    char_ratio = SequenceMatcher(None, joined_a, joined_b).ratio()

    strongest = max(overlap, lead_bonus, char_ratio)
    return min(1.0, strongest)

117 

118 

def resolve_entity(
    raw: str,
    conn: sqlite3.Connection,
    top_k: int = 5,
    threshold: float = FUZZY_SCORE_THRESHOLD,
) -> ResolveResult:
    """Resolve a raw entity URI through the three resolution layers.

    The layers run in order and the function returns as soon as one of them
    produces a match:

    1. Normalise ``raw`` and look for the canonical form in ``facts``.
    2. Look up the canonical form, then the raw input, in ``entity_aliases``.
    3. Score every same-type entity in ``facts`` against the query tokens.

    Args:
        raw: Raw entity URI string (may be non-canonical, aliased, or abbreviated).
        conn: Open SQLite connection. Only SELECT statements are issued. Rows
            are read by column name, so the connection needs a row factory
            that supports name lookup (e.g. ``sqlite3.Row``).
        top_k: Maximum Layer 3 candidates to return. Values <= 0 yield no
            candidates.
        threshold: Minimum fuzzy score to include in Layer 3 candidates.

    Returns:
        ResolveResult with layer-by-layer findings and a `.best` shorthand.
        If normalisation raises NormalizationError the result carries only
        the query (``canonical`` stays None).
    """
    result = ResolveResult(query=raw)

    # -------------------------------------------------------------------
    # Layer 1 — canonical normalisation
    # -------------------------------------------------------------------
    try:
        canonical = normalize_entity_uri(raw)
    except NormalizationError:
        return result  # malformed input; no resolution possible

    result.canonical = canonical

    # Check if the canonical form itself appears in the fact graph.
    live_check = conn.execute(
        "SELECT 1 FROM facts WHERE entity = ? AND confidence > 0.0 LIMIT 1",
        (canonical,),
    ).fetchone()
    if live_check:
        result.layer1_match = True
        return result  # exact hit; no further layers needed

    # -------------------------------------------------------------------
    # Layer 2 — alias table lookup
    # -------------------------------------------------------------------
    alias_sql = "SELECT canonical_uri FROM entity_aliases WHERE raw_uri = ?"
    alias_row = conn.execute(alias_sql, (canonical,)).fetchone()
    if alias_row is None:
        # Also try the raw input (pre-normalisation alias).
        alias_row = conn.execute(alias_sql, (raw,)).fetchone()

    if alias_row:
        result.layer2_match = str(alias_row["canonical_uri"])
        return result  # explicit alias; authoritative

    # -------------------------------------------------------------------
    # Layer 3 — token-fuzzy scoring over same-type entities
    # -------------------------------------------------------------------
    type_prefix = _type_prefix(canonical)
    if type_prefix is None:
        # Formal or bare URIs: skip Layer 3 (no type-scoped candidates).
        return result

    id_seg = _id_segment(canonical)
    query_tokens = _tokenise(id_seg)
    if not query_tokens:
        return result

    # '%' and '_' are LIKE wildcards. Escape them (and the escape character
    # itself) so a prefix such as "user_group" only matches literally and does
    # not pull in entities of other types.
    like_safe_prefix = (
        type_prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    prefix_pattern = f"{like_safe_prefix}:%"

    # Fetch all distinct entity URIs with the same type prefix from the fact graph.
    candidate_rows: list[Any] = conn.execute(
        """SELECT DISTINCT entity FROM facts
           WHERE entity LIKE ? ESCAPE '\\' AND confidence > 0.0
           LIMIT 2000""",
        (prefix_pattern,),
    ).fetchall()

    scored: list[ResolveCandidate] = []
    for row in candidate_rows:
        candidate_uri: str = row["entity"]
        if candidate_uri == canonical:
            continue  # already checked in Layer 1

        cand_id_seg = _id_segment(candidate_uri)
        score = _token_score(query_tokens, _tokenise(cand_id_seg))
        if score < threshold:
            continue

        scored.append(
            ResolveCandidate(
                uri=candidate_uri,
                score=score,
                layer=3,
                match_note=f"token_score={score:.2f} ({id_seg!r} ~ {cand_id_seg!r})",
            )
        )

    # Highest score first; ties are broken by URI so the ranking (and the
    # top_k cut) does not depend on the row order the database happened to use.
    scored.sort(key=lambda c: (-c.score, c.uri))
    # Clamp so a negative top_k returns nothing instead of slicing from the end.
    result.layer3_candidates = scored[: max(top_k, 0)]
    return result