Coverage for node / src / stigmem_node / routes / lint.py: 86%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Lint route — spec §14 (v0.7) + async job path (spec §14.5). 

2 

3POST /v1/lint 

4{ scope, checks?, entity?, relation?, stale_lookahead_s? } 

5 → 200 sync result, or 202 { job_id, status, estimated_s } when scope > threshold. 

6 

7GET /v1/lint/jobs/:job_id 

8 → 200 job status/result, or 404 if not found. 

9""" 

10 

11from __future__ import annotations 

12 

13from datetime import UTC, datetime, timedelta 

14from typing import Annotated, Any 

15 

16from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException 

17from fastapi.responses import JSONResponse 

18 

19from ..auth import Identity, resolve_identity 

20from ..db import db 

21from ..jobs import create_job, get_job, mark_done, mark_failed, mark_running 

22from ..models.constants import VALID_SCOPES 

23from ..models.lint import ALL_CHECKS, LintCheck, LintFinding, LintRequest, LintResult 

24from ..settings import settings 

25 

# Router carrying both lint endpoints defined in this module.
router = APIRouter(tags=["lint"])

# Relations for which a dangling ``ref`` is reported with severity "error"
# instead of "warning" by the broken-ref check.
INTENT_ROUTING_RELATIONS = frozenset(("intent:handoff_to", "intent:context_ref"))

29 

# Every lint query is a module-level string constant. Optional entity /
# relation filters are written as ``(? IS NULL OR col = ?)`` gates, so the SQL
# text never varies with the request and user input only reaches the database
# as bind values. This replaced a conditional-fragment builder that triggered
# the ``py/sql-injection`` taint check (issue #115).
#
# Bind order: scope, entity, entity, relation, relation.
_COUNT_SQL = (
    "SELECT COUNT(*)"
    " FROM facts f"
    " WHERE 1=1 AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?) AND (? IS NULL OR f.relation = ?)"
)

42 

43 

def _lint_filter_params(scope: str, entity: str | None, relation: str | None) -> list[Any]:
    """Build the bind values for the shared scope/entity/relation filter.

    The lint queries filter with ``scope = ?`` followed by the two gates
    ``(? IS NULL OR entity = ?)`` and ``(? IS NULL OR relation = ?)``. Each
    gate has two placeholders, so each optional value is emitted twice.

    Falsy filters (``None`` or the empty string) are bound as ``None`` so the
    gate is disabled, matching the earlier ``if entity:`` truthiness check.
    """
    binds: list[Any] = [scope]
    for optional_filter in (entity, relation):
        bound = optional_filter if optional_filter else None
        binds += [bound, bound]
    return binds

53 

54 

# Unresolved conflicts whose first fact (``fa``) passes the scope / entity /
# relation filter. Both facts must exist for the row to be returned.
# Bind order: scope, entity, entity, relation, relation.
_CONFLICT_SQL = (
    "SELECT c.id AS conflict_id, c.fact_a_id, c.fact_b_id, fa.entity, fa.relation"
    " FROM conflicts c"
    " JOIN facts fa ON fa.id = c.fact_a_id JOIN facts fb ON fb.id = c.fact_b_id"
    " WHERE c.status = 'unresolved' AND fa.scope = ?"
    " AND (? IS NULL OR fa.entity = ?) AND (? IS NULL OR fa.relation = ?)"
)


def _check_contradictions(conn: Any, fa_params: list[Any]) -> list[dict[str, Any]]:
    """Report one ``contradiction`` finding per unresolved conflict in the filter.

    ``fa_params`` are the five bind values for ``_CONFLICT_SQL``. The entity
    and relation on each finding come from the conflict's first fact.
    """
    conflicts = conn.execute(_CONFLICT_SQL, fa_params).fetchall()
    return [
        {
            "check": "contradiction",
            "severity": "error",
            "entity": conflict["entity"],
            "relation": conflict["relation"],
            "fact_ids": [conflict["fact_a_id"], conflict["fact_b_id"]],
            "detail": f"unresolved conflict {conflict['conflict_id']}",
        }
        for conflict in conflicts
    ]

82 

83 

# Facts with non-zero confidence whose ``valid_until`` is set and falls at or
# before the supplied cut-off.
# Bind order: cut-off, scope, entity, entity, relation, relation.
_STALE_SQL = (
    "SELECT f.id, f.entity, f.relation, f.valid_until FROM facts f"
    " WHERE f.valid_until IS NOT NULL AND f.confidence > 0.0 AND f.valid_until <= ?"
    " AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?) AND (? IS NULL OR f.relation = ?)"
)


def _check_stale(
    conn: Any,
    f_params: list[Any],
    now: str,
    lookahead: str,
    stale_lookahead_s: int,
) -> list[dict[str, Any]]:
    """Report facts that have expired or will expire before ``lookahead``.

    ``lookahead`` is the query cut-off and ``now`` separates the two outcomes:
    a ``valid_until`` at or before ``now`` is a ``warning`` ("expired at …"),
    anything later is ``info`` ("expires at …"). Timestamps are compared as
    strings, exactly as the query compares them. ``stale_lookahead_s`` is used
    only in the message text.
    """
    findings: list[dict[str, Any]] = []
    for fact in conn.execute(_STALE_SQL, [lookahead, *f_params]).fetchall():
        deadline = fact["valid_until"]
        if deadline <= now:
            severity = "warning"
            detail = f"expired at {deadline}"
        else:
            severity = "info"
            detail = f"expires at {deadline} (within {stale_lookahead_s}s)"
        findings.append(
            {
                "check": "stale",
                "severity": severity,
                "entity": fact["entity"],
                "relation": fact["relation"],
                "fact_ids": [fact["id"]],
                "detail": detail,
            }
        )
    return findings

122 

123 

# Entities in a scope for which no fact is live, i.e. none has confidence > 0
# together with a ``valid_until`` that is NULL or later than the supplied time.
# Bind order: scope, entity, entity, now.
_ORPHAN_SQL = (
    "SELECT entity FROM facts"
    " WHERE scope = ? AND (? IS NULL OR entity = ?)"
    " GROUP BY entity"
    " HAVING COUNT(*) > 0 AND SUM("
    "CASE WHEN confidence > 0.0 AND (valid_until IS NULL OR valid_until > ?)"
    " THEN 1 ELSE 0 END"
    ") = 0"
)


def _check_orphans(conn: Any, scope: str, entity: str | None, now: str) -> list[dict[str, Any]]:
    """Report entities in ``scope`` that have facts but none of them live.

    A falsy ``entity`` (``None`` or ``""``) disables the entity filter. The
    relation filter is not applied to this check. Findings carry no relation
    and an empty ``fact_ids`` list.
    """
    entity_filter = entity if entity else None
    orphaned = conn.execute(_ORPHAN_SQL, [scope, entity_filter, entity_filter, now]).fetchall()
    return [
        {
            "check": "orphan",
            "severity": "info",
            "entity": name,
            "relation": None,
            "fact_ids": [],
            "detail": f"entity {name!r} has no live facts in scope={scope}",
        }
        for name in (row["entity"] for row in orphaned)
    ]

151 

152 

# Live ``ref`` facts in the filtered scope whose target entity (``value_v``)
# has no live fact. The liveness probe is a correlated NOT EXISTS, so the whole
# check is one statement rather than one COUNT(*) per ref fact. The probe is
# deliberately not restricted by scope: a live fact for the target in any scope
# keeps the ref intact.
# Bind order: now, scope, entity, entity, relation, relation, now.
_REF_SQL = (
    "SELECT f.id, f.entity, f.relation, f.value_v"
    " FROM facts f"
    " WHERE f.value_type = 'ref'"
    " AND f.confidence > 0.0"
    " AND (f.valid_until IS NULL OR f.valid_until > ?)"
    " AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?)"
    " AND (? IS NULL OR f.relation = ?)"
    " AND NOT EXISTS ("
    "SELECT 1 FROM facts t"
    " WHERE t.entity = f.value_v"
    " AND t.confidence > 0.0"
    " AND (t.valid_until IS NULL OR t.valid_until > ?)"
    ")"
)


def _check_broken_refs(conn: Any, f_params: list[Any], now: str) -> list[dict[str, Any]]:
    """Report live ``ref`` facts whose target entity has no live facts.

    Args:
        conn: Connection whose rows support access by column name.
        f_params: The five scope / entity / relation bind values.
        now: Timestamp string used as the liveness cut-off, both for the ref
            fact itself and for the facts of its target entity.

    Returns:
        One ``broken_ref`` finding per dangling ref fact. Severity is
        ``"error"`` when the relation is one of ``INTENT_ROUTING_RELATIONS``
        and ``"warning"`` otherwise.
    """
    # ``now`` is bound twice: once for the ref fact, once inside NOT EXISTS.
    dangling = conn.execute(_REF_SQL, [now, *f_params, now]).fetchall()

    findings: list[dict[str, Any]] = []
    for row in dangling:
        target_entity = row["value_v"]
        is_intent = row["relation"] in INTENT_ROUTING_RELATIONS
        findings.append(
            {
                "check": "broken_ref",
                "severity": "error" if is_intent else "warning",
                "entity": row["entity"],
                "relation": row["relation"],
                "fact_ids": [row["id"]],
                "detail": f"ref target entity {target_entity!r} has no live facts",
            }
        )
    return findings

189 

190 

# Live facts whose relation contains no ':' (no namespace prefix), grouped per
# (entity, relation) with the matching fact ids joined by commas.
# Bind order: now, scope, entity, entity, relation, relation.
_NS_SQL = (
    "SELECT f.entity, f.relation, GROUP_CONCAT(f.id) AS ids FROM facts f"
    " WHERE f.confidence > 0.0 AND (f.valid_until IS NULL OR f.valid_until > ?)"
    " AND instr(f.relation, ':') = 0"
    " AND f.scope = ?"
    " AND (? IS NULL OR f.entity = ?) AND (? IS NULL OR f.relation = ?)"
    " GROUP BY f.entity, f.relation"
)


def _check_namespacing(conn: Any, f_params: list[Any], now: str) -> list[dict[str, Any]]:
    """Report live facts whose relation has no ``prefix:`` namespace.

    One ``namespacing`` warning is produced per (entity, relation) pair.
    ``fact_ids`` is the comma-joined id list from the query split back into
    its parts, or an empty list when the query returned no ids.
    """

    def as_finding(group: Any) -> dict[str, Any]:
        bare = group["relation"]
        joined_ids = group["ids"]
        return {
            "check": "namespacing",
            "severity": "warning",
            "entity": group["entity"],
            "relation": bare,
            "fact_ids": joined_ids.split(",") if joined_ids else [],
            "detail": (
                f"bare relation {bare!r} has no namespace prefix — "
                f"rename to 'your-prefix:{bare}' to avoid silent collisions"
            ),
        }

    groups = conn.execute(_NS_SQL, [now, *f_params]).fetchall()
    return [as_finding(group) for group in groups]

222 

223 

def _run_lint_sweep(
    scope: str,
    checks: list[LintCheck],
    entity: str | None,
    relation: str | None,
    stale_lookahead_s: int,
) -> dict[str, Any]:
    """Run the requested lint checks and return a dict shaped like ``LintResult``.

    All checks share one connection and one timestamp. They always run in the
    order contradiction, stale, orphan, broken_ref, namespacing, whatever
    order ``checks`` lists them in, and findings are appended in that order.
    ``fact_count`` is the number of facts matching the scope / entity /
    relation filter.
    """
    started_at = datetime.now(UTC)
    now = started_at.isoformat()
    lookahead = (started_at + timedelta(seconds=stale_lookahead_s)).isoformat()

    # The conflict query takes the same five binds as the fact queries.
    filter_binds = _lint_filter_params(scope, entity, relation)

    findings: list[dict[str, Any]] = []
    fact_count = 0

    with db() as conn:
        fact_count = conn.execute(_COUNT_SQL, filter_binds).fetchone()[0]

        # (check name, thunk) pairs in execution order.
        ordered_checks = (
            ("contradiction", lambda: _check_contradictions(conn, filter_binds)),
            (
                "stale",
                lambda: _check_stale(conn, filter_binds, now, lookahead, stale_lookahead_s),
            ),
            ("orphan", lambda: _check_orphans(conn, scope, entity, now)),
            ("broken_ref", lambda: _check_broken_refs(conn, filter_binds, now)),
            ("namespacing", lambda: _check_namespacing(conn, filter_binds, now)),
        )
        for check_name, run_check in ordered_checks:
            if check_name in checks:
                findings.extend(run_check())

    return {
        "findings": findings,
        "checked_at": now,
        "scope": scope,
        "checks_run": checks,
        "fact_count": fact_count,
    }

267 

268 

def _lint_job_worker(job_id: str, req: LintRequest) -> None:
    """Background task body: run the sweep for ``req`` and record the outcome.

    The job is marked running first. Any exception raised while sweeping or
    while storing the result is caught and recorded on the job as its failure
    message, so it never propagates to the task runner.
    """
    mark_running(job_id)
    try:
        outcome = _run_lint_sweep(
            req.scope,
            req.checks or ALL_CHECKS,  # no explicit selection means every check
            req.entity,
            req.relation,
            req.stale_lookahead_s,
        )
        mark_done(job_id, outcome)
    except Exception as exc:
        mark_failed(job_id, str(exc))

283 

284 

@router.post("/v1/lint")
def lint_scope(
    req: LintRequest,
    background_tasks: BackgroundTasks,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> Any:
    """Health-check sweep for a scope (Spec-20-Lint-Semantics). Read-only.

    Responds 403 without read permission and 400 for an unknown scope.
    When the scope holds no more facts than ``settings.async_job_threshold``
    the sweep runs inline and a ``LintResult`` is returned (200). Larger
    scopes get a 202 with a ``job_id`` to poll at GET /v1/lint/jobs/:job_id.
    """
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")
    if req.scope not in VALID_SCOPES:
        raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}")

    # The sync/async decision uses the whole scope's fact count (spec §14.5);
    # entity and relation filters are not taken into account here.
    with db() as conn:
        count_row = conn.execute(
            "SELECT COUNT(*) FROM facts WHERE scope = ?", [req.scope]
        ).fetchone()
        scope_count: int = count_row[0]

    if scope_count <= settings.async_job_threshold:
        checks_to_run: list[LintCheck] = req.checks if req.checks else ALL_CHECKS
        sweep = _run_lint_sweep(
            req.scope,
            checks_to_run,
            req.entity,
            req.relation,
            req.stale_lookahead_s,
        )
        typed_findings = [LintFinding(**raw) for raw in sweep["findings"]]
        return LintResult(
            findings=typed_findings,
            checked_at=sweep["checked_at"],
            scope=sweep["scope"],
            checks_run=sweep["checks_run"],
            fact_count=sweep["fact_count"],
        )

    # Too large to run inline: queue the sweep and hand back a job handle.
    estimated_s = max(10, scope_count // 5_000)
    job_id = create_job("lint", req.scope, estimated_s)
    background_tasks.add_task(_lint_job_worker, job_id, req)
    accepted = {"job_id": job_id, "status": "pending", "estimated_s": estimated_s}
    return JSONResponse(status_code=202, content=accepted)

332 

333 

@router.get("/v1/lint/jobs/{job_id}")
def get_lint_job(
    job_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> Any:
    """Poll the status of an async lint job (Spec-20-Lint-Semantics).

    Responds 403 without read permission and 404 when no lint job has this
    id. Otherwise returns whatever ``get_job`` yields for it.
    """
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")

    if (job := get_job(job_id, job_type="lint")) is None:
        raise HTTPException(status_code=404, detail="job not found")
    return job