Coverage for node / src / stigmem_node / routes / recall / ranking.py: 84%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Recall scoring and packing stages.""" 

2 

3from __future__ import annotations 

4 

5from ...auth import Identity 

6from ...garden_acl import caller_can_see_garden 

7from ...memory_garden_acl_gate import recall_filter_enabled 

8from ...models.facts import FactRecord 

9from ...models.recall import RecallWeights, ScoreBreakdown, ScoredFact 

10from ...plugins import get_registry 

11from .common import _estimate_tokens, _recency_score 

12 

13 

14def _score_candidates( 

15 all_facts: dict[str, FactRecord], 

16 lex_scores: dict[str, float], 

17 sem_scores: dict[str, float], 

18 graph_hops: dict[str, int], 

19 weights: RecallWeights, 

20 identity: Identity, 

21 depth: int, 

22) -> list[ScoredFact]: 

23 """Compute composite score for each candidate fact.""" 

24 w = weights 

25 total_weight = w.lexical + w.semantic + w.graph + w.recency 

26 if total_weight <= 0: 

27 total_weight = 1.0 

28 

29 results: list[ScoredFact] = [] 

30 

31 for fact_id, record in all_facts.items(): 

32 # Skip quarantined / fully-redacted 

33 if record.quarantine_status == "pending": 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true

34 continue 

35 

36 # Salience signal: contradiction-resolution status 

37 contradiction_factor = 0.1 if record.contradicted else 1.0 

38 

39 # Salience signal: garden tier (quarantine garden = 0, normal = 1) 

40 garden_factor = 1.0 

41 if record.garden_id is not None and recall_filter_enabled(): 41 ↛ 42line 41 didn't jump to line 42 because the condition on line 41 was never true

42 if not caller_can_see_garden(record.garden_id, identity): 

43 continue # hidden by ACL 

44 # Penalise quarantine-tagged gardens (§17) 

45 garden_factor = 0.5 

46 

47 # Source-trust rank contribution is plugin-owned. Default installs keep 

48 # this salience signal at 0 and can receive deltas from recall_rank. 

49 st_score = 0.0 

50 

51 # Lexical signal 

52 lex = lex_scores.get(fact_id, 0.0) 

53 

54 # Semantic signal 

55 sem = sem_scores.get(fact_id, 0.0) 

56 

57 # Graph signal: inverse of hop distance (direct=0 → no graph bonus, 1 hop → 0.5, etc.) 

58 hops = graph_hops.get(fact_id) 

59 if hops is not None and hops > 0: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true

60 graph_s = 1.0 / (1.0 + hops) 

61 else: 

62 graph_s = 0.0 if hops is None else 1.0 

63 

64 # Salience signal: recency 

65 recency_s = _recency_score(record.timestamp) 

66 

67 # Salience signal: decay proxy (confidence itself encodes decay) 

68 decay_factor = max(0.0, record.confidence) 

69 

70 # Weighted sum (normalised by sum of non-zero weights) 

71 raw_total = ( 

72 w.lexical * lex 

73 + w.semantic * sem 

74 + w.graph * graph_s 

75 + w.recency * recency_s 

76 ) / total_weight 

77 

78 # Apply multiplicative adjustments 

79 final_score = raw_total * decay_factor * contradiction_factor * garden_factor 

80 

81 breakdown = ScoreBreakdown( 

82 lexical=round(lex, 4), 

83 semantic=round(sem, 4), 

84 graph=round(graph_s, 4), 

85 source_trust=round(st_score, 4), 

86 recency=round(recency_s, 4), 

87 weighted_total=round(final_score, 6), 

88 ) 

89 

90 results.append( 

91 ScoredFact( 

92 fact=record, 

93 score=round(final_score, 6), 

94 score_breakdown=breakdown, 

95 hop_distance=hops if hops is not None else 0, 

96 token_estimate=_estimate_tokens(record), 

97 ) 

98 ) 

99 

100 source_deltas = get_registry().fire_score_delta( 

101 "recall_rank", 

102 results, 

103 identity=identity, 

104 weights=weights, 

105 depth=depth, 

106 ) 

107 if source_deltas: 

108 adjusted: list[ScoredFact] = [] 

109 for scored in results: 

110 delta = source_deltas.get(scored.fact.id, 0.0) 

111 if delta == 0.0: 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true

112 adjusted.append(scored) 

113 continue 

114 next_score = round(max(0.0, scored.score + delta), 6) 

115 adjusted.append( 

116 scored.model_copy( 

117 update={ 

118 "score": next_score, 

119 "score_breakdown": scored.score_breakdown.model_copy( 

120 update={"weighted_total": next_score} 

121 ), 

122 } 

123 ) 

124 ) 

125 results = adjusted 

126 

127 return results 

128 

129 

130# --------------------------------------------------------------------------- 

131# Token-budget greedy packing 

132# --------------------------------------------------------------------------- 

133 

134 

135def _greedy_pack( 

136 candidates: list[ScoredFact], 

137 token_budget: int, 

138) -> tuple[list[ScoredFact], int, bool]: 

139 """Sort by score desc; include facts until budget exhausted. 

140 

141 Returns (packed_facts, tokens_used, truncated). 

142 """ 

143 candidates.sort(key=lambda c: c.score, reverse=True) 

144 packed: list[ScoredFact] = [] 

145 tokens_used = 0 

146 truncated = False 

147 

148 for candidate in candidates: 

149 if tokens_used + candidate.token_estimate > token_budget: 

150 truncated = True 

151 continue # skip this fact (too large), keep trying smaller ones 

152 packed.append(candidate) 

153 tokens_used += candidate.token_estimate 

154 

155 # Secondary sort within packed: preserve score order 

156 packed.sort(key=lambda c: c.score, reverse=True) 

157 return packed, tokens_used, truncated