Coverage for node / src / stigmem_node / lifecycle / tombstones.py: 80%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""RTBF tombstone storage layer and recall-time filter — spec §23. 

2 

3Storage operations: 

4 create_tombstone(...) → TombstoneRecord 

5 revoke_tombstone(...) → TombstoneRevocationRecord 

6 get_tombstone_status(entity_uri) → TombstoneStatusResponse 

7 list_tombstones(scope, since) → list[TombstoneRecord] 

8 list_revocations(since) → list[TombstoneRevocationRecord] 

9 

10Recall-time filter (§23.3): 

11 is_tombstoned(entity_uri, scope) → bool (uses 60-second LRU cache) 

12 filter_tombstoned_records(records) → list[FactRecord] 

13""" 

14 

15from __future__ import annotations 

16 

17import logging 

18import time 

19import uuid 

20from dataclasses import dataclass, field 

21from datetime import UTC, datetime 

22from typing import Any 

23 

24from ..db import db 

25from ..models.tombstones import ( 

26 TombstoneRecord, 

27 TombstoneRevocationRecord, 

28 TombstoneStatusResponse, 

29) 

30 

31logger = logging.getLogger("stigmem.tombstones") 

32 

33# --------------------------------------------------------------------------- 

34# In-process tombstone LRU cache (§23.3.3 rule 4 — refresh at most every 60s) 

35# --------------------------------------------------------------------------- 

36 

37_TOMBSTONE_CACHE_TTL = 60.0 

38 

39@dataclass 

40class _TombstoneScopeCacheState: 

41 # Full set of active (entity_uri, scope) pairs from DB — refreshed every 60s. 

42 active_set: set[tuple[str, str]] = field(default_factory=set) 

43 refreshed_at: float = 0.0 

44 

45 

46_tombstone_scope_cache = _TombstoneScopeCacheState() 

47 

48 

49def _scope_matches(pattern: str, fact_scope: str) -> bool: 

50 """Return True if tombstone scope pattern covers fact_scope (§23.2.3).""" 

51 return pattern == "*" or pattern == fact_scope 

52 

53 

54def _refresh_tombstone_cache() -> None: 

55 now = time.monotonic() 

56 if now - _tombstone_scope_cache.refreshed_at < _TOMBSTONE_CACHE_TTL: 

57 return 

58 try: 

59 with db() as conn: 

60 # BEGIN IMMEDIATE for consistency (§23.3.3 rule 5, SQLite path) 

61 conn.execute("BEGIN IMMEDIATE") 

62 rows = conn.execute( 

63 """SELECT t.entity_uri, t.scope 

64 FROM tombstones t 

65 WHERE NOT EXISTS ( 

66 SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id 

67 )""" 

68 ).fetchall() 

69 conn.execute("COMMIT") 

70 _tombstone_scope_cache.active_set = {(r["entity_uri"], r["scope"]) for r in rows} 

71 _tombstone_scope_cache.refreshed_at = now 

72 except Exception: 

73 logger.exception("Failed to refresh tombstone cache") 

74 

75 

76def is_tombstoned(entity_uri: str, fact_scope: str) -> bool: 

77 """Return True if entity_uri has an active tombstone covering fact_scope.""" 

78 from .tombstone_gate import tombstone_filter_enabled 

79 

80 if not tombstone_filter_enabled(): 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 return False 

82 _refresh_tombstone_cache() 

83 for uri, pattern in _tombstone_scope_cache.active_set: 

84 if uri == entity_uri and _scope_matches(pattern, fact_scope): 

85 return True 

86 return False 

87 

88 

89def invalidate_tombstone_cache() -> None: 

90 """Force cache refresh on next call (used after local tombstone write).""" 

91 _tombstone_scope_cache.refreshed_at = 0.0 

92 try: 

93 from .tombstone_cache import invalidate as _cache_invalidate 

94 

95 _cache_invalidate() 

96 except Exception: 

97 logger.exception("Failed to invalidate tombstone cache") 

98 

99 

100# --------------------------------------------------------------------------- 

101# Storage operations 

102# --------------------------------------------------------------------------- 

103 

104 

105def _row_to_tombstone(row: Any) -> TombstoneRecord: 

106 return TombstoneRecord( 

107 id=row["id"], 

108 entity_uri=row["entity_uri"], 

109 scope=row["scope"], 

110 reason=row["reason"], 

111 signed_by=row["signed_by"], 

112 key_id=row["key_id"] or "", 

113 signature=row["signature"], 

114 created_at=row["created_at"], 

115 legal_hold=bool(row["legal_hold"]), 

116 ) 

117 

118 

119def _row_to_revocation(row: Any) -> TombstoneRevocationRecord: 

120 return TombstoneRevocationRecord( 

121 id=row["id"], 

122 tombstone_id=row["tombstone_id"], 

123 reason=row["reason"], 

124 signed_by=row["signed_by"], 

125 key_id=row["key_id"] or "", 

126 signature=row["signature"], 

127 created_at=row["created_at"], 

128 ) 

129 

130 

131def create_tombstone( 

132 entity_uri: str, 

133 scope: str, 

134 reason: str | None, 

135 signed_by: str, 

136 key_id: str, 

137 signature: str, 

138 legal_hold: bool = False, 

139 tenant_id: str = "default", 

140 *, 

141 tombstone_id: str | None = None, 

142 created_at: str | None = None, 

143) -> TombstoneRecord: 

144 """Write a tombstone record. Idempotent on (entity_uri, scope) for active tombstones.""" 

145 now = created_at or datetime.now(UTC).isoformat() 

146 with db() as conn: 

147 existing = conn.execute( 

148 """SELECT t.id FROM tombstones t 

