Coverage for node / src / stigmem_node / observability / audit_event.py: 93%
37 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Centralized audit event emission — spec §22.3.
3All code paths that emit audit events call ``emit()`` here. Write-ahead
4semantics are guaranteed by the caller: callers must call ``emit()`` *before*
5returning the HTTP response (or inside the same DB transaction where
6possible).
8Supported event_type values (spec §22.3.1):
9 fact_write, fact_read, capability_token_issue, capability_token_revoke,
10 manifest_publish, key_rotation, federation_connect, quarantine_admit,
11 quarantine_release, quota_breach, admin_action, replay_rejected,
12 instruction_audit, instruction_quarantined, instruction_promoted,
13 peer_hlc_anomaly, api_key_rehashed
14"""
16from __future__ import annotations
18import json
19import uuid
20from datetime import UTC, datetime
21from typing import Any
23from ..db import db
24from ..plugins import AuditEvent, Success, get_registry
25from .metrics import AUDIT_EVENT
# event_type strings for the instruction lifecycle events; both values also
# appear in the supported event_type list in the module docstring.
INSTRUCTION_QUARANTINED = "instruction_quarantined"
INSTRUCTION_PROMOTED = "instruction_promoted"
31def is_instruction_fact(
32 entity: str | None,
33 relation: str | None = None,
34 interpret_as: str | None = None,
35) -> bool:
36 """Return true for instruction-namespace or instruction-interpreted facts."""
37 return bool(
38 interpret_as == "instruction"
39 or
40 (entity and entity.startswith("instruction:"))
41 or (relation and relation.startswith("instruction:"))
42 )
45def _emit_with_conn(
46 event_type: str,
47 entity_uri: str,
48 tenant_id: str,
49 oidc_sub: str | None,
50 fact_id: str | None,
51 source: str,
52 attested_key_id: str | None,
53 detail_json: str | None,
54 now: str,
55 entry_id: str,
56 conn: Any,
57) -> None:
58 cur = conn.execute(
59 """INSERT INTO fact_audit_log
60 (id, fact_id, event_type, entity_uri, oidc_sub, source,
61 attested_key_id, ts, tenant_id, detail)
62 VALUES (?,?,?,?,?,?,?,?,?,?)""",
63 (
64 entry_id,
65 fact_id,
66 event_type,
67 entity_uri,
68 oidc_sub,
69 source,
70 attested_key_id,
71 now,
72 tenant_id,
73 detail_json,
74 ),
75 )
76 # seq = rowid on SQLite (implicit monotonic integer); on PostgreSQL the column
77 # carries a sequence DEFAULT so no manual update is needed — lastrowid is None.
78 row_seq = getattr(cur, "lastrowid", None)
79 if row_seq is not None: 79 ↛ exitline 79 didn't return from function '_emit_with_conn' because the condition on line 79 was always true
80 conn.execute("UPDATE fact_audit_log SET seq=? WHERE id=?", (row_seq, entry_id))
def emit(
    event_type: str,
    *,
    entity_uri: str,
    tenant_id: str = "default",
    oidc_sub: str | None = None,
    fact_id: str | None = None,
    source: str = "",
    attested_key_id: str | None = None,
    scope: str | None = None,
    detail: dict[str, Any] | None = None,
    conn: Any = None,
) -> None:
    """Append one audit event to fact_audit_log (write-ahead).

    Supply ``conn`` to write through a connection that is already open, so
    the row joins that connection's transaction; this is required inside a
    ``with db() as conn:`` block to avoid nested-lock errors on SQLite.
    When ``conn`` is None, a fresh ``db()`` connection context is opened
    for the insert.

    The ``seq`` column is populated from the inserted row's implicit rowid
    on SQLite, or from a named sequence DEFAULT on PostgreSQL
    (migration 022).

    Once the row is written, the ``AUDIT_EVENT`` counter is incremented and
    an ``audit_emit`` event is handed to the plugin registry. ``scope`` is
    carried only in that plugin event's metadata; it is not stored in the
    table row. An empty or missing ``detail`` is stored as NULL and
    reported to plugins as ``{}``.
    """
    entry_id = str(uuid.uuid4())
    now = datetime.now(UTC).isoformat()
    detail_json = None if not detail else json.dumps(detail)

    # Positional arguments for _emit_with_conn, minus the trailing connection.
    row_args = (
        event_type,
        entity_uri,
        tenant_id,
        oidc_sub,
        fact_id,
        source,
        attested_key_id,
        detail_json,
        now,
        entry_id,
    )
    if conn is None:
        with db() as own_conn:
            _emit_with_conn(*row_args, own_conn)
    else:
        _emit_with_conn(*row_args, conn)

    # §22.3: increment Prometheus counter for every successfully written audit event.
    AUDIT_EVENT.labels(event_type=event_type, tenant=tenant_id).inc()

    registry = get_registry()
    plugin_metadata = {
        "oidc_sub": oidc_sub,
        "source": source,
        "attested_key_id": attested_key_id,
        "scope": scope,
        "detail": detail or {},
        "audit_entry_id": entry_id,
    }
    audit_event = AuditEvent(
        event_type=event_type,
        actor_uri=entity_uri,
        target_uri=fact_id,
        tenant_id=tenant_id,
        # Re-parse the stored string so plugins see the same instant as the row.
        timestamp=datetime.fromisoformat(now),
        outcome=Success(),
        metadata=plugin_metadata,
    )
    registry.fire_fire_and_forget("audit_emit", event=audit_event)
def emit_nofail(
    event_type: str,
    *,
    entity_uri: str,
    tenant_id: str = "default",
    oidc_sub: str | None = None,
    fact_id: str | None = None,
    source: str = "",
    attested_key_id: str | None = None,
    scope: str | None = None,
    detail: dict[str, Any] | None = None,
) -> None:
    """Best-effort variant of ``emit()`` for middleware-style call sites.

    Forwards every argument to ``emit()`` unchanged. No ``conn`` is passed
    on, so ``emit()`` always opens its own connection. Any ``Exception``
    raised by ``emit()`` is logged with its traceback on the
    ``stigmem.audit`` logger and then suppressed rather than propagated.
    """
    import logging

    forwarded = {
        "entity_uri": entity_uri,
        "tenant_id": tenant_id,
        "oidc_sub": oidc_sub,
        "fact_id": fact_id,
        "source": source,
        "attested_key_id": attested_key_id,
        "scope": scope,
        "detail": detail,
    }
    try:
        emit(event_type, **forwarded)
    except Exception:
        # Deliberate catch-all: an audit failure must not break the caller.
        audit_logger = logging.getLogger("stigmem.audit")
        audit_logger.exception(
            "Failed to emit audit event type=%s principal=%s tenant=%s",
            event_type,
            entity_uri,
            tenant_id,
        )
def emit_instruction_event_if_applicable(
    event_type: str,
    *,
    fact_id: str,
    fact_entity: str | None,
    fact_relation: str | None,
    fact_interpret_as: str | None = None,
    actor_uri: str,
    tenant_id: str = "default",
    oidc_sub: str | None = None,
    source: str = "",
    detail: dict[str, Any] | None = None,
    conn: Any,
) -> None:
    """Emit ADR-003 instruction audit events for instruction facts.

    Does nothing unless ``is_instruction_fact()`` accepts the fact's entity,
    relation and interpretation. Otherwise one event is written through
    ``emit()`` on the supplied ``conn``, with ``actor_uri`` as the event's
    entity URI. ``source`` falls back to ``actor_uri`` when empty. The event
    detail holds the three fact fields; keys in ``detail`` are merged in
    afterwards and win on a name clash.
    """
    applies = is_instruction_fact(fact_entity, fact_relation, fact_interpret_as)
    if not applies:
        return

    fact_context = {
        "fact_entity": fact_entity,
        "fact_relation": fact_relation,
        "fact_interpret_as": fact_interpret_as,
    }
    # Caller-supplied keys are spread last so they override the fact fields.
    merged_detail = {**fact_context, **(detail or {})}

    emit(
        event_type,
        entity_uri=actor_uri,
        tenant_id=tenant_id,
        oidc_sub=oidc_sub,
        fact_id=fact_id,
        source=source or actor_uri,
        detail=merged_detail,
        conn=conn,
    )