Coverage for node / src / stigmem_node / routes / audit.py: 89%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

"""Track C / C3 — fact audit log surface.

Exposes the fact_audit_log table for end-to-end identity verification:

    principal (entity_uri, oidc_sub)
      → attested source (attested_key_id → agent_keys.entity_uri)
      → fact_id (facts.entity, relation, value, scope)

Endpoints (each requires read permission and only returns rows belonging to
the caller's tenant):

    GET /v1/audit/facts/{fact_id} — audit trail for a single fact (enriched join)
    GET /v1/audit                 — paginated enriched audit log with optional filters
    GET /v1/audit/export          — compliance CSV export (all join fields)
"""

12 

13from __future__ import annotations 

14 

15import csv 

16import io 

17from typing import Annotated, Any 

18 

19from fastapi import APIRouter, Depends, HTTPException, Query, status 

20from fastapi.responses import StreamingResponse 

21 

22from ..auth import Identity, resolve_identity 

23from ..db import db 

24from ..models.audit import AuditLogEntry, AuditLogResponse 

25 

# Every endpoint in this module is mounted under /v1/audit and grouped under
# the "audit" tag in the generated OpenAPI schema.
router = APIRouter(tags=["audit"], prefix="/v1/audit")

27 

28# Full SELECT joining principal → attested key → fact in a single pass. 

29_JOIN_SELECT = """ 

30 SELECT 

31 al.id, 

32 al.fact_id, 

33 al.event_type, 

34 al.entity_uri, 

35 al.oidc_sub, 

36 al.source, 

37 al.attested_key_id, 

38 al.ts, 

39 ak.entity_uri AS attested_key_entity_uri, 

40 ak.description AS attested_key_description, 

41 f.entity AS fact_entity, 

42 f.relation AS fact_relation, 

43 f.value_type AS fact_value_type, 

44 f.value_v AS fact_value_v, 

45 f.scope AS fact_scope 

46 FROM fact_audit_log al 

47 LEFT JOIN agent_keys ak ON al.attested_key_id = ak.id 

48 LEFT JOIN facts f ON al.fact_id = f.id 

49""" 

50 

51_CSV_HEADERS = [ 

52 "id", 

53 "fact_id", 

54 "event_type", 

55 "principal_entity_uri", 

56 "principal_oidc_sub", 

57 "source", 

58 "attested_key_id", 

59 "attested_key_entity_uri", 

60 "attested_key_description", 

61 "fact_entity", 

62 "fact_relation", 

63 "fact_value_type", 

64 "fact_value_v", 

65 "fact_scope", 

66 "ts", 

67] 

68 

# Static WHERE clause — all filters are nullable bind parameters so there is
# no dynamic SQL string construction and no SQL-injection taint path.
# Parameter order matches _filter_params() exactly.
#
# Placeholder layout (16 in total), in order:
#   1       tenant_id                      — always bound; scopes every query
#   2-3     entity_uri, entity_uri         — "? IS NULL OR col = ?": NULL disables
#   4-5     oidc_sub, oidc_sub
#   6-7     source, source
#   8-9     fact_id, fact_id
#   10-12   attested flag (NULL / 1 / 0), bound three times
#   13-16   cursor ts, ts, ts, cursor id   — keyset predicate selecting rows that
#           sort strictly after (ts, id) under ORDER BY al.ts DESC, al.id DESC
# Callers append their LIMIT value after these 16 parameters. Any change to the
# placeholders here must be mirrored in _filter_params().
_STATIC_WHERE = (
    " WHERE al.tenant_id = ?"  # 1 placeholder
    " AND (? IS NULL OR al.entity_uri = ?)"  # 2
    " AND (? IS NULL OR al.oidc_sub = ?)"  # 2
    " AND (? IS NULL OR al.source = ?)"  # 2
    " AND (? IS NULL OR al.fact_id = ?)"  # 2
    " AND (? IS NULL"  # 3 in total for the attested tri-state
    " OR (? = 1 AND al.attested_key_id IS NOT NULL)"
    " OR (? = 0 AND al.attested_key_id IS NULL))"
    " AND (? IS NULL OR al.ts < ? OR (al.ts = ? AND al.id < ?))"  # 4
)

