Coverage for node / src / stigmem_node / routes / graph.py: 87%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Graph traversal route — spec §20 (Phase 9). 

2 

3GET /v1/graph/neighbors 

4 ?entity= required seed entity URI 

5 &depth= 1–3; default 1; 400 graph_depth_exceeded if > 3 

6 &scope= required; facts MUST NOT cross scopes 

7 &relation_filter= optional; prefix-glob (e.g. "memory:*") 

8 &min_confidence= optional; default 0.1 

9 &min_trust= optional; default 0.0 

10 &page_size= default 20; max 200 

11 &cursor= opaque pagination cursor; 400 cursor_expired after 300 s 

12""" 

13 

14from __future__ import annotations 

15 

16import base64 

17import json 

18import time 

19from typing import Annotated, Any 

20 

21from fastapi import APIRouter, Depends, HTTPException, Query 

22 

23from ..auth import Identity, resolve_identity 

24from ..db import db 

25from ..entity_normalizer import NormalizationError, normalize_entity_uri 

26from ..models.graph import NeighborItem, NeighborsResponse 

27from ..recall.graph import MAX_DEPTH, bfs_neighbors 

28 

# Router for the graph endpoints; every route registered on it is served
# under the /v1/graph prefix and tagged "graph".
router = APIRouter(prefix="/v1/graph", tags=["graph"])

# Maximum age, in seconds, of a pagination cursor; _decode_cursor rejects
# older cursors with a 400 "cursor_expired" error.
_CURSOR_TTL_S: float = 300.0

32 

33 

34# --------------------------------------------------------------------------- 

35# Response models 

36# --------------------------------------------------------------------------- 

37 

38 

39# --------------------------------------------------------------------------- 

40# Cursor helpers 

41# --------------------------------------------------------------------------- 

42 

43 

44def _encode_cursor(offset: int) -> str: 

45 payload = json.dumps({"offset": offset, "ts": time.time()}, separators=(",", ":")) 

46 return base64.urlsafe_b64encode(payload.encode()).decode().rstrip("=") 

47 

48 

def _decode_cursor(cursor: str) -> int:
    """Decode a pagination cursor and return the result offset it carries.

    The cursor is expected to be unpadded URL-safe base64 wrapping a JSON
    object with an ``offset`` entry and a ``ts`` entry (issue time, seconds
    since the epoch). Structure is validated before expiry, so a cursor that
    is both malformed and stale is reported as ``invalid_cursor``.

    Args:
        cursor: Cursor string supplied by the client.

    Returns:
        The non-negative offset stored in the cursor.

    Raises:
        HTTPException: 400 ``invalid_cursor`` when the cursor cannot be
            decoded, lacks a field, or carries a negative offset;
            400 ``cursor_expired`` when it is older than ``_CURSOR_TTL_S``
            seconds.
    """
    try:
        # Restore the "=" padding so the length is a multiple of four again.
        padded = cursor + "=" * (-len(cursor) % 4)
        payload = json.loads(base64.urlsafe_b64decode(padded))
        issued_at = float(payload["ts"])
        offset = int(payload["offset"])
        if offset < 0:
            # A negative offset would make the caller's slice count from the
            # end of the result list, so treat it as a malformed cursor.
            raise ValueError("cursor offset must be non-negative")
    except Exception as exc:
        # The cursor is client-supplied: whatever goes wrong while parsing it
        # is a bad request, never a server error.
        raise HTTPException(
            status_code=400,
            detail={"code": "invalid_cursor", "message": "unreadable pagination cursor"},
        ) from exc

    if time.time() - issued_at > _CURSOR_TTL_S:
        raise HTTPException(
            status_code=400,
            detail={"code": "cursor_expired", "message": "pagination cursor has expired"},
        )
    return offset

67 

68 

69# --------------------------------------------------------------------------- 

70# Route 

71# --------------------------------------------------------------------------- 

72 

73 

@router.get("/neighbors", response_model=NeighborsResponse)
def graph_neighbors(
    identity: Annotated[Identity, Depends(resolve_identity)],
    entity: str = Query(..., description="Seed entity URI"),
    depth: int = Query(1, ge=1, description="Traversal depth (1–3)"),
    scope: str = Query(..., description="Scope filter; required"),
    relation_filter: str | None = Query(
        None, description="Prefix-glob relation filter (e.g. 'memory:*')"
    ),
    min_confidence: float = Query(0.1, ge=0.0, le=1.0),
    min_trust: float = Query(0.0, ge=0.0, le=1.0),
    page_size: int = Query(20, ge=1, le=200),
    cursor: str | None = Query(None),
) -> Any:
    """Return graph neighbors of entity within depth hops (Spec-X11-Recall-Graph)."""
    # Flow: permission check -> depth limit -> seed normalization -> cursor
    # decoding -> traversal -> one page sliced out of the full result list.
    # Raises 403 when the caller may not read, and 400 for an excessive depth,
    # an entity URI that fails normalization, or a bad/expired cursor.
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")

    if depth > MAX_DEPTH:
        depth_error = {
            "code": "graph_depth_exceeded",
            "message": f"depth must be ≤ {MAX_DEPTH}",
        }
        raise HTTPException(status_code=400, detail=depth_error)

    try:
        seed = normalize_entity_uri(entity)
    except NormalizationError as exc:
        raise HTTPException(status_code=400, detail=f"invalid_entity_uri: {exc}") from exc

    # A missing (or empty) cursor means "start from the first result".
    start = 0
    if cursor:
        start = _decode_cursor(cursor)
    stop = start + page_size

    # The traversal result is recomputed on every request; paging is a plain
    # slice over that list.
    with db() as conn:
        found = bfs_neighbors(
            conn,
            seed_entity=seed,
            max_depth=depth,
            scope=scope,
            tenant_id=identity.tenant_id,
            relation_filter=relation_filter,
            min_confidence=min_confidence,
            min_trust=min_trust,
            identity=identity,
        )

    total = len(found)

    # Hand out a follow-up cursor only while results remain past this page.
    next_cursor = None
    if stop < total:
        next_cursor = _encode_cursor(stop)

    items = [
        NeighborItem(
            entity=hit.entity,
            relation=hit.relation,
            hops=hit.hops,
            confidence=hit.confidence,
            source_trust=hit.source_trust,
            path=hit.path,
        )
        for hit in found[start:stop]
    ]

    return NeighborsResponse(
        entity=seed,
        depth=depth,
        neighbors=items,
        next_cursor=next_cursor,
        total_hint=total,
    )

138 next_cursor=next_cursor, 

139 total_hint=len(all_neighbors), 

140 )