Coverage for node / src / stigmem_node / observability / audit_event.py: 93%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Centralized audit event emission — spec §22.3. 

2 

3All code paths that emit audit events call ``emit()`` here. Write-ahead 

4semantics are guaranteed by the caller: callers must call ``emit()`` *before* 

5returning the HTTP response (or inside the same DB transaction where 

6possible). 

7 

8Supported event_type values (spec §22.3.1): 

9 fact_write, fact_read, capability_token_issue, capability_token_revoke, 

10 manifest_publish, key_rotation, federation_connect, quarantine_admit, 

11 quarantine_release, quota_breach, admin_action, replay_rejected, 

12 instruction_audit, instruction_quarantined, instruction_promoted, 

13 peer_hlc_anomaly, api_key_rehashed 

14""" 

15 

16from __future__ import annotations 

17 

18import json 

19import uuid 

20from datetime import UTC, datetime 

21from typing import Any 

22 

23from ..db import db 

24from ..plugins import AuditEvent, Success, get_registry 

25from .metrics import AUDIT_EVENT 

26 

# Named constants for two of the event_type strings listed in the module
# docstring, so callers can reference them without repeating the literals.
INSTRUCTION_QUARANTINED = "instruction_quarantined"
INSTRUCTION_PROMOTED = "instruction_promoted"

29 

30 

def is_instruction_fact(
    entity: str | None,
    relation: str | None = None,
    interpret_as: str | None = None,
) -> bool:
    """Tell whether a fact belongs to the instruction namespace.

    A fact qualifies when ``interpret_as`` is exactly ``"instruction"``, or
    when either ``entity`` or ``relation`` is a non-empty string starting
    with the ``"instruction:"`` prefix. ``None`` and empty strings never
    match.
    """
    if interpret_as == "instruction":
        return True
    # Entity and relation share the same prefix rule, so check them uniformly.
    for field in (entity, relation):
        if field and field.startswith("instruction:"):
            return True
    return False

43 

44 

45def _emit_with_conn( 

46 event_type: str, 

47 entity_uri: str, 

48 tenant_id: str, 

49 oidc_sub: str | None, 

50 fact_id: str | None, 

51 source: str, 

52 attested_key_id: str | None, 

53 detail_json: str | None, 

54 now: str, 

55 entry_id: str, 

56 conn: Any, 

57) -> None: 

58 cur = conn.execute( 

59 """INSERT INTO fact_audit_log 

60 (id, fact_id, event_type, entity_uri, oidc_sub, source, 

61 attested_key_id, ts, tenant_id, detail) 