83 

84 

def _row_to_entry(row: Any) -> AuditLogEntry:
    """Convert one ``_JOIN_SELECT`` result row into an ``AuditLogEntry``.

    *row* must support lookup by column name. Every column selected by
    ``_JOIN_SELECT`` is passed through unchanged as the model field of the
    same name.
    """
    field_names = (
        # audit-log columns
        "id",
        "fact_id",
        "event_type",
        "entity_uri",
        "oidc_sub",
        "source",
        "attested_key_id",
        "ts",
        # agent_keys enrichment
        "attested_key_entity_uri",
        "attested_key_description",
        # facts enrichment
        "fact_entity",
        "fact_relation",
        "fact_value_type",
        "fact_value_v",
        "fact_scope",
    )
    return AuditLogEntry(**{name: row[name] for name in field_names})

103 

104 

105def _encode_cursor(ts: str, entry_id: str) -> str: 

106 """Encode a keyset cursor as '{ts}|{id}' for DESC pagination.""" 

107 return f"{ts}|{entry_id}" 

108 

109 

110def _decode_cursor(cursor: str) -> tuple[str, str] | None: 

111 """Return (ts, id) from a cursor string, or None if malformed.""" 

112 parts = cursor.split("|", 1) 

113 if len(parts) != 2: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true

114 return None 

115 return parts[0], parts[1] 

116 

117 

118def _filter_params( 

119 tenant_id: str, 

120 entity_uri: str | None, 

121 oidc_sub: str | None, 

122 source: str | None, 

123 fact_id: str | None, 

124 attested: bool | None, 

125 cursor: str | None, 

126) -> list[Any]: 

127 """Return bind parameters matching _STATIC_WHERE (no dynamic SQL construction).""" 

128 attested_val: int | None = None if attested is None else (1 if attested else 0) 

129 cur_ts: str | None = None 

130 cur_id: str | None = None 

131 if cursor: 

132 decoded = _decode_cursor(cursor) 

133 if decoded: 133 ↛ 135line 133 didn't jump to line 135 because the condition on line 133 was always true

134 cur_ts, cur_id = decoded 

135 return [ 

136 tenant_id, 

137 entity_uri, 

138 entity_uri, 

139 oidc_sub, 

140 oidc_sub, 

141 source, 

142 source, 

143 fact_id, 

144 fact_id, 

145 attested_val, 

146 attested_val, 

147 attested_val, 

148 cur_ts, 

149 cur_ts, 

150 cur_ts, 

151 cur_id, 

152 ] 

153 

154 

