Coverage for node / src / stigmem_node / lifecycle / tombstones.py: 80%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""RTBF tombstone storage layer and recall-time filter — spec §23.
3Storage operations:
4 create_tombstone(...) → TombstoneRecord
5 revoke_tombstone(...) → TombstoneRevocationRecord
6 get_tombstone_status(entity_uri) → TombstoneStatusResponse
7 list_tombstones(scope, since) → list[TombstoneRecord]
8 list_revocations(since) → list[TombstoneRevocationRecord]
10Recall-time filter (§23.3):
11 is_tombstoned(entity_uri, scope) → bool (uses an in-process snapshot of active tombstones, refreshed at most every 60 seconds)
12 filter_tombstoned_records(records) → list[FactRecord]
13"""
15from __future__ import annotations
17import logging
18import time
19import uuid
20from dataclasses import dataclass, field
21from datetime import UTC, datetime
22from typing import Any
24from ..db import db
25from ..models.tombstones import (
26 TombstoneRecord,
27 TombstoneRevocationRecord,
28 TombstoneStatusResponse,
29)
31logger = logging.getLogger("stigmem.tombstones")
33# ---------------------------------------------------------------------------
34# In-process active-tombstone cache (§23.3.3 rule 4 — refresh at most every 60s)
35# ---------------------------------------------------------------------------
37_TOMBSTONE_CACHE_TTL = 60.0
39@dataclass
40class _TombstoneScopeCacheState:
41 # Full set of active (entity_uri, scope) pairs from DB — refreshed every 60s.
42 active_set: set[tuple[str, str]] = field(default_factory=set)
43 refreshed_at: float = 0.0
46_tombstone_scope_cache = _TombstoneScopeCacheState()
49def _scope_matches(pattern: str, fact_scope: str) -> bool:
50 """Return True if tombstone scope pattern covers fact_scope (§23.2.3)."""
51 return pattern == "*" or pattern == fact_scope
def _refresh_tombstone_cache() -> None:
    """Reload the active-tombstone snapshot when it is stale or was invalidated.

    The snapshot is considered fresh for ``_TOMBSTONE_CACHE_TTL`` seconds after
    a successful load. On a reload, every (entity_uri, scope) pair of a
    tombstone without a revocation row is read and stored on the module-level
    cache state.

    Any exception raised while loading is logged and swallowed; the previous
    snapshot and timestamp are left untouched, so the next call tries again.
    """
    now = time.monotonic()
    last_refresh = _tombstone_scope_cache.refreshed_at

    # 0.0 marks "never loaded" / "explicitly invalidated". The reference point
    # of time.monotonic() is undefined, so its reading can be smaller than the
    # TTL (e.g. shortly after boot); subtracting the sentinel would then make an
    # unloaded cache look fresh. Test for the sentinel explicitly instead.
    needs_initial_load = last_refresh <= 0.0
    if not needs_initial_load and now - last_refresh < _TOMBSTONE_CACHE_TTL:
        return

    try:
        with db() as conn:
            # BEGIN IMMEDIATE for consistency (§23.3.3 rule 5, SQLite path)
            conn.execute("BEGIN IMMEDIATE")
            rows = conn.execute(
                """SELECT t.entity_uri, t.scope
                   FROM tombstones t
                   WHERE NOT EXISTS (
                       SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id
                   )"""
            ).fetchall()
            conn.execute("COMMIT")
        _tombstone_scope_cache.active_set = {(r["entity_uri"], r["scope"]) for r in rows}
        _tombstone_scope_cache.refreshed_at = now
    except Exception:
        # Best effort: keep serving the previous snapshot rather than failing recall.
        logger.exception("Failed to refresh tombstone cache")
def is_tombstoned(entity_uri: str, fact_scope: str) -> bool:
    """Report whether an active tombstone for ``entity_uri`` covers ``fact_scope``.

    Returns False without consulting the cache when the tombstone filter gate
    is disabled. Otherwise the cached snapshot is refreshed if needed and
    searched for an entry with the same URI whose scope pattern matches.
    """
    from .tombstone_gate import tombstone_filter_enabled

    if tombstone_filter_enabled():
        _refresh_tombstone_cache()
        return any(
            cached_uri == entity_uri and _scope_matches(cached_pattern, fact_scope)
            for cached_uri, cached_pattern in _tombstone_scope_cache.active_set
        )
    return False
def invalidate_tombstone_cache() -> None:
    """Mark the snapshot as invalidated so the next lookup reloads it.

    Used after a local tombstone or revocation write. Also calls
    ``invalidate`` from the sibling ``tombstone_cache`` module; a failure to
    import or run it is logged and otherwise ignored.
    """
    # Resetting the timestamp is what makes the next refresh hit the database.
    _tombstone_scope_cache.refreshed_at = 0.0
    try:
        from .tombstone_cache import invalidate as _invalidate_companion_cache

        _invalidate_companion_cache()
    except Exception:
        logger.exception("Failed to invalidate tombstone cache")
100# ---------------------------------------------------------------------------
101# Storage operations
102# ---------------------------------------------------------------------------
def _row_to_tombstone(row: Any) -> TombstoneRecord:
    """Build a ``TombstoneRecord`` from a ``tombstones`` table row.

    ``row`` must support lookup by column name. A falsy ``key_id`` becomes an
    empty string and ``legal_hold`` is coerced to ``bool``.
    """
    copied_columns = (
        "id",
        "entity_uri",
        "scope",
        "reason",
        "signed_by",
        "signature",
        "created_at",
    )
    values = {column: row[column] for column in copied_columns}
    values["key_id"] = row["key_id"] or ""
    values["legal_hold"] = bool(row["legal_hold"])
    return TombstoneRecord(**values)
def _row_to_revocation(row: Any) -> TombstoneRevocationRecord:
    """Build a ``TombstoneRevocationRecord`` from a ``tombstone_revocations`` row.

    ``row`` must support lookup by column name. A falsy ``key_id`` becomes an
    empty string.
    """
    copied_columns = (
        "id",
        "tombstone_id",
        "reason",
        "signed_by",
        "signature",
        "created_at",
    )
    values = {column: row[column] for column in copied_columns}
    values["key_id"] = row["key_id"] or ""
    return TombstoneRevocationRecord(**values)
def create_tombstone(
    entity_uri: str,
    scope: str,
    reason: str | None,
    signed_by: str,
    key_id: str,
    signature: str,
    legal_hold: bool = False,
    tenant_id: str = "default",
    *,
    tombstone_id: str | None = None,
    created_at: str | None = None,
) -> TombstoneRecord:
    """Store a tombstone and return it as a ``TombstoneRecord``.

    If an unrevoked tombstone already exists for the same
    (entity_uri, scope, tenant_id), that stored record is returned and nothing
    is written. Otherwise an audit event is emitted on the same connection, the
    row is inserted, and the tombstone cache is invalidated.

    ``tombstone_id`` and ``created_at`` override the generated id
    (``"tomb_" + uuid4``) and the current UTC timestamp when supplied. An empty
    ``key_id`` is stored as NULL.
    """
    timestamp = created_at if created_at else datetime.now(UTC).isoformat()

    with db() as conn:
        active = conn.execute(
            """SELECT t.id FROM tombstones t
               WHERE t.entity_uri = ? AND t.scope = ? AND t.tenant_id = ?
                 AND NOT EXISTS (
                     SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id
                 )""",
            (entity_uri, scope, tenant_id),
        ).fetchone()
        if active:
            # Idempotent path: hand back what is already stored.
            stored = conn.execute(
                "SELECT * FROM tombstones WHERE id = ?", (active["id"],)
            ).fetchone()
            return _row_to_tombstone(stored)

        tomb_id = tombstone_id if tombstone_id else f"tomb_{uuid.uuid4()}"

        # The audit event shares the connection with the INSERT that follows.
        _emit_tombstone_audit(
            conn=conn,
            event_type="tombstone_created",
            actor_uri=signed_by,
            tombstone_id=tomb_id,
            entity_uri=entity_uri,
            scope=scope,
            source="local",
            detail={"legal_hold": legal_hold},
        )

        insert_params = (
            tomb_id,
            entity_uri,
            scope,
            reason,
            signed_by,
            key_id or None,
            signature,
            timestamp,
            int(legal_hold),
            tenant_id,
        )
        conn.execute(
            """INSERT INTO tombstones
               (id, entity_uri, scope, reason, signed_by, key_id, signature,
                created_at, legal_hold, tenant_id)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            insert_params,
        )
        inserted = conn.execute("SELECT * FROM tombstones WHERE id = ?", (tomb_id,)).fetchone()

    invalidate_tombstone_cache()
    logger.info("Tombstone created: %s for entity %s scope %s", tomb_id, entity_uri, scope)
    return _row_to_tombstone(inserted)
