Coverage for node / src / stigmem_node / lifecycle / tombstone_cache.py: 95%
33 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Tombstone in-process cache — spec §23.3.3 r.4.
Active tombstone entity URIs are cached in-process and refreshed at most every
60 seconds to avoid per-event DB reads during subscription delivery and recall.
A worst-case 60-second leak window after tombstone creation is acceptable per spec.
6"""
8from __future__ import annotations
10import threading
11import time
12from dataclasses import dataclass, field
# Upper bound, in seconds, on cache age before a reload is attempted
# (spec §23.3.3 r.4).
_CACHE_TTL_SECONDS: float = 60.0

# Serializes writers of the module-level cache state.
_lock = threading.Lock()
@dataclass
class _TombstoneCacheState:
    """Mutable holder for the cached tombstone snapshot and its load time."""

    # (entity_uri, tenant_id) pairs captured by the most recent load; empty
    # until the first load happens.
    tombstoned: frozenset[tuple[str, str]] = field(default_factory=frozenset)
    # time.monotonic() reading taken at the most recent load. The default is
    # -inf rather than 0.0 because the monotonic clock's reference point is
    # undefined: with 0.0, a clock reading below the TTL (e.g. shortly after
    # boot) would make a never-loaded cache look fresh and skip the first load.
    last_refresh: float = float("-inf")


# Single process-wide cache instance shared by the functions in this module.
_state = _TombstoneCacheState()
def _load_from_db() -> frozenset[tuple[str, str]]:
    """Query the database for tombstones that have no revocation row.

    Returns:
        A frozenset of ``(entity_uri, tenant_id)`` pairs, one per distinct
        tombstone row for which no ``tombstone_revocations`` row references
        its id.
    """
    # Resolved at call time rather than at module import time.
    from ..db import db

    query = """
        SELECT DISTINCT t.entity_uri, t.tenant_id
        FROM tombstones t
        WHERE NOT EXISTS (
            SELECT 1 FROM tombstone_revocations r
            WHERE r.tombstone_id = t.id
        )
        """
    with db() as conn:
        active = conn.execute(query).fetchall()
        return frozenset(
            (record["entity_uri"], record["tenant_id"]) for record in active
        )
def _ensure_fresh() -> None:
    """Reload the cached tombstone set if the TTL has elapsed since the last load.

    Returns immediately while the cache is younger than ``_CACHE_TTL_SECONDS``.
    Otherwise takes ``_lock``, re-tests the age, and reloads from the database
    if the cache is still stale.
    """

    def _within_ttl() -> bool:
        return time.monotonic() - _state.last_refresh < _CACHE_TTL_SECONDS

    # Cheap check first so callers do not take the lock while the cache is young.
    if _within_ttl():
        return

    with _lock:
        # Re-test under the lock: another thread may have reloaded while this
        # one was waiting to acquire it.
        if not _within_ttl():
            _state.tombstoned = _load_from_db()
            # Timestamp is recorded only after the new snapshot is stored.
            _state.last_refresh = time.monotonic()
def invalidate() -> None:
    """Force cache expiry so the next freshness check triggers a DB read.

    Call after writing a new tombstone or revocation row.
    """
    with _lock:
        # -inf rather than 0.0: time.monotonic() has an undefined reference
        # point, so a reset to 0.0 would still look fresh whenever the clock
        # currently reads less than the TTL. -inf is stale for any reading.
        _state.last_refresh = float("-inf")
def is_tombstoned(entity_uri: str, tenant_id: str = "default") -> bool:
    """Report whether *entity_uri* is in the cached active-tombstone set for *tenant_id*.

    Always returns False when the tombstone filter is disabled. Otherwise the
    in-process cache is brought up to date (it reloads at most once per 60 s,
    §23.3.3 r.4) and the ``(entity_uri, tenant_id)`` pair is looked up in it.
    """
    from .tombstone_gate import tombstone_filter_enabled

    if tombstone_filter_enabled():
        _ensure_fresh()
        lookup_key = (entity_uri, tenant_id)
        return lookup_key in _state.tombstoned
    return False