149 WHERE t.entity_uri = ? AND t.scope = ? AND t.tenant_id = ? 

150 AND NOT EXISTS ( 

151 SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id 

152 )""", 

153 (entity_uri, scope, tenant_id), 

154 ).fetchone() 

155 if existing: 

156 row = conn.execute( 

157 "SELECT * FROM tombstones WHERE id = ?", (existing["id"],) 

158 ).fetchone() 

159 return _row_to_tombstone(row) 

160 

161 tomb_id = tombstone_id or "tomb_" + str(uuid.uuid4()) 

162 _emit_tombstone_audit( 

163 conn=conn, 

164 event_type="tombstone_created", 

165 actor_uri=signed_by, 

166 tombstone_id=tomb_id, 

167 entity_uri=entity_uri, 

168 scope=scope, 

169 source="local", 

170 detail={"legal_hold": legal_hold}, 

171 ) 

172 conn.execute( 

173 """INSERT INTO tombstones 

174 (id, entity_uri, scope, reason, signed_by, key_id, signature, 

175 created_at, legal_hold, tenant_id) 

176 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", 

177 ( 

178 tomb_id, 

179 entity_uri, 

180 scope, 

181 reason, 

182 signed_by, 

183 key_id or None, 

184 signature, 

185 now, 

186 int(legal_hold), 

187 tenant_id, 

188 ), 

189 ) 

190 row = conn.execute("SELECT * FROM tombstones WHERE id = ?", (tomb_id,)).fetchone() 

191 

192 invalidate_tombstone_cache() 

193 logger.info("Tombstone created: %s for entity %s scope %s", tomb_id, entity_uri, scope) 

194 return _row_to_tombstone(row) 

195 

196 

197def revoke_tombstone( 

198 tombstone_id: str, 

199 reason: str, 

200 signed_by: str, 

201 key_id: str, 

202 signature: str, 

203) -> TombstoneRevocationRecord: 

204 """Write a tombstone revocation record (§23.2.5).""" 

205 now = datetime.now(UTC).isoformat() 

206 with db() as conn: 

207 tomb = conn.execute("SELECT id FROM tombstones WHERE id = ?", (tombstone_id,)).fetchone() 

208 if tomb is None: 

209 raise KeyError("tombstone_not_found") 

210 existing_rev = conn.execute( 

211 "SELECT id FROM tombstone_revocations WHERE tombstone_id = ?", (tombstone_id,) 

212 ).fetchone() 

213 if existing_rev: 

214 raise ValueError("tombstone_already_revoked") 

215 

216 rev_id = "tombrevoke_" + str(uuid.uuid4()) 

217 _emit_tombstone_audit( 

218 conn=conn, 

219 event_type="tombstone_revoked", 

220 actor_uri=signed_by, 

221 tombstone_id=tombstone_id, 

222 entity_uri=tombstone_id, 

223 scope=None, 

224 source="local", 

225 detail={"revocation_id": rev_id}, 

226 ) 

