Coverage for node / src / stigmem_node / recall / graph.py: 96%
63 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
"""BFS graph traversal — neighbors() implementation, spec §20.2.

Exported surface:
    bfs_neighbors(conn, seed_entity, max_depth, scope, tenant_id, ...) -> list[NeighborEntry]

Invariants honored:
  - Only traverses edges with confidence >= min_confidence and valid_until not expired.
  - Filters garden-tagged edges against caller ACL (§17.3).
  - Blocks federated edges (received_from is not null) if caller lacks federate perm (§19.5.2).
  - Relation filter supports prefix-glob only (e.g. "memory:*"), not full regex.
  - De-duplicates: each neighbor entity appears once at its shortest hop distance.
"""
14from __future__ import annotations
16from dataclasses import dataclass, field
17from datetime import UTC, datetime
18from typing import TYPE_CHECKING, Any
20if TYPE_CHECKING:
21 from ..auth import Identity
# Depth constant for graph traversal. No function in this module reads it:
# bfs_neighbors() uses whatever max_depth it is given, so this is presumably a
# ceiling that callers apply before calling in — TODO confirm against callers.
MAX_DEPTH = 3
@dataclass
class NeighborEntry:
    """One entity reached by the BFS, plus the edge it was first reached through."""

    # Identifier of the neighbor entity (the far end of the traversed edge).
    entity: str
    # Relation label of the edge through which the neighbor was first reached.
    relation: str
    # Number of edges between the seed and this neighbor (1 = direct neighbor).
    hops: int
    # Confidence value copied from the traversed edge row.
    confidence: float
    # Trust value copied from the edge row; None when the row stores NULL.
    source_trust: float | None
    # Entities walked from the seed to reach this neighbor, neighbor excluded.
    path: list[str] = field(default_factory=list)
36def bfs_neighbors(
37 conn: Any,
38 seed_entity: str,
39 max_depth: int,
40 scope: str,
41 tenant_id: str,
42 relation_filter: str | None = None,
43 min_confidence: float = 0.1,
44 min_trust: float = 0.0,
45 identity: Identity | None = None,
46) -> list[NeighborEntry]:
47 """BFS over entity_edges, returning all neighbors within max_depth hops.
49 path in each NeighborEntry is the list of traversed entity URIs from the
50 seed up to (and including) the seed, excluding the neighbor itself.
51 Shortest path is reported when an entity is reachable via multiple routes.
52 """
53 now = datetime.now(UTC).isoformat()
55 results: list[NeighborEntry] = []
56 # visited[entity] = path_from_seed_to_entity_not_including_entity
57 visited: dict[str, list[str]] = {seed_entity: []}
58 frontier: list[str] = [seed_entity]
60 for hop in range(1, max_depth + 1):
61 if not frontier:
62 break
64 next_frontier: list[str] = []
66 for current_entity in frontier:
67 path_to_current = visited[current_entity]
68 # Path stored in neighbor = nodes from seed up to and including current
69 neighbor_path = [*path_to_current, current_entity]
71 edges = _fetch_incident_edges(
72 conn,
73 current_entity,
74 scope,
75 tenant_id,
76 now,
77 min_confidence,
78 min_trust,
79 relation_filter,
80 identity,
81 )
83 for edge in edges:
84 neighbor = edge["object"] if edge["subject"] == current_entity else edge["subject"]
85 if neighbor in visited:
86 continue
88 entry = NeighborEntry(
89 entity=neighbor,
90 relation=edge["relation"],
91 hops=hop,
92 confidence=edge["confidence"],
93 source_trust=edge["source_trust"],
94 path=neighbor_path,
95 )
96 results.append(entry)
97 visited[neighbor] = neighbor_path
98 next_frontier.append(neighbor)
100 frontier = next_frontier
102 return results
105def _match_relation(relation: str, pattern: str) -> bool:
106 """Prefix-glob match — 'memory:*' matches any relation starting with 'memory:'."""
107 if pattern.endswith("*"):
108 return relation.startswith(pattern[:-1])
109 return relation == pattern
112def _fetch_incident_edges(
113 conn: Any,
114 entity: str,
115 scope: str,
116 tenant_id: str,
117 now: str,
118 min_confidence: float,
119 min_trust: float,
120 relation_filter: str | None,
121 identity: Identity | None,
122) -> list[Any]:
123 """Fetch and filter edges incident on entity from both traversal directions.
125 Pre-filters by confidence + trust in SQL for efficiency. Garden ACL,
126 federation scope, and relation filter are applied in Python after fetch.
127 """
128 params: list[Any] = [entity, entity, scope, tenant_id, min_confidence, now]
129 sql = """
130 SELECT id, subject, relation, object, scope, garden_id,
131 received_from, confidence, source_trust
132 FROM entity_edges
133 WHERE (subject = ? OR object = ?)
134 AND scope = ?
135 AND tenant_id = ?
136 AND confidence >= ?
137 AND (valid_until IS NULL OR valid_until > ?)
138 """
140 if min_trust > 0.0: 140 ↛ 142line 140 didn't jump to line 142 because the condition on line 140 was never true
141 # Edges with NULL source_trust pass through (trust not yet computed)
142 sql += " AND (source_trust IS NULL OR source_trust >= ?)"
143 params.append(min_trust)
145 rows = conn.execute(sql, params).fetchall()
147 filtered: list[Any] = []
148 for row in rows:
149 # Garden ACL (§17.3): hide edges in gardens the caller cannot see
150 garden_id = row["garden_id"]
151 if garden_id is not None and identity is not None:
152 from ..memory_garden_acl_gate import recall_filter_enabled
154 if recall_filter_enabled():
155 from ..garden_acl import caller_can_see_garden
157 if not caller_can_see_garden(garden_id, identity): 157 ↛ 161line 157 didn't jump to line 161 because the condition on line 157 was always true
158 continue
160 # Federation filter (§19.5.2): edges from remote nodes require federate perm
161 received_from = row["received_from"]
162 if received_from is not None and (identity is None or not identity.can_federate()):
163 continue
165 # Relation prefix-glob filter
166 if relation_filter is not None and not _match_relation(row["relation"], relation_filter):
167 continue
169 filtered.append(row)
171 return filtered