Coverage for node / src / stigmem_node / routes / lint.py: 86%
106 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
"""Lint route — spec §14 (v0.7) + async job path (spec §14.5).

POST /v1/lint
  { scope, checks?, entity?, relation?, stale_lookahead_s? }
  → 200 with the synchronous result, or 202 { job_id, status, estimated_s }
    when the scope's fact count exceeds the async job threshold.

GET /v1/lint/jobs/:job_id
  → 200 with the job status/result, or 404 if the job is not found.
"""
11from __future__ import annotations
13from datetime import UTC, datetime, timedelta
14from typing import Annotated, Any
16from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
17from fastapi.responses import JSONResponse
19from ..auth import Identity, resolve_identity
20from ..db import db
21from ..jobs import create_job, get_job, mark_done, mark_failed, mark_running
22from ..models.constants import VALID_SCOPES
23from ..models.lint import ALL_CHECKS, LintCheck, LintFinding, LintRequest, LintResult
24from ..settings import settings
# Router that collects the lint endpoints, grouped under the "lint" tag.
router = APIRouter(tags=["lint"])

# Relations for which a broken ref is reported as an error instead of a warning.
INTENT_ROUTING_RELATIONS = frozenset(("intent:handoff_to", "intent:context_ref"))
# The lint queries are complete module-level constants. Optional entity and
# relation filters are gated with ``(? IS NULL OR …)``, so each optional value
# is bound twice and no user input ever flows into the query text. This
# replaced the earlier conditional-fragment builder, which triggered the
# ``py/sql-injection`` taint (issue #115).
_COUNT_SQL = (
    "SELECT COUNT(*) FROM facts f WHERE 1=1"
    " AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?)"
    " AND (? IS NULL OR f.relation = ?)"
)
44def _lint_filter_params(scope: str, entity: str | None, relation: str | None) -> list[Any]:
45 """Return the bind values for ``_F_FILTER_TAIL`` / ``_FA_FILTER_TAIL``.
47 Empty-string entity/relation are normalized to None so the IS-NULL gate
48 preserves the previous ``if entity:`` truthiness behaviour.
49 """
50 entity_p = entity or None
51 relation_p = relation or None
52 return [scope, entity_p, entity_p, relation_p, relation_p]
# Unresolved conflicts whose fact A matches the scope/entity/relation filter.
# Fact B is joined only so that conflicts pointing at a missing fact drop out.
_CONFLICT_SQL = (
    "SELECT c.id AS conflict_id, c.fact_a_id, c.fact_b_id, fa.entity, fa.relation"
    " FROM conflicts c"
    " JOIN facts fa ON fa.id = c.fact_a_id"
    " JOIN facts fb ON fb.id = c.fact_b_id"
    " WHERE c.status = 'unresolved'"
    " AND fa.scope = ?"
    " AND (? IS NULL OR fa.entity = ?)"
    " AND (? IS NULL OR fa.relation = ?)"
)


def _check_contradictions(conn: Any, fa_params: list[Any]) -> list[dict[str, Any]]:
    """Report every unresolved conflict in the filtered scope as an error.

    ``fa_params`` holds the five bind values for the filter on fact A
    (scope, entity twice, relation twice). Each finding names both fact ids
    and carries fact A's entity and relation.
    """
    conflicts = conn.execute(_CONFLICT_SQL, fa_params).fetchall()
    return [
        {
            "check": "contradiction",
            "severity": "error",
            "entity": conflict["entity"],
            "relation": conflict["relation"],
            "fact_ids": [conflict["fact_a_id"], conflict["fact_b_id"]],
            "detail": f"unresolved conflict {conflict['conflict_id']}",
        }
        for conflict in conflicts
    ]
# Non-retracted facts (confidence > 0) whose expiry falls at or before the
# lookahead cutoff, restricted by the scope/entity/relation filter.
_STALE_SQL = (
    "SELECT f.id, f.entity, f.relation, f.valid_until"
    " FROM facts f"
    " WHERE f.valid_until IS NOT NULL"
    " AND f.confidence > 0.0"
    " AND f.valid_until <= ?"
    " AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?)"
    " AND (? IS NULL OR f.relation = ?)"
)


def _check_stale(
    conn: Any,
    f_params: list[Any],
    now: str,
    lookahead: str,
    stale_lookahead_s: int,
) -> list[dict[str, Any]]:
    """Report facts that have expired or will expire before ``lookahead``.

    A fact whose ``valid_until`` is at or before ``now`` is a ``warning``;
    one that expires later but no later than ``lookahead`` is ``info``.
    ``stale_lookahead_s`` is only used in the detail text.

    NOTE(review): ``valid_until`` is compared with ``now`` / ``lookahead``
    as-is; presumably all three are ISO-8601 strings in one format — confirm.
    """
    results: list[dict[str, Any]] = []
    rows = conn.execute(_STALE_SQL, [lookahead, *f_params]).fetchall()
    for row in rows:
        deadline = row["valid_until"]
        if deadline <= now:
            severity = "warning"
            detail = f"expired at {deadline}"
        else:
            severity = "info"
            detail = f"expires at {deadline} (within {stale_lookahead_s}s)"
        results.append(
            {
                "check": "stale",
                "severity": severity,
                "entity": row["entity"],
                "relation": row["relation"],
                "fact_ids": [row["id"]],
                "detail": detail,
            }
        )
    return results