def revoke_tombstone(
    tombstone_id: str,
    reason: str,
    signed_by: str,
    key_id: str,
    signature: str,
) -> TombstoneRevocationRecord:
    """Record a revocation for an existing tombstone (§23.2.5).

    Emits an audit event on the same connection, inserts the revocation row
    with a generated ``"tombrevoke_" + uuid4`` id and the current UTC
    timestamp, then invalidates the tombstone cache.

    Raises:
        KeyError: ``"tombstone_not_found"`` when no tombstone has that id.
        ValueError: ``"tombstone_already_revoked"`` when a revocation row for
            the tombstone already exists.
    """
    timestamp = datetime.now(UTC).isoformat()

    with db() as conn:
        target = conn.execute(
            "SELECT id FROM tombstones WHERE id = ?", (tombstone_id,)
        ).fetchone()
        if target is None:
            raise KeyError("tombstone_not_found")

        prior_revocation = conn.execute(
            "SELECT id FROM tombstone_revocations WHERE tombstone_id = ?", (tombstone_id,)
        ).fetchone()
        if prior_revocation:
            raise ValueError("tombstone_already_revoked")

        rev_id = f"tombrevoke_{uuid.uuid4()}"

        # NOTE(review): the audit event receives the tombstone id as its target
        # entity and no scope — confirm that is what audit consumers expect.
        _emit_tombstone_audit(
            conn=conn,
            event_type="tombstone_revoked",
            actor_uri=signed_by,
            tombstone_id=tombstone_id,
            entity_uri=tombstone_id,
            scope=None,
            source="local",
            detail={"revocation_id": rev_id},
        )

        insert_params = (rev_id, tombstone_id, reason, signed_by, key_id, signature, timestamp)
        conn.execute(
            """INSERT INTO tombstone_revocations
               (id, tombstone_id, reason, signed_by, key_id, signature, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            insert_params,
        )
        inserted = conn.execute(
            "SELECT * FROM tombstone_revocations WHERE id = ?", (rev_id,)
        ).fetchone()

    invalidate_tombstone_cache()
    logger.info("Tombstone revoked: %s → revocation %s", tombstone_id, rev_id)
    return _row_to_revocation(inserted)
def get_tombstone_status(entity_uri: str) -> TombstoneStatusResponse:
    """Collect every tombstone and revocation recorded for ``entity_uri``.

    Tombstones are returned in ``created_at`` order. Revocations are grouped by
    tombstone in that same order, and ordered by ``created_at`` within each
    group. ``tombstoned`` is True when at least one tombstone has no
    revocation. Data for the admin-only status endpoint.
    """
    with db() as conn:
        tombstone_rows = conn.execute(
            "SELECT * FROM tombstones WHERE entity_uri = ? ORDER BY created_at",
            (entity_uri,),
        ).fetchall()
        tombstones = [_row_to_tombstone(row) for row in tombstone_rows]

        if not tombstones:
            return TombstoneStatusResponse(tombstoned=False, tombstones=[], revocations=[])

        # One lookup per tombstone keeps revocations grouped by their tombstone.
        revocation_rows = [
            rev_row
            for tomb in tombstones
            for rev_row in conn.execute(
                """SELECT * FROM tombstone_revocations
                   WHERE tombstone_id = ?
                   ORDER BY created_at""",
                (tomb.id,),
            ).fetchall()
        ]
        revocations = [_row_to_revocation(row) for row in revocation_rows]

    revoked_ids = {rev.tombstone_id for rev in revocations}
    still_active = not all(tomb.id in revoked_ids for tomb in tombstones)
    return TombstoneStatusResponse(
        tombstoned=still_active,
        tombstones=tombstones,
        revocations=revocations,
    )
def list_tombstones(scope: str | None = None, since: str | None = None) -> list[TombstoneRecord]:
    """Return tombstones for the federation poll (§23.4.3), oldest first.

    ``scope`` — when given and not ``"*"``, restricts the result to tombstones
    with that scope or the wildcard scope.
    ``since`` — when given, restricts the result to rows whose ``created_at``
    is strictly greater than it.
    """
    sql_parts = ["SELECT * FROM tombstones WHERE 1=1"]
    params: list[Any] = []

    if not (scope is None or scope == "*"):
        sql_parts.append(" AND (scope = ? OR scope = '*')")
        params.append(scope)
    if since is not None:
        sql_parts.append(" AND created_at > ?")
        params.append(since)
    sql_parts.append(" ORDER BY created_at")

    with db() as conn:
        rows = conn.execute("".join(sql_parts), params).fetchall()
    return [_row_to_tombstone(row) for row in rows]
def list_revocations(since: str | None = None) -> list[TombstoneRevocationRecord]:
    """Return tombstone revocations for the federation poll, oldest first.

    ``since`` — when given, restricts the result to rows whose ``created_at``
    is strictly greater than it.
    """
    sql_parts = ["SELECT * FROM tombstone_revocations WHERE 1=1"]
    params: list[Any] = []

    if since is not None:
        sql_parts.append(" AND created_at > ?")
        params.append(since)
    sql_parts.append(" ORDER BY created_at")

    with db() as conn:
        rows = conn.execute("".join(sql_parts), params).fetchall()
    return [_row_to_revocation(row) for row in rows]
def apply_inbound_tombstone(record: TombstoneRecord) -> bool:
    """Store a tombstone received over federation (§23.4.2); idempotent on id.

    Returns True when the tombstone was written, False when a row with the same
    id already existed. On a write, an audit event is emitted on the same
    connection and the tombstone cache is invalidated afterwards. The row is
    stored under the ``"default"`` tenant.

    The caller MUST verify the record's signature before calling this.
    """
    with db() as conn:
        already_stored = conn.execute(
            "SELECT id FROM tombstones WHERE id = ?", (record.id,)
        ).fetchone()
        if already_stored:
            return False

        _emit_tombstone_audit(
            conn=conn,
            event_type="tombstone_federation_ingested",
            actor_uri=record.signed_by,
            tombstone_id=record.id,
            entity_uri=record.entity_uri,
            scope=record.scope,
            source="federation",
            detail={"legal_hold": record.legal_hold},
        )

        insert_params = (
            record.id,
            record.entity_uri,
            record.scope,
            record.reason,
            record.signed_by,
            record.key_id or None,
            record.signature,
            record.created_at,
            int(record.legal_hold),
            "default",
        )
        conn.execute(
            """INSERT INTO tombstones
               (id, entity_uri, scope, reason, signed_by, key_id, signature,
                created_at, legal_hold, tenant_id)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            insert_params,
        )

    invalidate_tombstone_cache()
    logger.info("Inbound tombstone applied: %s for %s", record.id, record.entity_uri)
    return True