@router.get("/facts/{fact_id}", response_model=list[AuditLogEntry])
def get_fact_audit(
    fact_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> list[AuditLogEntry]:
    """Return the complete enriched audit trail for a specific fact.

    Only entries in the caller's tenant are returned, oldest first. Entries
    with equal timestamps are ordered by id so the result is deterministic.

    Raises:
        HTTPException: 403 if the caller lacks read permission; 404 if the
            fact has no audit entries and does not exist in the caller's
            tenant.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        rows = conn.execute(
            _JOIN_SELECT
            + " WHERE al.fact_id = ? AND al.tenant_id = ? ORDER BY al.ts ASC, al.id ASC",
            (fact_id, identity.tenant_id),
        ).fetchall()
        # An empty trail is ambiguous. A fact that exists but has no audit
        # entries yields 200 with []; an unknown fact yields 404. The existence
        # probe reuses this connection instead of acquiring a second one.
        fact_missing = (
            not rows
            and conn.execute(
                "SELECT id FROM facts WHERE id = ? AND tenant_id = ?",
                (fact_id, identity.tenant_id),
            ).fetchone()
            is None
        )

    if fact_missing:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="fact not found")

    return [_row_to_entry(r) for r in rows]

182 

183 

@router.get("/export")
def export_audit_csv(
    identity: Annotated[Identity, Depends(resolve_identity)],
    entity_uri: str | None = Query(None, description="Filter by asserting entity"),
    oidc_sub: str | None = Query(None, description="Filter by OIDC subject"),
    source: str | None = Query(None, description="Filter by fact source"),
    fact_id: str | None = Query(None, description="Filter by fact ID"),
    attested: bool | None = Query(
        None, description="true = attested only; false = unattested only"
    ),
    limit: int = Query(5000, ge=1, le=50000),
) -> StreamingResponse:
    """Export the enriched audit log as CSV for compliance (principal → key → fact).

    Applies the same tenant scoping and optional filters as ``GET /v1/audit``
    but takes no cursor. It returns at most *limit* rows, oldest first, with
    ties on ``ts`` broken by entry id so the exported row set is
    deterministic.

    Raises:
        HTTPException: 403 if the caller lacks read permission.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    params = _filter_params(
        identity.tenant_id, entity_uri, oidc_sub, source, fact_id, attested, None
    )
    params.append(limit)
    sql = _JOIN_SELECT + _STATIC_WHERE + " ORDER BY al.ts ASC, al.id ASC LIMIT ?"

    with db() as conn:
        rows = conn.execute(sql, params).fetchall()

    # Result-row keys, in the same column order as _CSV_HEADERS (the two
    # "principal_*" headers correspond to entity_uri / oidc_sub).
    columns = (
        "id",
        "fact_id",
        "event_type",
        "entity_uri",
        "oidc_sub",
        "source",
        "attested_key_id",
        "attested_key_entity_uri",
        "attested_key_description",
        "fact_entity",
        "fact_relation",
        "fact_value_type",
        "fact_value_v",
        "fact_scope",
        "ts",
    )

    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(_CSV_HEADERS)
    # csv.writer already writes None as an empty cell, so values pass through
    # unchanged. Coercing with `value or ""` would also blank legitimate falsy
    # values such as a numeric 0 in fact_value_v.
    # NOTE(review): cells are not neutralised against spreadsheet formula
    # injection (values starting with =, +, -, @). Decide whether compliance
    # consumers need that, since it would alter the exported data.
    writer.writerows([r[column] for column in columns] for r in rows)

    return StreamingResponse(
        iter([buf.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=stigmem-audit.csv"},
    )

241 

242 

@router.get("", response_model=AuditLogResponse)
def query_audit(
    identity: Annotated[Identity, Depends(resolve_identity)],
    entity_uri: str | None = Query(None, description="Filter by asserting entity"),
    oidc_sub: str | None = Query(None, description="Filter by OIDC subject"),
    source: str | None = Query(None, description="Filter by fact source"),
    fact_id: str | None = Query(None, description="Filter by fact ID"),
    attested: bool | None = Query(
        None, description="true = attested only; false = unattested only"
    ),
    cursor: str | None = Query(
        None,
        description="Opaque pagination cursor (the `cursor` value returned by the previous page)",
    ),
    limit: int = Query(50, ge=1, le=500),
) -> AuditLogResponse:
    """Query enriched audit logs with optional principal and source filters.

    Results are scoped to the caller's tenant and ordered newest first
    (``ts DESC, id DESC``). Pagination is keyset-based:
    - the response ``cursor`` encodes the last returned entry's ``(ts, id)``;
    - pass it back to fetch the next page;
    - it is ``None`` when no further entries exist.

    A malformed cursor is ignored by ``_filter_params``, so the first page is
    returned.

    ``total`` is the number of entries in this page, not the number of all
    matching rows.

    Raises:
        HTTPException: 403 if the caller lacks read permission.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    params = _filter_params(
        identity.tenant_id, entity_uri, oidc_sub, source, fact_id, attested, cursor
    )
    # Fetch one row beyond the page size: its presence signals another page.
    params.append(limit + 1)
    sql = _JOIN_SELECT + _STATIC_WHERE + " ORDER BY al.ts DESC, al.id DESC LIMIT ?"

    with db() as conn:
        rows = conn.execute(sql, params).fetchall()

    has_more = len(rows) > limit
    page = rows[:limit]
    # The `page` check guards against an empty slice if the function is
    # called directly with limit=0 (bypassing the Query ge=1 validation).
    next_cursor = (
        _encode_cursor(page[-1]["ts"], page[-1]["id"]) if has_more and page else None
    )

    return AuditLogResponse(
        entries=[_row_to_entry(r) for r in page],
        total=len(page),
        cursor=next_cursor,
    )