124_ORPHAN_SQL = (
125 "SELECT entity FROM facts"
126 " WHERE scope = ?"
127 " AND (? IS NULL OR entity = ?)"
128 " GROUP BY entity"
129 " HAVING COUNT(*) > 0"
130 " AND SUM(CASE WHEN confidence > 0.0"
131 " AND (valid_until IS NULL OR valid_until > ?) THEN 1 ELSE 0 END) = 0"
132)
135def _check_orphans(conn: Any, scope: str, entity: str | None, now: str) -> list[dict[str, Any]]:
136 """Return orphan-entity findings (entities with no live facts in scope)."""
137 findings: list[dict[str, Any]] = []
138 entity_p = entity or None
139 for row in conn.execute(_ORPHAN_SQL, [scope, entity_p, entity_p, now]).fetchall(): 139 ↛ 140line 139 didn't jump to line 140 because the loop on line 139 never started
140 findings.append(
141 {
142 "check": "orphan",
143 "severity": "info",
144 "entity": row["entity"],
145 "relation": None,
146 "fact_ids": [],
147 "detail": f"entity {row['entity']!r} has no live facts in scope={scope}",
148 }
149 )
150 return findings
# Live facts of value type 'ref' that match the scope/entity/relation filter.
_REF_SQL = (
    "SELECT f.id, f.entity, f.relation, f.value_v"
    " FROM facts f"
    " WHERE f.value_type = 'ref'"
    " AND f.confidence > 0.0"
    " AND (f.valid_until IS NULL OR f.valid_until > ?)"
    " AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?)"
    " AND (? IS NULL OR f.relation = ?)"
)


def _check_broken_refs(conn: Any, f_params: list[Any], now: str) -> list[dict[str, Any]]:
    """Report live ref facts whose target entity has no live facts.

    For each live ``value_type = 'ref'`` fact in the filtered scope, the
    fact's ``value_v`` is taken as the target entity name. The ref is broken
    when no fact with that entity has ``confidence > 0`` and an open or
    future ``valid_until``. The target lookup is not restricted to the
    scope: a live fact for the target in any scope counts.

    Broken refs on a relation in ``INTENT_ROUTING_RELATIONS`` are ``error``
    severity; all others are ``warning``.

    The liveness lookup runs once per distinct target, so many refs sharing
    one target cost a single query rather than one query per ref.
    """
    findings: list[dict[str, Any]] = []
    # Target entity -> "has at least one live fact", remembered for this sweep.
    target_is_live: dict[Any, bool] = {}
    for row in conn.execute(_REF_SQL, [now, *f_params]).fetchall():
        target_entity = row["value_v"]
        if target_entity not in target_is_live:
            live_count = conn.execute(
                "SELECT COUNT(*) FROM facts"
                " WHERE entity = ? AND confidence > 0.0"
                " AND (valid_until IS NULL OR valid_until > ?)",
                [target_entity, now],
            ).fetchone()[0]
            target_is_live[target_entity] = live_count > 0
        if target_is_live[target_entity]:
            continue
        is_intent = row["relation"] in INTENT_ROUTING_RELATIONS
        findings.append(
            {
                "check": "broken_ref",
                "severity": "error" if is_intent else "warning",
                "entity": row["entity"],
                "relation": row["relation"],
                "fact_ids": [row["id"]],
                "detail": f"ref target entity {target_entity!r} has no live facts",
            }
        )
    return findings
# Live facts whose relation contains no ':' at all, grouped per
# (entity, relation) with the matching fact ids comma-joined.
_NS_SQL = (
    "SELECT f.entity, f.relation, GROUP_CONCAT(f.id) AS ids"
    " FROM facts f"
    " WHERE f.confidence > 0.0"
    " AND (f.valid_until IS NULL OR f.valid_until > ?)"
    " AND instr(f.relation, ':') = 0"
    " AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?)"
    " AND (? IS NULL OR f.relation = ?)"
    " GROUP BY f.entity, f.relation"
)


def _check_namespacing(conn: Any, f_params: list[Any], now: str) -> list[dict[str, Any]]:
    """Report live facts whose relation has no ``prefix:`` namespace.

    One ``warning`` finding is produced per (entity, relation) pair.
    ``fact_ids`` comes from splitting the comma-joined id column and is
    empty when that column is empty or NULL.
    """
    bare_groups = conn.execute(_NS_SQL, [now, *f_params]).fetchall()
    return [
        {
            "check": "namespacing",
            "severity": "warning",
            "entity": group["entity"],
            "relation": group["relation"],
            "fact_ids": group["ids"].split(",") if group["ids"] else [],
            "detail": (
                f"bare relation {group['relation']!r} has no namespace prefix — "
                f"rename to 'your-prefix:{group['relation']}' to avoid silent collisions"
            ),
        }
        for group in bare_groups
    ]
