Coverage for node / src / stigmem_node / lifecycle / tombstone_cache.py: 95%

33 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Tombstone in-process cache — spec §23.3.3 r.4. 

2 

3Active tombstone entity URIs are cached in-process and refreshed at most every 

460 seconds to avoid per-event DB reads during subscription delivery and recall. 

5Worst-case 60-second leak window after tombstone creation is acceptable per spec. 

6""" 

7 

8from __future__ import annotations 

9 

10import threading 

11import time 

12from dataclasses import dataclass, field 

13 

# §23.3.3 r.4: refresh interval ceiling, in seconds.
_CACHE_TTL_SECONDS: float = 60.0

# Held while the cache is refreshed or invalidated.
_lock = threading.Lock()


@dataclass
class _TombstoneCacheState:
    """Snapshot of active tombstones plus the time the snapshot was taken.

    ``last_refresh`` is compared against ``time.monotonic()``, whose reference
    point is undefined. "Never refreshed" is therefore ``-inf`` rather than
    ``0.0``: with ``0.0`` a process whose monotonic clock reads less than the
    TTL (for example shortly after boot) would treat the empty initial
    snapshot as fresh and skip the first load.
    """

    # Active (entity_uri, tenant_id) pairs as of the last refresh.
    tombstoned: frozenset[tuple[str, str]] = field(default_factory=frozenset)
    # time.monotonic() reading at the last refresh; -inf means "never".
    last_refresh: float = float("-inf")


_state = _TombstoneCacheState()

27 

28 

def _load_from_db() -> frozenset[tuple[str, str]]:
    """Fetch every tombstone that has no revocation row.

    Returns:
        A frozenset of ``(entity_uri, tenant_id)`` pairs, one per distinct
        tombstone for which no ``tombstone_revocations`` row exists.
    """
    # Imported lazily inside the function, as in the rest of this module.
    from ..db import db

    with db() as conn:
        cursor = conn.execute(
            """
            SELECT DISTINCT t.entity_uri, t.tenant_id
            FROM tombstones t
            WHERE NOT EXISTS (
                SELECT 1 FROM tombstone_revocations r
                WHERE r.tombstone_id = t.id
            )
            """
        )
        records = cursor.fetchall()

    active_pairs = {(record["entity_uri"], record["tenant_id"]) for record in records}
    return frozenset(active_pairs)

44 

45 

def _ensure_fresh() -> None:
    """Reload the cached tombstone set if it is older than the TTL.

    The staleness test runs once without the lock (cheap fast path) and again
    after acquiring it, so a thread that waited on the lock does not repeat a
    reload that another thread has just completed.
    """

    def _is_stale() -> bool:
        age = time.monotonic() - _state.last_refresh
        return age >= _CACHE_TTL_SECONDS

    if not _is_stale():
        return

    with _lock:
        if _is_stale():
            _state.tombstoned = _load_from_db()
            # Stamp after the load so the TTL counts from when the data arrived.
            _state.last_refresh = time.monotonic()

55 

56 

def invalidate() -> None:
    """Force cache expiry so the next call to is_tombstoned triggers a DB read.

    Call after writing a new tombstone or revocation row.

    The refresh timestamp is reset to ``-inf`` rather than ``0.0``: it is
    compared against ``time.monotonic()``, whose reference point is undefined,
    so ``0.0`` would not count as expired while the monotonic clock itself
    reads less than the TTL (for example shortly after boot).
    """
    with _lock:
        _state.last_refresh = float("-inf")

64 

65 

def is_tombstoned(entity_uri: str, tenant_id: str = "default") -> bool:
    """Report whether *entity_uri* is actively tombstoned within *tenant_id*.

    When the tombstone filter is disabled this always returns False and the
    cache is not consulted. Otherwise the lookup is served from the
    in-process cache, which is refreshed at most every 60 s (§23.3.3 r.4).
    """
    from .tombstone_gate import tombstone_filter_enabled

    if tombstone_filter_enabled():
        _ensure_fresh()
        lookup_key = (entity_uri, tenant_id)
        return lookup_key in _state.tombstoned
    return False