def apply_inbound_revocation(record: TombstoneRevocationRecord) -> bool:
    """Store a revocation received over federation; idempotent on id.

    Returns True when the revocation was written, False when a revocation with
    the same id already existed. A revocation whose tombstone is not known
    locally is still stored, after a warning is logged. On a write, an audit
    event is emitted on the same connection and the tombstone cache is
    invalidated afterwards.
    """
    with db() as conn:
        known_tombstone = conn.execute(
            "SELECT id FROM tombstones WHERE id = ?", (record.tombstone_id,)
        ).fetchone()
        if known_tombstone is None:
            logger.warning(
                "Inbound revocation for unknown tombstone %s; storing anyway", record.tombstone_id
            )

        already_stored = conn.execute(
            "SELECT id FROM tombstone_revocations WHERE id = ?", (record.id,)
        ).fetchone()
        if already_stored:
            return False

        # The tombstone id stands in for the target entity in the audit event.
        _emit_tombstone_audit(
            conn=conn,
            event_type="tombstone_revocation_federation_ingested",
            actor_uri=record.signed_by,
            tombstone_id=record.tombstone_id,
            entity_uri=record.tombstone_id,
            scope=None,
            source="federation",
            detail={"revocation_id": record.id},
        )

        insert_params = (
            record.id,
            record.tombstone_id,
            record.reason,
            record.signed_by,
            record.key_id,
            record.signature,
            record.created_at,
        )
        conn.execute(
            """INSERT INTO tombstone_revocations
               (id, tombstone_id, reason, signed_by, key_id, signature, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            insert_params,
        )

    invalidate_tombstone_cache()
    return True
def _emit_tombstone_audit(
    *,
    conn: Any,
    event_type: str,
    actor_uri: str,
    tombstone_id: str,
    entity_uri: str,
    scope: str | None,
    source: str,
    detail: dict[str, Any],
) -> None:
    """Forward a tombstone lifecycle event to the audit emitter.

    The actor is passed as the event's ``entity_uri`` and the tombstone id as
    its ``fact_id``. The affected entity is carried in the detail payload under
    ``"target_entity_uri"`` together with ``"scope"``; keys in ``detail``
    override those two on collision. ``conn`` is handed through to the emitter.
    """
    from ..observability.audit_event import emit

    payload: dict[str, Any] = {"target_entity_uri": entity_uri, "scope": scope}
    payload.update(detail)

    emit(
        event_type,
        entity_uri=actor_uri,
        fact_id=tombstone_id,
        source=source,
        scope=scope,
        detail=payload,
        conn=conn,
    )
416# ---------------------------------------------------------------------------
417# Recall-time filter (§23.3)
418# ---------------------------------------------------------------------------
def filter_tombstoned_records(records: list[Any]) -> list[Any]:
    """Drop facts whose entity or ref-value is tombstoned (§23.3.1, §23.3.2).

    When the cached set of active tombstones is empty, the input list itself is
    returned. Otherwise a new list is returned with the surviving records in
    their original order. A record's scope defaults to ``"local"`` when it has
    no ``scope`` attribute.

    NOTE(review): this function only removes whole records; it does not modify
    ``derived_from`` or ``related_entities`` on the records it keeps — confirm
    whether that stripping is expected to happen elsewhere.
    """
    _refresh_tombstone_cache()
    if not _tombstone_scope_cache.active_set:
        return records

    def _survives(record: Any) -> bool:
        record_scope = getattr(record, "scope", "local")

        # §23.3.1 rule 2 — exclude facts whose entity is tombstoned
        entity = getattr(record, "entity", None)
        if entity and is_tombstoned(entity, record_scope):
            return False

        # §23.3.1 rule 2 — exclude ref-valued facts pointing to tombstoned entities
        value = getattr(record, "value", None)
        if not value or getattr(value, "type", None) != "ref":
            return True
        ref_uri = "" if value.v is None else str(value.v)
        return not (ref_uri and is_tombstoned(ref_uri, record_scope))

    return [record for record in records if _survives(record)]