def _run_lint_sweep(
    scope: str,
    checks: list[LintCheck],
    entity: str | None,
    relation: str | None,
    stale_lookahead_s: int,
) -> dict[str, Any]:
    """Run the requested lint checks and return a dict with LintResult's fields.

    One timestamp is taken up front and shared by every check, and all
    queries run on a single ``db()`` connection. Checks always execute in
    the fixed order contradiction, stale, orphan, broken_ref, namespacing,
    whatever the order of ``checks``. ``fact_count`` is the number of facts
    matching the scope/entity/relation filter.
    """
    started = datetime.now(UTC)
    now = started.isoformat()
    lookahead = (started + timedelta(seconds=stale_lookahead_s)).isoformat()

    # Every filtered query takes the same five bind values.
    binds = _lint_filter_params(scope, entity, relation)

    findings: list[dict[str, Any]] = []
    fact_count = 0

    with db() as conn:
        fact_count = conn.execute(_COUNT_SQL, binds).fetchone()[0]

        # Check name -> deferred call; tuple order fixes the finding order.
        runners = (
            ("contradiction", lambda: _check_contradictions(conn, binds)),
            ("stale", lambda: _check_stale(conn, binds, now, lookahead, stale_lookahead_s)),
            ("orphan", lambda: _check_orphans(conn, scope, entity, now)),
            ("broken_ref", lambda: _check_broken_refs(conn, binds, now)),
            ("namespacing", lambda: _check_namespacing(conn, binds, now)),
        )
        for check_name, run_check in runners:
            if check_name in checks:
                findings.extend(run_check())

    return {
        "findings": findings,
        "checked_at": now,
        "scope": scope,
        "checks_run": checks,
        "fact_count": fact_count,
    }
def _lint_job_worker(job_id: str, req: LintRequest) -> None:
    """Background task: run the lint sweep and record the outcome on the job.

    The job is marked running, then done with the sweep result. Any
    exception from the sweep or from ``mark_done`` is logged with its
    traceback and recorded through ``mark_failed``; it is not re-raised.
    When the exception has no message, its class name is stored instead so
    the failed job never carries a blank error.
    """
    # Function-scope import: this module has no logger of its own.
    import logging

    mark_running(job_id)
    try:
        result = _run_lint_sweep(
            scope=req.scope,
            checks=req.checks or ALL_CHECKS,
            entity=req.entity,
            relation=req.relation,
            stale_lookahead_s=req.stale_lookahead_s,
        )
        mark_done(job_id, result)
    except Exception as exc:
        # Boundary of the background task: keep the traceback in the logs,
        # since the job record only stores a one-line message.
        logging.getLogger(__name__).exception("lint job %s failed", job_id)
        mark_failed(job_id, str(exc) or type(exc).__name__)
@router.post("/v1/lint")
def lint_scope(
    req: LintRequest,
    background_tasks: BackgroundTasks,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> Any:
    """Health-check sweep for a scope (Spec-20-Lint-Semantics). Read-only.

    Returns 200 with results synchronously for scopes ≤ threshold facts.
    Returns 202 with job_id for larger scopes; poll GET /v1/lint/jobs/:job_id.
    """
    # NOTE: the docstring text is kept verbatim because FastAPI publishes it
    # as the operation description.
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")
    if req.scope not in VALID_SCOPES:
        raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}")

    # Count scope facts to choose sync vs. async path (spec §14.5). The count
    # covers the whole scope and ignores the entity/relation filters.
    with db() as conn:
        count_row = conn.execute(
            "SELECT COUNT(*) FROM facts WHERE scope = ?", [req.scope]
        ).fetchone()
        scope_count: int = count_row[0]

    if scope_count > settings.async_job_threshold:
        # Large scope: queue the sweep and return a pollable job handle.
        estimated_s = max(10, scope_count // 5_000)
        job_id = create_job("lint", req.scope, estimated_s)
        background_tasks.add_task(_lint_job_worker, job_id, req)
        accepted = {"job_id": job_id, "status": "pending", "estimated_s": estimated_s}
        return JSONResponse(status_code=202, content=accepted)

    # Small scope: run inline; an empty or missing check list means all checks.
    sweep = _run_lint_sweep(
        scope=req.scope,
        checks=req.checks or ALL_CHECKS,
        entity=req.entity,
        relation=req.relation,
        stale_lookahead_s=req.stale_lookahead_s,
    )
    typed_findings = [LintFinding(**raw) for raw in sweep["findings"]]
    return LintResult(
        findings=typed_findings,
        checked_at=sweep["checked_at"],
        scope=sweep["scope"],
        checks_run=sweep["checks_run"],
        fact_count=sweep["fact_count"],
    )
@router.get("/v1/lint/jobs/{job_id}")
def get_lint_job(
    job_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> Any:
    """Poll the status of an async lint job (Spec-20-Lint-Semantics)."""
    # Same read-permission gate as POST /v1/lint.
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")
    # Only jobs of type "lint" are looked up; anything else is a 404.
    if (job := get_job(job_id, job_type="lint")) is not None:
        return job
    raise HTTPException(status_code=404, detail="job not found")