Coverage for node / src / stigmem_node / routes / audit.py: 89%
72 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
"""Track C / C3 — fact audit log surface.

Exposes the fact_audit_log table for end-to-end identity verification:
    principal (entity_uri, oidc_sub)
      → attested source (attested_key_id → agent_keys.entity_uri)
      → fact_id (facts.entity, relation, value, scope)

GET /v1/audit/facts/{fact_id} — audit trail for a single fact (enriched join)
GET /v1/audit — paginated enriched audit log with optional filters
GET /v1/audit/export — compliance CSV export (all join fields)
"""
from __future__ import annotations

import csv
import io
from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import StreamingResponse

from ..auth import Identity, resolve_identity
from ..db import db
from ..models.audit import AuditLogEntry, AuditLogResponse
# Router for the audit endpoints defined in this module; every route is
# mounted under the /v1/audit prefix and tagged "audit".
router = APIRouter(prefix="/v1/audit", tags=["audit"])
# Full SELECT joining principal → attested key → fact in a single pass.
# Both joins are LEFT JOINs, so an audit row is still returned (with NULL
# enrichment columns) when its key or fact row is missing.  The statement has
# no WHERE / ORDER BY; each endpoint appends its own.
_JOIN_SELECT = """
    SELECT
        al.id,
        al.fact_id,
        al.event_type,
        al.entity_uri,
        al.oidc_sub,
        al.source,
        al.attested_key_id,
        al.ts,
        ak.entity_uri AS attested_key_entity_uri,
        ak.description AS attested_key_description,
        f.entity AS fact_entity,
        f.relation AS fact_relation,
        f.value_type AS fact_value_type,
        f.value_v AS fact_value_v,
        f.scope AS fact_scope
    FROM fact_audit_log al
    LEFT JOIN agent_keys ak ON al.attested_key_id = ak.id
    LEFT JOIN facts f ON al.fact_id = f.id
"""
# Header row of the compliance CSV.  The order must stay in step with the
# per-row value list built in export_audit_csv(); the two principal columns
# are exported under a "principal_" prefix.
_CSV_HEADERS = [
    "id",
    "fact_id",
    "event_type",
    "principal_entity_uri",
    "principal_oidc_sub",
    "source",
    "attested_key_id",
    "attested_key_entity_uri",
    "attested_key_description",
    "fact_entity",
    "fact_relation",
    "fact_value_type",
    "fact_value_v",
    "fact_scope",
    "ts",
]
# Static WHERE clause — all filters are nullable bind parameters so there is
# no dynamic SQL string construction and no SQL-injection taint path.
# Parameter order matches _filter_params() exactly (16 placeholders):
#   1 tenant, 2 per equality filter (entity_uri, oidc_sub, source, fact_id),
#   3 for the attested tri-state, 4 for the (ts, id) keyset cursor.
# The cursor predicate selects rows strictly "before" the cursor, which is
# the continuation needed for an ORDER BY ts DESC, id DESC listing.
_STATIC_WHERE = (
    " WHERE al.tenant_id = ?"
    " AND (? IS NULL OR al.entity_uri = ?)"
    " AND (? IS NULL OR al.oidc_sub = ?)"
    " AND (? IS NULL OR al.source = ?)"
    " AND (? IS NULL OR al.fact_id = ?)"
    " AND (? IS NULL"
    " OR (? = 1 AND al.attested_key_id IS NOT NULL)"
    " OR (? = 0 AND al.attested_key_id IS NULL))"
    " AND (? IS NULL OR al.ts < ? OR (al.ts = ? AND al.id < ?))"
)
def _row_to_entry(row: Any) -> AuditLogEntry:
    """Build an AuditLogEntry from one result row of the _JOIN_SELECT query.

    Args:
        row: A mapping-style row (indexed by column name) exposing every
            column / alias selected by _JOIN_SELECT.

    Returns:
        The entry with each model field filled from the same-named column.
    """
    return AuditLogEntry(
        # Columns of the audit log row itself.
        id=row["id"],
        fact_id=row["fact_id"],
        event_type=row["event_type"],
        entity_uri=row["entity_uri"],
        oidc_sub=row["oidc_sub"],
        source=row["source"],
        attested_key_id=row["attested_key_id"],
        ts=row["ts"],
        # Enrichment from the LEFT JOINed agent_keys row.
        attested_key_entity_uri=row["attested_key_entity_uri"],
        attested_key_description=row["attested_key_description"],
        # Enrichment from the LEFT JOINed facts row.
        fact_entity=row["fact_entity"],
        fact_relation=row["fact_relation"],
        fact_value_type=row["fact_value_type"],
        fact_value_v=row["fact_value_v"],
        fact_scope=row["fact_scope"],
    )
