Coverage for node / src / stigmem_node / routes / recall / graph.py: 88%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Graph expansion recall stage.""" 

2 

3from __future__ import annotations 

4 

5from typing import Any 

6 

7from ...auth import Identity 

8from ...recall.graph import bfs_neighbors 

9 

10_MAX_SEED_ENTITIES = 5 

11_MAX_GRAPH_ENTITIES = 50 

12 

13 

def _graph_expand(
    conn: Any,
    seed_fact_ids: list[str],
    depth: int,
    scope: str,
    tenant_id: str,
    identity: Identity,
    k: int,
    min_confidence: float,
    now: str,
) -> dict[str, int]:
    """BFS-expand from the top seed entities; return {fact_id: min_hops}.

    Args:
        conn: Database connection; ``execute`` must accept ``?`` placeholders
            and return rows that can be indexed by column name.
        seed_fact_ids: Fact ids to expand from. Their order is treated as a
            priority order: when the seeds resolve to more than
            ``_MAX_SEED_ENTITIES`` distinct entities, the entities belonging
            to the earliest ids are kept.
        depth: Maximum BFS depth handed to ``bfs_neighbors``.
        scope: Scope forwarded to ``bfs_neighbors`` and required of every
            returned fact.
        tenant_id: Tenant forwarded to ``bfs_neighbors`` and required of every
            returned fact.
        identity: Caller identity forwarded to ``bfs_neighbors``.
        k: Maximum number of facts to return.
        min_confidence: Lower bound on a fact's effective confidence (the
            override value when one exists, else the fact's own).
        now: Timestamp compared against the effective ``valid_until``; facts
            whose validity ended at or before it are excluded.

    Returns:
        Mapping of fact id to the smallest hop count reported for that fact's
        entity across all expanded seeds. Empty when there are no seeds, no
        seed resolves to an entity, or the expansion finds no neighbors.
    """
    if not seed_fact_ids:
        return {}

    placeholders = ",".join("?" * len(seed_fact_ids))
    seed_rows = conn.execute(
        f"SELECT id, entity FROM facts WHERE id IN ({placeholders})",  # noqa: S608  # nosec B608
        seed_fact_ids,
    ).fetchall()

    # A SELECT without ORDER BY returns rows in an unspecified order, so the
    # caller's seed order is re-imposed here before the cap is applied.
    # Otherwise the cap would keep an arbitrary subset of the seed entities.
    entity_by_fact_id = {str(row["id"]): row["entity"] for row in seed_rows}
    ranked_entities: list[str] = []
    seen_entities: set[str] = set()
    for fact_id in seed_fact_ids:
        entity = entity_by_fact_id.get(str(fact_id))
        if entity is None or entity in seen_entities:
            continue
        seen_entities.add(entity)
        ranked_entities.append(entity)

    if not ranked_entities:
        return {}
    seed_entities = ranked_entities[:_MAX_SEED_ENTITIES]

    # Smallest hop distance at which each entity was reached from any seed.
    entity_min_hops: dict[str, int] = {}
    for seed in seed_entities:
        neighbors = bfs_neighbors(
            conn,
            seed_entity=seed,
            max_depth=depth,
            scope=scope,
            tenant_id=tenant_id,
            identity=identity,
        )
        for neighbor in neighbors:
            best_so_far = entity_min_hops.get(neighbor.entity, neighbor.hops)
            entity_min_hops[neighbor.entity] = min(best_so_far, neighbor.hops)

    if not entity_min_hops:
        return {}

    # Cap the entity set nearest-first. sorted() is stable, so entities at the
    # same distance keep their discovery order.
    entities = sorted(entity_min_hops, key=entity_min_hops.__getitem__)[
        :_MAX_GRAPH_ENTITIES
    ]
    placeholders = ",".join("?" * len(entities))
    # NOTE(review): LIMIT is applied without ORDER BY, so when more than k
    # facts match, which k come back is up to the database.
    sql = f"""
        SELECT f.id, f.entity
        FROM facts f
        LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id
        WHERE f.entity IN ({placeholders})  -- nosec B608
          AND f.scope = ?
          AND f.tenant_id = ?
          AND COALESCE(fvo.confidence, f.confidence) >= ?
          AND (
              COALESCE(fvo.valid_until, f.valid_until) IS NULL
              OR COALESCE(fvo.valid_until, f.valid_until) > ?
          )
        LIMIT ?
    """  # noqa: S608  # nosec B608
    fact_rows = conn.execute(
        sql,
        [*entities, scope, tenant_id, min_confidence, now, k],
    ).fetchall()

    return {row["id"]: entity_min_hops.get(row["entity"], depth) for row in fact_rows}