Coverage for node / src / stigmem_node / routes / admin_audit.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Admin audit log export — spec §22.3. 

2 

GET /v1/admin/audit   Paginated export of all audit event types across all
                      principals. Gated by the ``audit.read`` capability.

Query parameters:
  since       RFC3339 timestamp — return events with ts >= since
  until       RFC3339 timestamp — return events with ts <= until
  principal   entity_uri filter
  event_type  filter to one event type (e.g. "quota_breach")
  cursor      opaque seq-based cursor for forward pagination
  limit       page size (1–1000, default 200)

13""" 

14 

15from __future__ import annotations 

16 

17from typing import Annotated, Any 

18 

19from fastapi import APIRouter, Depends, HTTPException, Query, status 

20 

21from ..auth import Identity, resolve_identity 

22from ..db import db 

23from ..models.admin import AdminAuditEntry, AdminAuditResponse 

24 

25router = APIRouter(prefix="/v1/admin", tags=["admin"]) 

26 

27 

def _row_to_entry(row: Any) -> AdminAuditEntry:
    """Build an ``AdminAuditEntry`` from one ``fact_audit_log`` result row.

    Args:
        row: A mapping-style row supporting ``row["column"]`` lookups for
            every column listed below.

    Returns:
        The entry populated column-for-column from ``row``; no values are
        transformed on the way through.
    """
    # Column names double as the model's keyword arguments, so a single
    # table drives both the lookup and the constructor call.
    columns = (
        "seq",
        "id",
        "event_type",
        "entity_uri",
        "oidc_sub",
        "fact_id",
        "source",
        "attested_key_id",
        "ts",
        "tenant_id",
        "detail",
    )
    return AdminAuditEntry(**{column: row[column] for column in columns})

42 

43 

@router.get("/audit", response_model=AdminAuditResponse)
def admin_audit_export(
    identity: Annotated[Identity, Depends(resolve_identity)],
    since: str | None = Query(None, description="RFC3339 lower bound on ts (inclusive)"),
    until: str | None = Query(None, description="RFC3339 upper bound on ts (inclusive)"),
    principal: str | None = Query(None, description="Filter by entity_uri"),
    event_type: str | None = Query(None, description="Filter by event_type"),
    cursor: int | None = Query(
        None, description="Seq-based forward pagination cursor (exclusive lower bound)"
    ),
    limit: int = Query(200, ge=1, le=1000),
) -> AdminAuditResponse:
    """Export the full audit log. Requires ``audit.read`` capability."""
    # Flow: authorise -> assemble a parameterised WHERE clause -> fetch one
    # row beyond the page size -> trim and derive the next cursor.
    #
    # Raises HTTPException(403) when ``identity.can_audit()`` is falsy.
    # Returns the page of entries, ``total`` (the number of entries on this
    # page, not a global count) and ``next_cursor`` (``None`` unless another
    # page was detected).
    if not identity.can_audit():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="audit.read capability required",
        )

    # (predicate, bound value) pairs. Tenant scoping is unconditional and
    # always comes first, so the WHERE clause can never be empty.
    filters: list[tuple[str, Any]] = [("tenant_id = ?", identity.tenant_id)]

    # Optional filters apply only for truthy values, so an empty string is
    # treated the same as an omitted parameter.
    optional_filters = (
        ("ts >= ?", since),
        ("ts <= ?", until),
        ("entity_uri = ?", principal),
        ("event_type = ?", event_type),
    )
    filters.extend((predicate, value) for predicate, value in optional_filters if value)

    # The cursor is tested against None (not truthiness) so that 0 is a
    # valid cursor. Rows whose seq is NULL match every cursor value.
    if cursor is not None:
        filters.append(("(seq > ? OR seq IS NULL)", cursor))

    where = "WHERE " + " AND ".join(predicate for predicate, _ in filters)
    bindings: list[Any] = [value for _, value in filters]
    # Ask for one extra row purely to learn whether another page exists.
    bindings.append(limit + 1)

    # Only fixed predicate fragments are interpolated into the statement;
    # every caller-supplied value travels as a bound parameter.
    sql = f"""
        SELECT seq, id, event_type, entity_uri, oidc_sub, fact_id, source,
               attested_key_id, ts, tenant_id, detail
        FROM fact_audit_log
        {where}
        ORDER BY seq ASC NULLS LAST, ts ASC, id ASC
        LIMIT ?
    """  # noqa: S608  # nosec B608

    with db() as conn:
        fetched = conn.execute(sql, bindings).fetchall()

    page = fetched[:limit]
    more_available = len(fetched) > limit

    # NOTE(review): if the last row on the page has a NULL seq, next_cursor
    # comes out as None even though more rows exist — confirm this is the
    # intended behaviour for NULL-seq rows.
    next_cursor: int | None = page[-1]["seq"] if more_available and page else None

    return AdminAuditResponse(
        entries=[_row_to_entry(record) for record in page],
        total=len(page),
        next_cursor=next_cursor,
    )

112 )