def _encode_cursor(ts: str, entry_id: str) -> str:
    """Encode a keyset cursor as '{ts}|{id}' for DESC pagination.

    Args:
        ts: Timestamp of the last entry on the current page.
        entry_id: Id of that same entry (tiebreaker for equal timestamps).

    Returns:
        The two values joined by a single '|'.
    """
    return f"{ts}|{entry_id}"
def _decode_cursor(cursor: str) -> tuple[str, str] | None:
    """Return (ts, id) from a cursor string, or None if malformed.

    The cursor is split at the first '|' only, so any further '|' characters
    stay part of the id.  A cursor without a '|' is considered malformed.
    """
    ts, separator, entry_id = cursor.partition("|")
    if not separator:
        return None
    return ts, entry_id
def _filter_params(
    tenant_id: str,
    entity_uri: str | None,
    oidc_sub: str | None,
    source: str | None,
    fact_id: str | None,
    attested: bool | None,
    cursor: str | None,
) -> list[Any]:
    """Return bind parameters matching _STATIC_WHERE (no dynamic SQL construction).

    Each optional filter is repeated once per placeholder it feeds in
    _STATIC_WHERE; a None value switches that predicate off.

    Args:
        tenant_id: Tenant whose audit rows are visible.
        entity_uri: Optional exact match on the principal entity URI.
        oidc_sub: Optional exact match on the OIDC subject.
        source: Optional exact match on the fact source.
        fact_id: Optional exact match on the fact id.
        attested: True = only rows with an attested key, False = only rows
            without one, None = no restriction.
        cursor: Optional '{ts}|{id}' keyset cursor.  An empty or malformed
            cursor is ignored here (the cursor predicate is disabled).

    Returns:
        A new 16-element list; callers append their own LIMIT value.
    """
    # The SQL compares against the integers 1 / 0, not a Python bool.
    attested_val: int | None = None if attested is None else (1 if attested else 0)

    cur_ts: str | None = None
    cur_id: str | None = None
    if cursor:
        decoded = _decode_cursor(cursor)
        if decoded:
            cur_ts, cur_id = decoded

    return [
        tenant_id,
        # (? IS NULL OR al.entity_uri = ?)
        entity_uri,
        entity_uri,
        # (? IS NULL OR al.oidc_sub = ?)
        oidc_sub,
        oidc_sub,
        # (? IS NULL OR al.source = ?)
        source,
        source,
        # (? IS NULL OR al.fact_id = ?)
        fact_id,
        fact_id,
        # (? IS NULL OR (? = 1 AND ...) OR (? = 0 AND ...))
        attested_val,
        attested_val,
        attested_val,
        # (? IS NULL OR al.ts < ? OR (al.ts = ? AND al.id < ?))
        cur_ts,
        cur_ts,
        cur_ts,
        cur_id,
    ]
