Coverage for node / src / stigmem_node / routes / cid_admin.py: 90%

18 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""CID backfill admin routes — spec §25.6.3, §25.7.2.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from typing import Annotated 

7 

8from fastapi import APIRouter, Depends, HTTPException, status 

9 

10from ..auth import Identity, resolve_identity 

11from ..db import db 

12from ..models.admin import CidBackfillStatus 

13 

# Module-level logger registered under the explicit name "stigmem.cid_admin".
14logger = logging.getLogger("stigmem.cid_admin") 

15 

# Router for this module: every route declared on it is served under the
# "/v1/admin" prefix and tagged "admin" and "cid".
16router = APIRouter(prefix="/v1/admin", tags=["admin", "cid"]) 

17 

18 

@router.get("/cid-backfill/status", response_model=CidBackfillStatus)
def cid_backfill_status(
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> CidBackfillStatus:
    """Return CID backfill progress for this node (Spec-21-Content-Addressed-IDs).

    Args:
        identity: Caller identity produced by the ``resolve_identity``
            dependency.

    Returns:
        A ``CidBackfillStatus`` holding the total number of rows in ``facts``,
        the number whose ``cid`` is not NULL, the difference between the two,
        and a flag that is true when that difference is zero.

    Raises:
        HTTPException: 403 when ``identity.can_read()`` is false.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )
    with db() as conn:
        # Fetch both counts in ONE statement so they describe the same snapshot
        # of the table. COUNT(cid) counts only rows where cid IS NOT NULL, so it
        # is equivalent to the former second query, but a write landing between
        # two separate queries can no longer skew (or make negative) `pending`.
        row = conn.execute("SELECT COUNT(*), COUNT(cid) FROM facts").fetchone()
    total = row[0]
    backfilled = row[1]
    pending = total - backfilled
    return CidBackfillStatus(
        total_facts=total,
        backfilled_facts=backfilled,
        pending_facts=pending,
        backfill_complete=(pending == 0),
    )