227 conn.execute( 

228 """INSERT INTO tombstone_revocations 

229 (id, tombstone_id, reason, signed_by, key_id, signature, created_at) 

230 VALUES (?, ?, ?, ?, ?, ?, ?)""", 

231 (rev_id, tombstone_id, reason, signed_by, key_id, signature, now), 

232 ) 

233 row = conn.execute("SELECT * FROM tombstone_revocations WHERE id = ?", (rev_id,)).fetchone() 

234 

235 invalidate_tombstone_cache() 

236 logger.info("Tombstone revoked: %s → revocation %s", tombstone_id, rev_id) 

237 return _row_to_revocation(row) 

238 

239 

240def get_tombstone_status(entity_uri: str) -> TombstoneStatusResponse: 

241 """Return tombstone status for entity_uri — admin-only endpoint data.""" 

242 with db() as conn: 

243 t_rows = conn.execute( 

244 "SELECT * FROM tombstones WHERE entity_uri = ? ORDER BY created_at", 

245 (entity_uri,), 

246 ).fetchall() 

247 tombstone_list = [_row_to_tombstone(r) for r in t_rows] 

248 

249 if not tombstone_list: 

250 return TombstoneStatusResponse(tombstoned=False, tombstones=[], revocations=[]) 

251 

252 rev_rows = [] 

253 for tombstone in tombstone_list: 

254 rev_rows.extend( 

255 conn.execute( 

256 """SELECT * FROM tombstone_revocations 

257 WHERE tombstone_id = ? 

258 ORDER BY created_at""", 

259 (tombstone.id,), 

260 ).fetchall() 

261 ) 

262 revocation_list = [_row_to_revocation(r) for r in rev_rows] 

263 

264 revoked_ids = {r.tombstone_id for r in revocation_list} 

265 active = any(t.id not in revoked_ids for t in tombstone_list) 

266 return TombstoneStatusResponse( 

267 tombstoned=active, 

268 tombstones=tombstone_list, 

269 revocations=revocation_list, 

270 ) 

271 

272 

273def list_tombstones(scope: str | None = None, since: str | None = None) -> list[TombstoneRecord]: 

274 """List tombstones for federation poll (§23.4.3).""" 

275 query = "SELECT * FROM tombstones WHERE 1=1" 

276 params: list[Any] = [] 

277 if scope is not None and scope != "*": 277 ↛ 278line 277 didn't jump to line 278 because the condition on line 277 was never true

278 query += " AND (scope = ? OR scope = '*')" 

279 params.append(scope) 

280 if since is not None: 

281 query += " AND created_at > ?" 

282 params.append(since) 

283 query += " ORDER BY created_at" 

284 with db() as conn: 

285 rows = conn.execute(query, params).fetchall() 

286 return [_row_to_tombstone(r) for r in rows] 

287 

288 

289def list_revocations(since: str | None = None) -> list[TombstoneRevocationRecord]: 

290 """List tombstone revocations for federation poll.""" 

291 query = "SELECT * FROM tombstone_revocations WHERE 1=1" 

292 params: list[Any] = [] 

293 if since is not None: 

294 query += " AND created_at > ?" 

295 params.append(since) 

296 query += " ORDER BY created_at" 

297 with db() as conn: 

298 rows = conn.execute(query, params).fetchall() 

299 return [_row_to_revocation(r) for r in rows] 

300 

301 

302def apply_inbound_tombstone(record: TombstoneRecord) -> bool: 

303 """Apply an inbound tombstone from federation (§23.4.2). Idempotent on id. 

304 

305 Returns True if written, False if already existed. 

306 Caller MUST verify signature before calling this. 

307 """ 

308 with db() as conn: 

309 existing = conn.execute("SELECT id FROM tombstones WHERE id = ?", (record.id,)).fetchone() 

310 if existing: 

311 return False 

312 _emit_tombstone_audit( 

313 conn=conn, 

314 event_type="tombstone_federation_ingested", 

315 actor_uri=record.signed_by, 

316 tombstone_id=record.id, 

317 entity_uri=record.entity_uri, 

318 scope=record.scope, 

319 source="federation", 

320 detail={"legal_hold": record.legal_hold}, 

321 ) 

