Coverage for node / src / stigmem_node / routes / synthesize.py: 95%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Scope synthesis route — Phase 6 (spec §synthesize).""" 

2 

3from __future__ import annotations 

4 

5from datetime import UTC, datetime 

6from typing import Annotated, Any 

7 

8from fastapi import APIRouter, Depends, HTTPException, Query 

9 

10from ..auth import Identity, resolve_identity 

11from ..db import db 

12from ..models.constants import VALID_SCOPES 

13 

# Router for the scope-synthesis endpoint; routes registered on it are served
# under the "/v1/scopes" prefix and tagged "synthesis".
router = APIRouter(prefix="/v1/scopes", tags=["synthesis"])

15 

# Prefix that marks an entity/relation name as system-owned.
_SYS_PREFIX = "stigmem:"
# URI-style names share the leading characters of the system prefix but are
# not treated as system names.
_URI_PREFIX = "stigmem://"


def _is_system(entity: str, relation: str) -> bool:
    """Return True when either name is a system name.

    A name counts as a system name when it starts with ``_SYS_PREFIX`` but
    not with ``_URI_PREFIX``.  The entity is examined before the relation and
    the first match wins.
    """
    for name in (entity, relation):
        if name.startswith(_SYS_PREFIX) and not name.startswith(_URI_PREFIX):
            return True
    return False

24 

25 

# Parameterised query used by the synthesize route.
#
# Every fact in the requested scope is LEFT JOINed with
# ``fact_validity_overrides``; where an override row supplies ``valid_until``
# or ``confidence`` it takes precedence over the fact's own column (COALESCE),
# and the effective values are exposed as ``projected_valid_until`` and
# ``projected_confidence``.
#
# Placeholders, in bind order: scope, include-expired flag (1 bypasses the
# expiry filter), current timestamp, row limit.
#
# NOTE(review): assumes at most one override row per fact; with several, the
# join would return the same fact more than once — confirm against the schema.
_SYNTHESIZE_SQL = (
    "SELECT f.*, "
    " COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until, "
    " COALESCE(fvo.confidence, f.confidence) AS projected_confidence "
    "FROM facts f "
    "LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id"
    " WHERE f.scope = ?"
    # Keep the row when expired facts were requested, when it never expires,
    # or when its effective expiry is still after the supplied timestamp.
    " AND (? = 1"
    " OR COALESCE(fvo.valid_until, f.valid_until) IS NULL"
    " OR COALESCE(fvo.valid_until, f.valid_until) > ?)"
    # Highest effective confidence first; ties broken by newest timestamp.
    " ORDER BY COALESCE(fvo.confidence, f.confidence) DESC, f.timestamp DESC"
    " LIMIT ?"
)

39 

40 

def _build_synthesize_params(scope: str, include_expired: bool, limit: int, now: str) -> list[Any]:
    """Compute the bind values for ``_SYNTHESIZE_SQL``.

    Only the values are produced here; the SQL text itself lives in a
    module-level constant.  That split is deliberate: a helper that accepts
    user input and hands back ``(sql, params)`` makes CodeQL taint the SQL
    interprocedurally and raise ``py/sql-injection`` even though the SQL
    never varies (see issue #121).

    Returns:
        ``[scope, flag, now, limit]`` in placeholder order, where ``flag`` is
        ``1`` when expired facts should be included and ``0`` otherwise.
    """
    # Pass the flag as a plain 0/1 integer, which is what the "? = 1" test expects.
    include_flag = int(bool(include_expired))
    return [scope, include_flag, now, limit]

53 

54 

def _count_pair_occurrences(rows: list[Any]) -> dict[tuple[str, str], int]:
    """Tally how often each ``(entity, relation)`` pair appears in ``rows``.

    Rows whose entity or relation is a system name (per ``_is_system``) are
    left out of the tally entirely, so they never appear as keys.
    """
    tally: dict[tuple[str, str], int] = {}
    pairs = ((row["entity"], row["relation"]) for row in rows)
    for pair in pairs:
        if _is_system(*pair):
            continue
        tally[pair] = tally.get(pair, 0) + 1
    return tally

63 

64 

65def _row_age_seconds(timestamp: str) -> float: 

66 """Return seconds elapsed since the row's ISO timestamp; 0.0 on parse error.""" 

67 try: 

68 ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) 