62 VALUES (?,?,?,?,?,?,?,?,?,?)""", 

63 ( 

64 entry_id, 

65 fact_id, 

66 event_type, 

67 entity_uri, 

68 oidc_sub, 

69 source, 

70 attested_key_id, 

71 now, 

72 tenant_id, 

73 detail_json, 

74 ), 

75 ) 

76 # seq = rowid on SQLite (implicit monotonic integer); on PostgreSQL the column 

77 # carries a sequence DEFAULT so no manual update is needed — lastrowid is None. 

78 row_seq = getattr(cur, "lastrowid", None) 

79 if row_seq is not None: 79 ↛ exitline 79 didn't return from function '_emit_with_conn' because the condition on line 79 was always true

80 conn.execute("UPDATE fact_audit_log SET seq=? WHERE id=?", (row_seq, entry_id)) 

81 

82 

def emit(
    event_type: str,
    *,
    entity_uri: str,
    tenant_id: str = "default",
    oidc_sub: str | None = None,
    fact_id: str | None = None,
    source: str = "",
    attested_key_id: str | None = None,
    scope: str | None = None,
    detail: dict[str, Any] | None = None,
    conn: Any = None,
) -> None:
    """Record one audit event in fact_audit_log, then notify observers.

    When ``conn`` is supplied the row is written through that connection, so
    it joins the caller's transaction (required when called from inside a
    ``with db() as conn:`` block to avoid nested-lock errors on SQLite).
    When ``conn`` is None a fresh ``db()`` context is opened for the write.

    A timestamp (current UTC time, ISO-8601) and a random UUID entry id are
    generated per call. ``detail`` is JSON-encoded for storage; a missing or
    empty ``detail`` is stored as NULL. ``scope`` is not written to the table;
    it is only passed along in the plugin event metadata.

    After the insert returns, the ``AUDIT_EVENT`` counter is incremented and
    the ``audit_emit`` plugin hook is fired. Any exception raised by the
    insert propagates to the caller before either of those steps runs.

    The ``seq`` column is populated from the inserted row's implicit rowid on
    SQLite, or from a named sequence DEFAULT on PostgreSQL (migration 022).
    """
    now = datetime.now(UTC).isoformat()
    entry_id = str(uuid.uuid4())
    detail_json = None if not detail else json.dumps(detail)

    # Positional arguments shared by both write paths; only the connection differs.
    row_args = (
        event_type,
        entity_uri,
        tenant_id,
        oidc_sub,
        fact_id,
        source,
        attested_key_id,
        detail_json,
        now,
        entry_id,
    )
    if conn is None:
        with db() as own_conn:
            _emit_with_conn(*row_args, own_conn)
    else:
        _emit_with_conn(*row_args, conn)

    # §22.3: increment Prometheus counter for every successfully written audit event.
    AUDIT_EVENT.labels(event_type=event_type, tenant=tenant_id).inc()

    hook_metadata = {
        "oidc_sub": oidc_sub,
        "source": source,
        "attested_key_id": attested_key_id,
        "scope": scope,
        "detail": detail or {},
        "audit_entry_id": entry_id,
    }
    get_registry().fire_fire_and_forget(
        "audit_emit",
        event=AuditEvent(
            event_type=event_type,
            actor_uri=entity_uri,
            target_uri=fact_id,
            tenant_id=tenant_id,
            timestamp=datetime.fromisoformat(now),
            outcome=Success(),
            metadata=hook_metadata,
        ),
    )

161 

162 

def emit_nofail(
    event_type: str,
    *,
    entity_uri: str,
    tenant_id: str = "default",
    oidc_sub: str | None = None,
    fact_id: str | None = None,
    source: str = "",
    attested_key_id: str | None = None,
    scope: str | None = None,
    detail: dict[str, Any] | None = None,
) -> None:
    """Best-effort variant of ``emit()`` for middleware paths.

    Forwards every argument to ``emit()`` without a ``conn``. Any
    ``Exception`` raised along the way is caught and logged, with traceback,
    on the ``stigmem.audit`` logger instead of being propagated.
    """
    import logging

    forwarded = {
        "entity_uri": entity_uri,
        "tenant_id": tenant_id,
        "oidc_sub": oidc_sub,
        "fact_id": fact_id,
        "source": source,
        "attested_key_id": attested_key_id,
        "scope": scope,
        "detail": detail,
    }
    try:
        emit(event_type, **forwarded)
    except Exception:
        # Deliberate swallow: audit failure must not break the calling path.
        audit_log = logging.getLogger("stigmem.audit")
        audit_log.exception(
            "Failed to emit audit event type=%s principal=%s tenant=%s",
            event_type,
            entity_uri,
            tenant_id,
        )

197 

198 

def emit_instruction_event_if_applicable(
    event_type: str,
    *,
    fact_id: str,
    fact_entity: str | None,
    fact_relation: str | None,
    fact_interpret_as: str | None = None,
    actor_uri: str,
    tenant_id: str = "default",
    oidc_sub: str | None = None,
    source: str = "",
    detail: dict[str, Any] | None = None,
    conn: Any,
) -> None:
    """Emit an ADR-003 instruction audit event, but only for instruction facts.

    Does nothing unless ``is_instruction_fact()`` accepts the fact's entity,
    relation and ``interpret_as``. Otherwise calls ``emit()`` on ``conn`` with
    ``actor_uri`` as the event's ``entity_uri``. ``source`` falls back to
    ``actor_uri`` when empty. The stored detail carries the three fact fields;
    keys in the caller's ``detail`` are merged on top and win on collision.
    """
    if is_instruction_fact(fact_entity, fact_relation, fact_interpret_as):
        payload = {
            "fact_entity": fact_entity,
            "fact_relation": fact_relation,
            "fact_interpret_as": fact_interpret_as,
        }
        if detail:
            # Caller-supplied keys are applied last so they override the defaults.
            payload.update(detail)

        emit(
            event_type,
            entity_uri=actor_uri,
            tenant_id=tenant_id,
            oidc_sub=oidc_sub,
            fact_id=fact_id,
            source=source if source else actor_uri,
            detail=payload,
            conn=conn,
        )