@router.get("/facts/{fact_id}", response_model=list[AuditLogEntry])
def get_fact_audit(
    fact_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> list[AuditLogEntry]:
    """Return the complete enriched audit trail for a specific fact.

    Entries are restricted to the caller's tenant and ordered oldest first;
    the entry id breaks ties between identical timestamps so the order is
    stable across calls.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when
            there are no audit entries and no fact with this id exists in
            the caller's tenant.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        rows = conn.execute(
            _JOIN_SELECT
            + " WHERE al.fact_id = ? AND al.tenant_id = ?"
            + " ORDER BY al.ts ASC, al.id ASC",
            (fact_id, identity.tenant_id),
        ).fetchall()
        # An empty trail is only an error when the fact itself is unknown:
        # a fact that exists but has no audit rows yields an empty list.
        fact_known = bool(rows) or (
            conn.execute(
                "SELECT id FROM facts WHERE id = ? AND tenant_id = ?",
                (fact_id, identity.tenant_id),
            ).fetchone()
            is not None
        )

    if not fact_known:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="fact not found")

    return [_row_to_entry(r) for r in rows]
@router.get("/export")
def export_audit_csv(
    identity: Annotated[Identity, Depends(resolve_identity)],
    entity_uri: str | None = Query(None, description="Filter by asserting entity"),
    oidc_sub: str | None = Query(None, description="Filter by OIDC subject"),
    source: str | None = Query(None, description="Filter by fact source"),
    fact_id: str | None = Query(None, description="Filter by fact ID"),
    attested: bool | None = Query(
        None, description="true = attested only; false = unattested only"
    ),
    limit: int = Query(5000, ge=1, le=50000),
) -> StreamingResponse:
    """Export the enriched audit log as CSV for compliance (principal → key → fact).

    Rows are restricted to the caller's tenant, filtered like the paginated
    listing, ordered oldest first (entry id breaks timestamp ties) and capped
    at ``limit`` rows.  The first CSV row is _CSV_HEADERS.

    Raises:
        HTTPException: 403 when the caller lacks read permission.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    # The export is not paginated, so no cursor is passed.
    params = _filter_params(
        identity.tenant_id, entity_uri, oidc_sub, source, fact_id, attested, None
    )
    params.append(limit)
    # The id tiebreaker makes the LIMIT cut-off deterministic when several
    # entries share a timestamp.
    sql = _JOIN_SELECT + _STATIC_WHERE + " ORDER BY al.ts ASC, al.id ASC LIMIT ?"

    with db() as conn:
        rows = conn.execute(sql, params).fetchall()

    # Result columns in the same order as _CSV_HEADERS.
    columns = (
        "id",
        "fact_id",
        "event_type",
        "entity_uri",
        "oidc_sub",
        "source",
        "attested_key_id",
        "attested_key_entity_uri",
        "attested_key_description",
        "fact_entity",
        "fact_relation",
        "fact_value_type",
        "fact_value_v",
        "fact_scope",
        "ts",
    )

    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(_CSV_HEADERS)
    # csv.writer already renders None as an empty field, so values are passed
    # through untouched; an `or ""` fallback would also blank out legitimate
    # falsy values such as 0.
    # NOTE(review): cells are written verbatim; if these files are opened in
    # spreadsheet software, consider whether formula-like values need
    # neutralising.
    writer.writerows([r[col] for col in columns] for r in rows)

    return StreamingResponse(
        iter([buf.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=stigmem-audit.csv"},
    )
@router.get("", response_model=AuditLogResponse)
def query_audit(
    identity: Annotated[Identity, Depends(resolve_identity)],
    entity_uri: str | None = Query(None, description="Filter by asserting entity"),
    oidc_sub: str | None = Query(None, description="Filter by OIDC subject"),
    source: str | None = Query(None, description="Filter by fact source"),
    fact_id: str | None = Query(None, description="Filter by fact ID"),
    attested: bool | None = Query(
        None, description="true = attested only; false = unattested only"
    ),
    cursor: str | None = Query(
        None,
        description="Opaque pagination cursor (the cursor value returned with the previous page)",
    ),
    limit: int = Query(50, ge=1, le=500),
) -> AuditLogResponse:
    """Query enriched audit logs with optional principal and source filters.

    Results are restricted to the caller's tenant and returned newest first
    (ts DESC, id DESC) using keyset pagination: pass the ``cursor`` of one
    response to fetch the page that follows it.

    Returns:
        The page of entries, ``total`` set to the number of entries on this
        page, and ``cursor`` set only when more rows exist.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 400 when a
            non-empty cursor cannot be decoded.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    # Reject an undecodable cursor instead of silently serving the first
    # page again, which would make a paging client loop forever.
    if cursor and _decode_cursor(cursor) is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="malformed cursor"
        )

    params = _filter_params(
        identity.tenant_id, entity_uri, oidc_sub, source, fact_id, attested, cursor
    )
    # Fetch one row beyond the page size purely to learn whether another
    # page exists; the extra row is dropped below.
    params.append(limit + 1)
    sql = _JOIN_SELECT + _STATIC_WHERE + " ORDER BY al.ts DESC, al.id DESC LIMIT ?"

    with db() as conn:
        rows = conn.execute(sql, params).fetchall()

    has_more = len(rows) > limit
    rows = rows[:limit]
    next_cursor = _encode_cursor(rows[-1]["ts"], rows[-1]["id"]) if has_more and rows else None

    return AuditLogResponse(
        entries=[_row_to_entry(r) for r in rows],
        total=len(rows),
        cursor=next_cursor,
    )