69 return (datetime.now(UTC) - ts).total_seconds() 

70 except (ValueError, TypeError): 

71 return 0.0 

72 

73 

def _build_synthesized_fact(
    r: Any, is_expired: bool, age_seconds: float, contradicted: bool
) -> dict[str, Any]:
    """Assemble the response entry for one fact row.

    ``r`` is indexed by column name.  Confidence and expiry are taken from
    the ``projected_*`` columns rather than the fact's raw columns, and the
    stored value is re-nested as ``{"type": ..., "v": ...}``.  The three
    computed flags are passed through unchanged.
    """
    value_payload = {"type": r["value_type"], "v": r["value_v"]}
    effective_confidence = r["projected_confidence"]
    effective_valid_until = r["projected_valid_until"]
    return {
        "id": r["id"],
        "entity": r["entity"],
        "relation": r["relation"],
        "value": value_payload,
        "confidence": effective_confidence,
        "timestamp": r["timestamp"],
        "valid_until": effective_valid_until,
        "is_expired": is_expired,
        "age_seconds": age_seconds,
        "contradicted": contradicted,
        "source": r["source"],
    }

91 

92 

@router.get("/{scope}/synthesize")
def synthesize_scope(
    scope: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
    include_expired: bool = Query(False),
    limit: int = Query(200, ge=1, le=1000),
) -> dict[str, Any]:
    """Summarise the facts of one scope, highest confidence first (Phase 6).

    Each returned fact carries its effective confidence and expiry, an
    ``is_expired`` flag, its age in seconds and a ``contradicted`` flag.  A
    non-system fact is flagged as contradicted when its ``(entity, relation)``
    pair occurs more than once among the fetched rows.  Aggregate statistics
    over the returned facts are included alongside.

    Args:
        scope: Scope to summarise; must be a member of ``VALID_SCOPES``.
        identity: Caller identity resolved by ``resolve_identity``.
        include_expired: When true, expired facts are not filtered out.
        limit: Maximum number of facts to fetch (1-1000, default 200).

    Raises:
        HTTPException: 403 when the caller lacks read permission; 400 when
            ``scope`` is not a valid scope.
    """
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")
    if scope not in VALID_SCOPES:
        raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}")

    # One timestamp serves the SQL filter, the expiry flags and the
    # "synthesized_at" field so they all agree.
    now = datetime.now(UTC).isoformat()
    bind_values = _build_synthesize_params(scope, include_expired, limit, now)

    with db() as conn:
        rows = conn.execute(_SYNTHESIZE_SQL, bind_values).fetchall()

    # Occurrences per (entity, relation) among non-system facts; a count
    # above one marks every fact sharing that pair as contradicted.
    pair_counts = _count_pair_occurrences(rows)

    facts_out: list[dict[str, Any]] = []
    contradiction_count = 0
    expired_count = 0

    for row in rows:
        entity, relation = row["entity"], row["relation"]
        valid_until = row["projected_valid_until"]

        is_expired = valid_until is not None and valid_until <= now
        contradicted = (
            not _is_system(entity, relation)
            and pair_counts.get((entity, relation), 0) > 1
        )

        if is_expired:
            expired_count += 1
        if contradicted:
            contradiction_count += 1

        facts_out.append(
            _build_synthesized_fact(
                row, is_expired, _row_age_seconds(row["timestamp"]), contradicted
            )
        )

    if facts_out:
        mean_confidence = sum(f["confidence"] for f in facts_out) / len(facts_out)
        all_timestamps = [f["timestamp"] for f in facts_out]
        freshest = max(all_timestamps)
        oldest = min(all_timestamps)
    else:
        # Nothing matched: report neutral aggregates.
        mean_confidence = 0.0
        freshest = None
        oldest = None

    return {
        "scope": scope,
        "fact_count": len(facts_out),
        "facts": facts_out,
        "contradiction_count": contradiction_count,
        "mean_confidence": mean_confidence,
        "freshest_timestamp": freshest,
        "oldest_timestamp": oldest,
        "expired_fact_count": expired_count,
        "synthesized_at": now,
    }