Coverage for node / src / stigmem_node / routes / synthesize.py: 95%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Scope synthesis route — Phase 6 (spec §synthesize)."""
3from __future__ import annotations
5from datetime import UTC, datetime
6from typing import Annotated, Any
8from fastapi import APIRouter, Depends, HTTPException, Query
10from ..auth import Identity, resolve_identity
11from ..db import db
12from ..models.constants import VALID_SCOPES
# Router for this module; the route declared below is registered on it, so
# its path is resolved relative to the "/v1/scopes" prefix.
router = APIRouter(
    prefix="/v1/scopes",
    tags=["synthesis"],
)
16_SYS_PREFIX = "stigmem:"
17_URI_PREFIX = "stigmem://"
20def _is_system(entity: str, relation: str) -> bool:
21 return (entity.startswith(_SYS_PREFIX) and not entity.startswith(_URI_PREFIX)) or (
22 relation.startswith(_SYS_PREFIX) and not relation.startswith(_URI_PREFIX)
23 )
26_SYNTHESIZE_SQL = (
27 "SELECT f.*, "
28 " COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until, "
29 " COALESCE(fvo.confidence, f.confidence) AS projected_confidence "
30 "FROM facts f "
31 "LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id"
32 " WHERE f.scope = ?"
33 " AND (? = 1"
34 " OR COALESCE(fvo.valid_until, f.valid_until) IS NULL"
35 " OR COALESCE(fvo.valid_until, f.valid_until) > ?)"
36 " ORDER BY COALESCE(fvo.confidence, f.confidence) DESC, f.timestamp DESC"
37 " LIMIT ?"
38)
41def _build_synthesize_params(scope: str, include_expired: bool, limit: int, now: str) -> list[Any]:
42 """Return the bind values for ``_SYNTHESIZE_SQL``.
44 The SQL text is a module-level constant; this helper only computes
45 bind values. Keeping the SQL string out of any function that
46 accepts user input prevents CodeQL from interprocedurally tainting
47 it — see issue #121 for why a function that takes user inputs and
48 returns ``(sql, params)`` still trips ``py/sql-injection`` even
49 when the returned SQL value is invariant.
50 """
51 expired_flag = 1 if include_expired else 0
52 return [scope, expired_flag, now, limit]
def _count_pair_occurrences(rows: list[Any]) -> dict[tuple[str, str], int]:
    """Tally how often each (entity, relation) pair appears in *rows*.

    Rows for which ``_is_system`` is true are skipped, so system facts
    never appear as keys in the returned mapping.
    """
    tally: dict[tuple[str, str], int] = {}
    for row in rows:
        if _is_system(row["entity"], row["relation"]):
            continue
        pair = (row["entity"], row["relation"])
        tally[pair] = tally.get(pair, 0) + 1
    return tally
65def _row_age_seconds(timestamp: str) -> float:
66 """Return seconds elapsed since the row's ISO timestamp; 0.0 on parse error."""
67 try:
68 ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
69 return (datetime.now(UTC) - ts).total_seconds()
70 except (ValueError, TypeError):
71 return 0.0
74def _build_synthesized_fact(
75 r: Any, is_expired: bool, age_seconds: float, contradicted: bool
76) -> dict[str, Any]:
77 """Build the per-fact dict returned by synthesize_scope."""
78 return {
79 "id": r["id"],
80 "entity": r["entity"],
81 "relation": r["relation"],
82 "value": {"type": r["value_type"], "v": r["value_v"]},
83 "confidence": r["projected_confidence"],
84 "timestamp": r["timestamp"],
85 "valid_until": r["projected_valid_until"],
86 "is_expired": is_expired,
87 "age_seconds": age_seconds,
88 "contradicted": contradicted,
89 "source": r["source"],
90 }
@router.get("/{scope}/synthesize")
def synthesize_scope(
    scope: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
    include_expired: bool = Query(False),
    limit: int = Query(200, ge=1, le=1000),
) -> dict[str, Any]:
    """Confidence-weighted summary of all facts in a scope (Phase 6).

    Facts come back ordered by projected confidence (highest first), each
    annotated with a contradiction flag and freshness metadata, together
    with aggregate statistics over the returned facts.

    Args:
        scope: Scope name taken from the path; must be in ``VALID_SCOPES``.
        identity: Caller identity resolved by ``resolve_identity``.
        include_expired: When true, expired facts are not filtered out by
            the query (default ``False``).
        limit: Maximum number of facts fetched, 1..1000 (default 200).

    Raises:
        HTTPException: 403 when the identity cannot read, 400 when *scope*
            is not one of ``VALID_SCOPES``.
    """
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")
    if scope not in VALID_SCOPES:
        raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}")

    # One timestamp is used for the SQL filter, the per-row expiry flag and
    # the "synthesized_at" field so all three agree.
    now = datetime.now(UTC).isoformat()
    bind_values = _build_synthesize_params(scope, include_expired, limit, now)

    with db() as conn:
        rows = conn.execute(_SYNTHESIZE_SQL, bind_values).fetchall()

    # A non-system (entity, relation) pair seen more than once among the
    # fetched rows is flagged as contradicted on every row carrying it.
    pair_counts = _count_pair_occurrences(rows)

    facts_out: list[dict[str, Any]] = []
    for row in rows:
        expiry = row["projected_valid_until"]
        # NOTE(review): compared against the ISO string `now`; presumably
        # stored expiries are ISO-8601 strings in a comparable format — confirm.
        is_expired = expiry is not None and expiry <= now

        contradicted = (
            not _is_system(row["entity"], row["relation"])
            and pair_counts.get((row["entity"], row["relation"]), 0) > 1
        )

        age_seconds = _row_age_seconds(row["timestamp"])
        facts_out.append(_build_synthesized_fact(row, is_expired, age_seconds, contradicted))

    # Aggregates are derived from the facts that were just built.
    expired_count = sum(1 for fact in facts_out if fact["is_expired"])
    contradiction_count = sum(1 for fact in facts_out if fact["contradicted"])

    confidences = [fact["confidence"] for fact in facts_out]
    if confidences:
        mean_confidence = sum(confidences) / len(confidences)
    else:
        mean_confidence = 0.0

    # max()/min() compare the timestamp values exactly as the database
    # returned them.
    timestamps = [fact["timestamp"] for fact in facts_out]
    freshest = max(timestamps) if timestamps else None
    oldest = min(timestamps) if timestamps else None

    return {
        "scope": scope,
        "fact_count": len(facts_out),
        "facts": facts_out,
        "contradiction_count": contradiction_count,
        "mean_confidence": mean_confidence,
        "freshest_timestamp": freshest,
        "oldest_timestamp": oldest,
        "expired_fact_count": expired_count,
        "synthesized_at": now,
    }