322 conn.execute( 

323 """INSERT INTO tombstones 

324 (id, entity_uri, scope, reason, signed_by, key_id, signature, 

325 created_at, legal_hold, tenant_id) 

326 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", 

327 ( 

328 record.id, 

329 record.entity_uri, 

330 record.scope, 

331 record.reason, 

332 record.signed_by, 

333 record.key_id or None, 

334 record.signature, 

335 record.created_at, 

336 int(record.legal_hold), 

337 "default", 

338 ), 

339 ) 

340 invalidate_tombstone_cache() 

341 logger.info("Inbound tombstone applied: %s for %s", record.id, record.entity_uri) 

342 return True 

343 

344 

345def apply_inbound_revocation(record: TombstoneRevocationRecord) -> bool: 

346 """Apply an inbound revocation from federation. Idempotent on id.""" 

347 with db() as conn: 

348 tomb = conn.execute( 

349 "SELECT id FROM tombstones WHERE id = ?", (record.tombstone_id,) 

350 ).fetchone() 

351 if tomb is None: 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true

352 logger.warning( 

353 "Inbound revocation for unknown tombstone %s; storing anyway", record.tombstone_id 

354 ) 

355 existing = conn.execute( 

356 "SELECT id FROM tombstone_revocations WHERE id = ?", (record.id,) 

357 ).fetchone() 

358 if existing: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 return False 

360 _emit_tombstone_audit( 

361 conn=conn, 

362 event_type="tombstone_revocation_federation_ingested", 

363 actor_uri=record.signed_by, 

364 tombstone_id=record.tombstone_id, 

365 entity_uri=record.tombstone_id, 

366 scope=None, 

367 source="federation", 

368 detail={"revocation_id": record.id}, 

369 ) 

370 conn.execute( 

371 """INSERT INTO tombstone_revocations 

372 (id, tombstone_id, reason, signed_by, key_id, signature, created_at) 

373 VALUES (?, ?, ?, ?, ?, ?, ?)""", 

374 ( 

375 record.id, 

376 record.tombstone_id, 

377 record.reason, 

378 record.signed_by, 

379 record.key_id, 

380 record.signature, 

381 record.created_at, 

382 ), 

383 ) 

384 invalidate_tombstone_cache() 

385 return True 

386 

387 

388def _emit_tombstone_audit( 

389 *, 

390 conn: Any, 

391 event_type: str, 

392 actor_uri: str, 

393 tombstone_id: str, 

394 entity_uri: str, 

395 scope: str | None, 

396 source: str, 

397 detail: dict[str, Any], 

398) -> None: 

399 from ..observability.audit_event import emit 

400 

401 emit( 

402 event_type, 

403 entity_uri=actor_uri, 

404 fact_id=tombstone_id, 

405 source=source, 

406 scope=scope, 

407 detail={ 

408 "target_entity_uri": entity_uri, 

409 "scope": scope, 

410 **detail, 

411 }, 

412 conn=conn, 

413 ) 

414 

415 

416# --------------------------------------------------------------------------- 

417# Recall-time filter (§23.3) 

418# --------------------------------------------------------------------------- 

419 

420 

421def filter_tombstoned_records(records: list[Any]) -> list[Any]: 

422 """Remove facts whose entity or ref-value is tombstoned (§23.3.1, §23.3.2). 

423 

424 Also strips tombstoned entries from derived_from and related_entities per spec. 

425 """ 

426 _refresh_tombstone_cache() 

427 if not _tombstone_scope_cache.active_set: 

428 return records 

429 

430 result = [] 

431 for record in records: 

432 scope = getattr(record, "scope", "local") 

433 

434 # §23.3.1 rule 2 — exclude facts whose entity is tombstoned 

435 entity = getattr(record, "entity", None) 

436 if entity and is_tombstoned(entity, scope): 

437 continue 

438 

439 # §23.3.1 rule 2 — exclude ref-valued facts pointing to tombstoned entities 

440 value = getattr(record, "value", None) 

441 if value and getattr(value, "type", None) == "ref": 

442 ref_uri = str(value.v) if value.v is not None else "" 

443 if ref_uri and is_tombstoned(ref_uri, scope): 

444 continue 

445 

446 result.append(record) 

447 

448 return result