Coverage for node / src / stigmem_node / routes / decay.py: 89%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Decay sweeper HTTP route — Phase 6 (spec §15) + async job path (spec §15.4). 

2 

3POST /v1/decay/sweep 

4 → 200 sync result, or 202 { job_id, status, estimated_s } when scope > threshold. 

5 

6GET /v1/decay/jobs/:job_id 

7 → 200 job status/result, or 404 if not found. 

8""" 

9 

10from __future__ import annotations 

11 

12from typing import Annotated, Any 

13 

14from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query 

15from fastapi.responses import JSONResponse 

16 

17from ..auth import Identity, resolve_identity 

18from ..db import db 

19from ..jobs import create_job, get_job, mark_done, mark_failed, mark_running 

20from ..lifecycle.decay import run_decay_sweep 

21from ..models.constants import VALID_SCOPES 

22from ..settings import settings 

23 

# Router for the decay endpoints defined in this module: every route registered
# on it is served under the "/v1/decay" prefix and tagged "decay".
router = APIRouter(prefix="/v1/decay", tags=["decay"])

25 

26 

def _decay_job_worker(
    job_id: str,
    ttl_seconds: int | None,
    min_confidence: float | None,
    scope: str | None,
    dry_run: bool,
) -> None:
    """Background task: run a decay sweep and record its outcome on the job.

    The job is marked running, ``run_decay_sweep`` is called with the given
    filters, and the result is stored with ``mark_done``. Any exception raised
    by the sweep (or by storing its result) is logged with its traceback and
    recorded on the job via ``mark_failed`` instead of escaping the task.

    Args:
        job_id: Identifier of the job record to update.
        ttl_seconds: Forwarded unchanged to ``run_decay_sweep``.
        min_confidence: Forwarded unchanged to ``run_decay_sweep``.
        scope: Forwarded unchanged to ``run_decay_sweep``.
        dry_run: Forwarded unchanged to ``run_decay_sweep``.
    """
    # Function-scope import so the module's import block is left untouched.
    import logging

    mark_running(job_id)
    try:
        result = run_decay_sweep(
            ttl_seconds=ttl_seconds,
            min_confidence=min_confidence,
            scope=scope,
            dry_run=dry_run,
        )
        # Kept inside the try: a failure while storing the result must also
        # flip the job to "failed" rather than leave it stuck as running.
        mark_done(job_id, result)
    except Exception as exc:
        # Task boundary: nothing upstream will report this error, so keep the
        # traceback in the log; the job record only carries a short message.
        logging.getLogger(__name__).exception("decay job %s failed", job_id)
        # Some exceptions stringify to "" (e.g. ``ValueError()``); fall back to
        # the class name so the job never shows an empty failure reason.
        mark_failed(job_id, str(exc) or type(exc).__name__)

46 

47 

@router.post("/sweep")
def decay_sweep(
    background_tasks: BackgroundTasks,
    identity: Annotated[Identity, Depends(resolve_identity)],
    dry_run: bool = Query(False, description="Report what would be decayed without writing"),
    scope: str | None = Query(None, description="Restrict sweep to one scope"),
    ttl_seconds: int | None = Query(
        None, ge=0, description="Expire non-expiring facts older than N seconds (0 = all)"
    ),
    min_confidence: float | None = Query(
        None, ge=0.0, le=1.0, description="Expire active facts below this confidence"
    ),
) -> Any:
    """Mark stale facts as expired. Cron-friendly one-shot sweeper.

    Returns 200 synchronously for scopes ≤ threshold facts (Spec-X9-Decay-Semantics).
    Returns 202 with job_id for larger scopes; poll GET /v1/decay/jobs/:job_id.
    Note: dry_run is always synchronous per Spec-X9-Decay-Semantics.
    """
    # Guard clauses: the caller needs write permission, and an explicit scope
    # has to be one of the known scopes.
    if not identity.can_write():
        raise HTTPException(status_code=403, detail="write permission required")
    scope_ok = scope is None or scope in VALID_SCOPES
    if not scope_ok:
        raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}")

    # Only a real (non-dry-run) sweep can be handed to a background job
    # (spec §15.4), so the size check is skipped entirely for dry runs.
    if not dry_run:
        # Positional arguments for conn.execute(): count the facts in the
        # requested scope, or every fact when no scope was given.
        if scope is None:
            count_query = ("SELECT COUNT(*) FROM facts",)
        else:
            count_query = ("SELECT COUNT(*) FROM facts WHERE scope = ?", [scope])
        with db() as conn:
            fact_total: int = conn.execute(*count_query).fetchone()[0]

        if fact_total > settings.async_job_threshold:
            # Estimate: one second per 1,000 facts, never less than a minute.
            eta_s = max(fact_total // 1_000, 60)
            new_job_id = create_job("decay", scope, eta_s)
            background_tasks.add_task(
                _decay_job_worker, new_job_id, ttl_seconds, min_confidence, scope, dry_run
            )
            accepted = {"job_id": new_job_id, "status": "pending", "estimated_s": eta_s}
            return JSONResponse(status_code=202, content=accepted)

    # Synchronous path: dry runs and sweeps at or below the async threshold.
    return run_decay_sweep(
        ttl_seconds=ttl_seconds,
        min_confidence=min_confidence,
        scope=scope,
        dry_run=dry_run,
    )

99 

100 

@router.get("/jobs/{job_id}")
def get_decay_job(
    job_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> Any:
    """Poll the status of an async decay job (Spec-X9-Decay-Semantics)."""
    # Polling only needs read permission.
    may_read = identity.can_read()
    if not may_read:
        raise HTTPException(status_code=403, detail="read permission required")

    # The lookup is made with job_type="decay"; a missing record becomes a 404.
    record = get_job(job_id, job_type="decay")
    if record is not None:
        return record
    raise HTTPException(status_code=404, detail="job not found")