Coverage for node / src / stigmem_node / recall / graph.py: 96%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""BFS graph traversal — neighbors() implementation, spec §20.2. 

2 

3Exported surface: 

4 bfs_neighbors(conn, seed_entity, max_depth, scope, tenant_id, ...) -> list[NeighborEntry] 

5 

6Invariants honored: 

7 - Only traverses edges with confidence >= min_confidence and valid_until not expired. 

8 - Filters garden-tagged edges against caller ACL (§17.3). 

9 - Blocks federated edges (received_from is not null) if caller lacks federate perm (§19.5.2). 

10 - Relation filter supports prefix-glob only (e.g. "memory:*"), not full regex. 

11 - De-duplicates: each neighbor entity appears once at its shortest hop distance. 

12""" 

13 

14from __future__ import annotations 

15 

16from dataclasses import dataclass, field 

17from datetime import UTC, datetime 

18from typing import TYPE_CHECKING, Any 

19 

20if TYPE_CHECKING: 

21 from ..auth import Identity 

22 

23MAX_DEPTH = 3 

24 

25 

@dataclass
class NeighborEntry:
    """One entity discovered by the BFS traversal, plus the edge that reached it."""

    # Identifier of the discovered neighbor entity.
    entity: str
    # Relation label of the edge through which the neighbor was first reached.
    relation: str
    # Hop count from the seed at which the neighbor was found (1 = direct neighbor).
    hops: int
    # Confidence value copied from that edge.
    confidence: float
    # Source-trust value copied from that edge; may be None.
    source_trust: float | None
    # Entities walked from the seed to the neighbor's predecessor; the neighbor
    # itself is not part of the list.
    path: list[str] = field(default_factory=list)

34 

35 

36def bfs_neighbors( 

37 conn: Any, 

38 seed_entity: str, 

39 max_depth: int, 

40 scope: str, 

41 tenant_id: str, 

42 relation_filter: str | None = None, 

43 min_confidence: float = 0.1, 

44 min_trust: float = 0.0, 

45 identity: Identity | None = None, 

46) -> list[NeighborEntry]: 

47 """BFS over entity_edges, returning all neighbors within max_depth hops. 

48 

49 path in each NeighborEntry is the list of traversed entity URIs from the 

50 seed up to (and including) the seed, excluding the neighbor itself. 

51 Shortest path is reported when an entity is reachable via multiple routes. 

52 """ 

53 now = datetime.now(UTC).isoformat() 

54 

55 results: list[NeighborEntry] = [] 

56 # visited[entity] = path_from_seed_to_entity_not_including_entity 

57 visited: dict[str, list[str]] = {seed_entity: []} 

58 frontier: list[str] = [seed_entity] 

59 

60 for hop in range(1, max_depth + 1): 

61 if not frontier: 

62 break 

63 

64 next_frontier: list[str] = [] 

65 

66 for current_entity in frontier: 

67 path_to_current = visited[current_entity] 

68 # Path stored in neighbor = nodes from seed up to and including current 

69 neighbor_path = [*path_to_current, current_entity] 

70 

71 edges = _fetch_incident_edges( 

72 conn, 

73 current_entity, 

74 scope, 

75 tenant_id, 

76 now, 

77 min_confidence, 

78 min_trust, 

79 relation_filter, 

80 identity, 

81 ) 

82 

83 for edge in edges: 

84 neighbor = edge["object"] if edge["subject"] == current_entity else edge["subject"] 

85 if neighbor in visited: 

86 continue 

87 

88 entry = NeighborEntry( 

89 entity=neighbor, 

90 relation=edge["relation"], 

91 hops=hop, 

92 confidence=edge["confidence"], 

93 source_trust=edge["source_trust"], 

94 path=neighbor_path, 

95 ) 

96 results.append(entry) 

97 visited[neighbor] = neighbor_path 

98 next_frontier.append(neighbor) 

99 

100 frontier = next_frontier 

101 

102 return results 

103 

104 

def _match_relation(relation: str, pattern: str) -> bool:
    """Check a relation against a prefix-glob pattern.

    A pattern ending in ``*`` matches every relation that starts with the text
    before that final ``*`` (so ``'memory:*'`` matches ``'memory:fact'``). Any
    other pattern must equal the relation exactly; a ``*`` elsewhere in the
    pattern has no special meaning.
    """
    if not pattern.endswith("*"):
        return relation == pattern
    prefix = pattern[:-1]
    return relation.startswith(prefix)

110 

111 

def _fetch_incident_edges(
    conn: Any,
    entity: str,
    scope: str,
    tenant_id: str,
    now: str,
    min_confidence: float,
    min_trust: float,
    relation_filter: str | None,
    identity: Identity | None,
) -> list[Any]:
    """Return the edges touching ``entity`` that the caller is allowed to traverse.

    The SQL query selects edges where ``entity`` is either subject or object,
    restricted to the given scope and tenant, with confidence of at least
    ``min_confidence`` and a ``valid_until`` that is NULL or later than ``now``.
    When ``min_trust`` is positive, a source-trust floor is added to the query.
    The fetched rows are then filtered in Python, in this order: garden ACL,
    federation permission, relation pattern.

    Returns:
        The surviving rows, in the order the query produced them.
    """
    query = """
        SELECT id, subject, relation, object, scope, garden_id,
               received_from, confidence, source_trust
        FROM entity_edges
        WHERE (subject = ? OR object = ?)
          AND scope = ?
          AND tenant_id = ?
          AND confidence >= ?
          AND (valid_until IS NULL OR valid_until > ?)
    """
    bind_values: list[Any] = [entity, entity, scope, tenant_id, min_confidence, now]

    if min_trust > 0.0:
        # Edges with NULL source_trust pass through (trust not yet computed)
        query += " AND (source_trust IS NULL OR source_trust >= ?)"
        bind_values.append(min_trust)

    def _caller_may_traverse(row: Any) -> bool:
        # Garden ACL (§17.3): hide edges in gardens the caller cannot see.
        # NOTE(review): with identity=None this check is skipped entirely, so
        # garden-tagged edges pass through — confirm that is intended.
        garden_id = row["garden_id"]
        if identity is not None and garden_id is not None:
            from ..memory_garden_acl_gate import recall_filter_enabled

            if recall_filter_enabled():
                from ..garden_acl import caller_can_see_garden

                if not caller_can_see_garden(garden_id, identity):
                    return False

        # Federation filter (§19.5.2): edges from remote nodes require federate perm
        if row["received_from"] is not None:
            if identity is None or not identity.can_federate():
                return False

        # Relation prefix-glob filter
        if relation_filter is None:
            return True
        return _match_relation(row["relation"], relation_filter)

    fetched = conn.execute(query, bind_values).fetchall()
    return [row for row in fetched if _caller_may_traverse(row)]