Coverage for node / src / stigmem_node / routes / admin_audit.py: 100%
44 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Admin audit log export — spec §22.3.
3GET /v1/admin/audit Paginated export of all audit event types across all
4 principals. Gated by the ``audit.read`` capability.
6Query parameters:
7 since RFC3339 timestamp — return events with ts >= since
8 until RFC3339 timestamp — return events with ts <= until
9 principal entity_uri filter
10 event_type filter to one event type (e.g. "quota_breach")
11 cursor opaque seq-based cursor for forward pagination
12 limit page size (1–1000, default 200)
13"""
15from __future__ import annotations
17from typing import Annotated, Any
19from fastapi import APIRouter, Depends, HTTPException, Query, status
21from ..auth import Identity, resolve_identity
22from ..db import db
23from ..models.admin import AdminAuditEntry, AdminAuditResponse
25router = APIRouter(prefix="/v1/admin", tags=["admin"])
28def _row_to_entry(row: Any) -> AdminAuditEntry:
29 return AdminAuditEntry(
30 seq=row["seq"],
31 id=row["id"],
32 event_type=row["event_type"],
33 entity_uri=row["entity_uri"],
34 oidc_sub=row["oidc_sub"],
35 fact_id=row["fact_id"],
36 source=row["source"],
37 attested_key_id=row["attested_key_id"],
38 ts=row["ts"],
39 tenant_id=row["tenant_id"],
40 detail=row["detail"],
41 )
44@router.get("/audit", response_model=AdminAuditResponse)
45def admin_audit_export(
46 identity: Annotated[Identity, Depends(resolve_identity)],
47 since: str | None = Query(None, description="RFC3339 lower bound on ts (inclusive)"),
48 until: str | None = Query(None, description="RFC3339 upper bound on ts (inclusive)"),
49 principal: str | None = Query(None, description="Filter by entity_uri"),
50 event_type: str | None = Query(None, description="Filter by event_type"),
51 cursor: int | None = Query(
52 None, description="Seq-based forward pagination cursor (exclusive lower bound)"
53 ),
54 limit: int = Query(200, ge=1, le=1000),
55) -> AdminAuditResponse:
56 """Export the full audit log. Requires ``audit.read`` capability."""
57 if not identity.can_audit():
58 raise HTTPException(
59 status_code=status.HTTP_403_FORBIDDEN,
60 detail="audit.read capability required",
61 )
63 params: list[Any] = []
64 clauses: list[str] = []
66 clauses.append("tenant_id = ?")
67 params.append(identity.tenant_id)
69 if since:
70 clauses.append("ts >= ?")
71 params.append(since)
72 if until:
73 clauses.append("ts <= ?")
74 params.append(until)
75 if principal:
76 clauses.append("entity_uri = ?")
77 params.append(principal)
78 if event_type:
79 clauses.append("event_type = ?")
80 params.append(event_type)
81 if cursor is not None:
82 clauses.append("(seq > ? OR seq IS NULL)")
83 params.append(cursor)
85 where = "WHERE " + " AND ".join(clauses) if clauses else ""
86 params.append(limit + 1)
88 sql = f"""
89 SELECT seq, id, event_type, entity_uri, oidc_sub, fact_id, source,
90 attested_key_id, ts, tenant_id, detail
91 FROM fact_audit_log
92 {where}
93 ORDER BY seq ASC NULLS LAST, ts ASC, id ASC
94 LIMIT ?
95 """ # noqa: S608 # nosec B608
97 with db() as conn:
98 rows = conn.execute(sql, params).fetchall()
100 has_more = len(rows) > limit
101 rows = rows[:limit]
103 next_cursor: int | None = None
104 if has_more and rows:
105 last_seq = rows[-1]["seq"]
106 next_cursor = last_seq
108 return AdminAuditResponse(
109 entries=[_row_to_entry(r) for r in rows],
110 total=len(rows),
111 next_cursor=next_cursor,
112 )