Coverage for node / src / stigmem_node / routes / cid_admin.py: 90%
18 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""CID backfill admin routes — spec §25.6.3, §25.7.2."""
3from __future__ import annotations
5import logging
6from typing import Annotated
8from fastapi import APIRouter, Depends, HTTPException, status
10from ..auth import Identity, resolve_identity
11from ..db import db
12from ..models.admin import CidBackfillStatus
14logger = logging.getLogger("stigmem.cid_admin")
16router = APIRouter(prefix="/v1/admin", tags=["admin", "cid"])
19@router.get("/cid-backfill/status", response_model=CidBackfillStatus)
20def cid_backfill_status(
21 identity: Annotated[Identity, Depends(resolve_identity)],
22) -> CidBackfillStatus:
23 """Return CID backfill progress for this node (Spec-21-Content-Addressed-IDs)."""
24 if not identity.can_read(): 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true
25 raise HTTPException(
26 status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
27 )
28 with db() as conn:
29 total = conn.execute("SELECT COUNT(*) FROM facts").fetchone()[0]
30 backfilled = conn.execute("SELECT COUNT(*) FROM facts WHERE cid IS NOT NULL").fetchone()[0]
31 pending = total - backfilled
32 return CidBackfillStatus(
33 total_facts=total,
34 backfilled_facts=backfilled,
35 pending_facts=pending,
36 backfill_complete=(pending == 0),
37 )