Coverage for node / src / stigmem_node / subscription_delivery.py: 88%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

"""Subscription delivery engine — spec §20.4.

Responsibilities:
1. ``fan_out`` — when a fact is written, find matching subscriptions and insert
   pending ``subscription_events`` rows (fast; called synchronously on every write).
2. ``deliver_pending`` — sweep: attempt delivery for all pending/due-for-retry events.
3. ``sweep_loop`` — asyncio task run in the app lifespan; calls deliver_pending periodically.

Delivery types:
- ``webhook`` — HTTP POST to delivery_address. A failed attempt is retried with
  exponential backoff: the delay is ``min(2**attempts, 300)`` seconds, i.e.
  2 s, 4 s, 8 s … capped at 300 s.
- ``wake`` — structured JSON emitted to stderr; platform-specific integration hook.

Garden ACL (§17), tombstone filtering (§23.3.3) and the content sanitizer (§19)
are applied at delivery time, not at fan-out time.
"""

15 

16from __future__ import annotations 

17 

18import asyncio 

19import json 

20import logging 

21import sys 

22import threading 

23import time 

24import uuid 

25from datetime import UTC, datetime 

26from typing import Any 

27 

28import httpx 

29 

30from . import settings as _settings_pkg 

31from .auth import Identity 

32from .db import db 

33from .garden_acl import get_member_role 

34from .lifecycle.tombstone_cache import is_tombstoned as _is_tombstoned 

35from .memory_garden_acl_gate import recall_filter_enabled 

36from .recall.recall_pipeline import apply_recall_pipeline 

37 

# Module logger for subscription fan-out and delivery.
logger = logging.getLogger("stigmem.subscriptions")
# Process-local guard: deliver_pending() acquires it non-blocking, so an
# overlapping call in the same process skips its drain instead of waiting.
# Exclusion between processes relies on the atomic row claim in deliver_pending.
_DELIVER_PENDING_LOCK = threading.Lock()

40 

41 

42# --------------------------------------------------------------------------- 

43# Fan-out — called from routes/facts.py after each successful fact write 

44# --------------------------------------------------------------------------- 

45 

46 

def fan_out(
    fact_id: str,
    entity: str,
    scope: str,
    garden_id: str | None,
    tenant_id: str,
    fact_payload_json: str,
) -> None:
    """Queue one pending delivery event per subscription matching a new fact.

    A subscription matches when, within ``tenant_id``, it targets the fact's
    ``scope`` (``target_kind='scope'``) or its ``entity``
    (``target_kind='entity'``). Subscriptions whose circuit breaker is open
    (``circuit_open != 0``) are skipped.

    For every match a ``subscription_events`` row is inserted with a fresh
    UUID, event type ``fact_asserted``, the raw ``fact_payload_json`` and
    status ``pending``; all rows share one ``created_at`` timestamp. Actual
    delivery happens later in ``deliver_pending``.

    ``garden_id`` is accepted but not used here; garden ACL filtering is
    done at delivery time from the payload's own ``garden_id`` field.
    """
    created_at = datetime.now(UTC).isoformat()

    with db() as conn:
        # Scope matches first, then entity matches (same order as inserted).
        matches = [
            *conn.execute(
                """SELECT * FROM subscriptions
                   WHERE target_kind='scope' AND target=? AND tenant_id=? AND circuit_open=0""",
                (scope, tenant_id),
            ).fetchall(),
            *conn.execute(
                """SELECT * FROM subscriptions
                   WHERE target_kind='entity' AND target=? AND tenant_id=? AND circuit_open=0""",
                (entity, tenant_id),
            ).fetchall(),
        ]

        for subscription in matches:
            conn.execute(
                """INSERT INTO subscription_events
                   (id, subscription_id, event_type, entity_uri, fact_id,
                    payload, created_at, delivery_status)
                   VALUES (?,?,?,?,?,?,?,'pending')""",
                (
                    str(uuid.uuid4()),
                    subscription["id"],
                    "fact_asserted",
                    entity,
                    fact_id,
                    fact_payload_json,
                    created_at,
                ),
            )

80 

81 

82# --------------------------------------------------------------------------- 

83# Delivery sweep — called by sweep_loop every N seconds 

84# --------------------------------------------------------------------------- 

85 

86 

def deliver_pending() -> None:
    """Attempt delivery for all pending events that are due.

    Uses an atomic claim (``pending → delivering``) so that the background
    ``sweep_loop`` and any concurrent caller (admin replay endpoints,
    tests, future per-tenant workers) cannot deliver the same event twice.
    See issue #47 and migration 028 for the rationale.

    Concurrency model
    -----------------
    0. A process-local lock is taken non-blocking; if another drain is
       already running in this process, this call returns immediately.
    1. Recover stale claims: any row in ``delivering`` with
       ``claimed_at`` older than ``subscription_claim_timeout_s`` is reset
       to ``pending`` so a crashed worker cannot strand events.
    2. Atomically claim up to 100 due rows via
       ``UPDATE … WHERE id IN (SELECT … LIMIT 100) RETURNING …`` — SQLite
       serializes writes, so concurrent claimers see disjoint row sets.
    3. The claiming ``db()`` context is exited before any network I/O, so a
       slow webhook (10 s timeout each, up to 100 per batch) cannot keep that
       connection, or any transaction it scopes, open. Claimed events are
       delivered oldest first.
    4. For each claimed row, attempt delivery and transition to
       ``delivered`` (success) or back to ``pending`` with a fresh
       ``next_retry_at`` (failure) via ``_record_result``.
    """
    if not _DELIVER_PENDING_LOCK.acquire(blocking=False):
        logger.debug("subscription delivery already in progress; skipping concurrent drain")
        return

    try:
        events = _claim_due_events()
        for event in events:
            try:
                payload = json.loads(event["payload"])
                success = _deliver_one(event, payload)
            except Exception as exc:
                # Per-event boundary: one bad payload/handler must not abort the
                # batch. logger.exception keeps the traceback for diagnosis.
                logger.exception("Delivery error for event %s: %s", event["id"], exc)
                success = False

            _record_result(event, success)
    finally:
        _DELIVER_PENDING_LOCK.release()


def _claim_due_events() -> list[Any]:
    """Recover stale claims, atomically claim up to 100 due events, and load them.

    Returns the claimed events joined with their subscription's delivery
    settings, ordered by ``created_at`` (oldest first), or an empty list when
    nothing is due.
    """
    now = datetime.now(UTC).isoformat()
    claim_timeout_s = _settings_pkg.settings.subscription_claim_timeout_s
    stale_cutoff = datetime.fromtimestamp(
        time.time() - claim_timeout_s,
        UTC,
    ).isoformat()

    with db() as conn:
        # 1. Recover stale claims left behind by a crashed worker. We do NOT
        #    reset delivery_attempts — the next attempt counts as a retry.
        conn.execute(
            """UPDATE subscription_events
               SET delivery_status='pending', claimed_at=NULL
               WHERE delivery_status='delivering'
                 AND claimed_at IS NOT NULL
                 AND claimed_at < ?""",
            (stale_cutoff,),
        )

        # 2. Atomic claim. The inner SELECT picks due, pending events whose
        #    subscription circuit is closed; the outer UPDATE … RETURNING
        #    claims them in a single round-trip so no other caller can claim
        #    them until they are released.
        claimed = conn.execute(
            """UPDATE subscription_events
               SET delivery_status='delivering', claimed_at=?
               WHERE id IN (
                   SELECT e.id
                   FROM subscription_events e
                   JOIN subscriptions s ON e.subscription_id = s.id
                   WHERE e.delivery_status = 'pending'
                     AND s.circuit_open = 0
                     AND (e.next_retry_at IS NULL OR e.next_retry_at <= ?)
                   ORDER BY e.created_at ASC
                   LIMIT 100
               )
               RETURNING id""",
            (now, now),
        ).fetchall()

        if not claimed:
            return []

        claimed_ids = [row["id"] for row in claimed]
        # ``placeholders`` is a fixed "?,?,?…" string whose length comes from
        # ``claimed_ids`` (UUIDs we just emitted into our own table) — no user
        # input flows into the SQL text, so the string concatenation is safe.
        placeholders = ",".join("?" * len(claimed_ids))
        base_sql = (
            "SELECT e.id, e.subscription_id, e.event_type, e.entity_uri, e.fact_id,"
            " e.payload, e.created_at, e.delivery_attempts,"
            " s.on_change, s.delivery_address, s.subscriber_identity, s.tenant_id,"
            " s.circuit_open"
            " FROM subscription_events e"
            " JOIN subscriptions s ON e.subscription_id = s.id"
            " WHERE e.id IN ("
        )
        # ORDER BY keeps the oldest-first order chosen by the claim; an IN (…)
        # lookup alone returns rows in unspecified order.
        select_sql = base_sql + placeholders + ") ORDER BY e.created_at ASC"  # noqa: S608 — fixed "?,…" string
        return conn.execute(select_sql, claimed_ids).fetchall()

184 

185 

186# --------------------------------------------------------------------------- 

187# Delivery helpers 

188# --------------------------------------------------------------------------- 

189 

190 

191def _deliver_one(event: Any, payload: dict[str, Any]) -> bool: 

192 on_change = event["on_change"] 

193 if on_change == "webhook": 

194 return _deliver_webhook(event, payload) 

195 if on_change == "wake": 195 ↛ 197line 195 didn't jump to line 197 because the condition on line 195 was always true

196 return _deliver_wake(event, payload) 

197 logger.error("Unknown on_change %r for event %s", on_change, event["id"]) 

198 return False 

199 

200 

def _subscriber_identity(entity_uri: str, tenant_id: str) -> Identity:
    """Build a read-only ``Identity`` for a subscriber in the given tenant.

    Used so the recall pipeline can be run on the subscriber's behalf with
    nothing but the ``read`` permission.
    """
    read_only = ["read"]
    return Identity(tenant_id=tenant_id, entity_uri=entity_uri, permissions=read_only)

203 

204 

def _subscriber_has_active_key(entity_uri: str, tenant_id: str) -> bool:
    """Return True if at least one non-expired API key exists for this identity.

    Only consulted when STIGMEM_AUTH_REQUIRED=true (``settings.auth_required``)
    so that single-operator (auth-disabled) nodes are not affected; with auth
    disabled this always returns True without touching the database.

    A key with ``expires_at`` NULL never expires. Expiry is compared as text
    against the current UTC time in ISO-8601 form.
    NOTE(review): this assumes ``expires_at`` is stored as a UTC isoformat
    string so lexical comparison is chronological — confirm against writers.
    """
    # Use the module-level settings binding rather than re-importing it here.
    if not _settings_pkg.settings.auth_required:
        return True

    now = datetime.now(UTC).isoformat()
    with db() as conn:
        row = conn.execute(
            "SELECT id FROM api_keys WHERE entity_uri=? AND tenant_id=?"
            " AND (expires_at IS NULL OR expires_at > ?) LIMIT 1",
            (entity_uri, tenant_id, now),
        ).fetchone()
    return row is not None

223 

224 

def _sanitize_payload(event: Any, payload: dict[str, Any]) -> dict[str, Any] | None:
    """Apply §17 garden ACL and §19 sanitizer. Returns None to suppress delivery.

    Suppression checks, in order (any failure returns None):

    1. The subscriber still has an active API key (only enforced when auth
       is required — see ``_subscriber_has_active_key``).
    2. §17: if the fact carries a ``garden_id`` and recall filtering is
       enabled, the subscriber must still be a member of that garden.
    3. §23.3.3: the fact's entity must not be tombstoned.

    The fact is then rebuilt as a ``FactRecord`` and run through the recall
    pipeline as the subscriber:

    - pipeline drops it → None;
    - sanitizer redacted it → ``{"fact_id": <id>, "redacted": True}``;
    - otherwise → a copy of ``payload`` whose ``value_v`` is the sanitized
      value (stringified, or None), plus ``sanitizer_warnings`` if any.

    If rebuilding the record or running the pipeline raises, the redacted
    stub is returned (fail closed) so unsanitized content is never delivered.
    """
    from .models.facts import FactRecord, FactValue

    subscriber = event["subscriber_identity"]
    tenant_id = event["tenant_id"]

    # S2 fix: re-check that the subscriber still holds an active read credential.
    # Prevents continued delivery after API key revocation for scope/entity subscriptions.
    if not _subscriber_has_active_key(subscriber, tenant_id):
        return None

    # §17 garden ACL re-check: skip delivery if subscriber no longer a member
    garden_uuid = payload.get("garden_id")
    if garden_uuid and recall_filter_enabled():
        role = get_member_role(garden_uuid, subscriber)
        if role is None:
            return None

    # §23.3.3 r.2: drop event immediately when entity has an active tombstone.
    # Uses the in-process cache (§23.3.3 r.4); 60-second leak window is acceptable.
    entity_for_tombstone = payload.get("entity")
    if entity_for_tombstone and _is_tombstoned(entity_for_tombstone, tenant_id):
        return None

    try:
        record = FactRecord(
            id=payload["id"],
            entity=payload["entity"],
            relation=payload["relation"],
            value=FactValue(type=payload["value_type"], v=payload.get("value_v")),
            source=payload["source"],
            timestamp=payload["timestamp"],
            confidence=float(payload.get("confidence", 1.0)),
            scope=payload.get("scope", "local"),
        )
        identity = _subscriber_identity(subscriber, tenant_id)
        results = apply_recall_pipeline([record], identity=identity, include_low_trust=True)
        if not results:
            return None
        sanitized = results[0]
        if sanitized.sanitizer_redacted:
            return {"fact_id": payload["id"], "redacted": True}
        out = dict(payload)
        out["value_v"] = str(sanitized.value.v) if sanitized.value.v is not None else None
        if sanitized.sanitizer_warnings:
            out["sanitizer_warnings"] = sanitized.sanitizer_warnings
        return out
    except Exception as exc:
        # Fail closed: previously the raw, unsanitized payload was delivered
        # here, bypassing §19. Deliver the same redacted stub the sanitizer
        # itself uses, so the subscriber is still notified without content.
        logger.warning("Sanitizer error for fact %s: %s", payload.get("id"), exc)
        return {"fact_id": payload.get("id"), "redacted": True}

276 

277 

def _deliver_webhook(event: Any, payload: dict[str, Any]) -> bool:
    """POST one event to the subscription's webhook URL (``delivery_address``).

    The fact is first passed through ``_sanitize_payload``. If that suppresses
    it, nothing is sent and True is returned, so the caller records the event
    as delivered and never retries it.

    The JSON body carries the event id (repeated as ``idempotency_key`` and in
    the ``X-Stigmem-Event-Id`` header), ``subscription_id``, ``event_type``
    and the sanitized ``fact``.

    Returns:
        True for a 2xx/3xx response (redirects are not followed — httpx
        default), or for 410 Gone, which also deletes the subscription.
        False for any other status (5xx, 429 and other 4xx) or a
        transport-level error. The caller records False as a failed attempt,
        which schedules a retry with backoff and counts toward the circuit
        breaker.
    """
    sanitized = _sanitize_payload(event, payload)
    if sanitized is None:
        # ACL/sanitizer blocked — report success so the caller's
        # _record_result marks the event delivered and it is not retried
        # (same as wake delivery; no separate bookkeeping write needed).
        return True

    body = {
        "event_id": event["id"],
        "idempotency_key": event["id"],
        "subscription_id": event["subscription_id"],
        "event_type": event["event_type"],
        "fact": sanitized,
    }

    try:
        with httpx.Client(timeout=10.0) as client:
            resp = client.post(
                event["delivery_address"],
                json=body,
                headers={
                    "Content-Type": "application/json",
                    "X-Stigmem-Event-Id": event["id"],
                },
            )
    except httpx.TransportError as exc:
        # Timeouts, connect/read/write errors and protocol errors all mean the
        # request did not complete; report failure so the event is retried.
        logger.warning("Webhook delivery for event %s failed: %s", event["id"], exc)
        return False

    if resp.status_code == 410:
        # Webhook endpoint gone — cancel the subscription
        with db() as conn:
            conn.execute("DELETE FROM subscriptions WHERE id=?", (event["subscription_id"],))
        logger.info("Subscription %s cancelled (410 Gone)", event["subscription_id"])
        return True

    # 2xx/3xx → success. Every other status (5xx, 429 and other 4xx) → False,
    # which the caller records as a failed attempt and retries with backoff:
    # a boolean result cannot express "permanent failure, don't retry".
    return resp.status_code < 400

318 

319 

def _deliver_wake(event: Any, payload: dict[str, Any]) -> bool:
    """Emit a ``stigmem_wake`` JSON line on stderr for the platform to pick up.

    The fact is passed through ``_sanitize_payload`` first; when that
    suppresses it, nothing is written. Always returns True, so the caller
    records the event as delivered either way.
    """
    fact = _sanitize_payload(event, payload)
    if fact is not None:
        # P1 note: wake delivery writes sanitized fact payloads to stderr for
        # operator/platform pickup. Any process with stderr access sees ALL wake
        # events from ALL subscribers. This is intentional for the operator
        # integration use-case but means the platform operator is implicitly
        # trusted with the content of every wake-delivered fact. Operators that
        # need per-subscriber isolation should run one node per subscriber.
        envelope = {
            "event_id": event["id"],
            "subscription_id": event["subscription_id"],
            "subscriber_identity": event["subscriber_identity"],
            "delivery_address": event["delivery_address"],
            "event_type": event["event_type"],
            "fact": fact,
            "ts": datetime.now(UTC).isoformat(),
        }
        print(json.dumps({"stigmem_wake": envelope}), file=sys.stderr)
    return True

348 

349 

def _record_result(event: Any, success: bool) -> None:
    """Persist the outcome of one delivery attempt for a claimed event.

    Both outcomes set ``delivery_attempts`` to the event's prior count plus
    one (NULL counts as 0) and clear ``claimed_at``.

    * Success: the event becomes ``delivered`` with ``delivered_at`` stamped;
      the subscription gets ``last_delivered_at`` stamped and
      ``consecutive_failures`` reset to 0.
    * Failure: the event returns to ``pending`` with ``next_retry_at`` set
      ``min(2**attempts, 300)`` seconds out (2 s, 4 s, 8 s … 300 s), and the
      subscription's ``consecutive_failures`` is incremented. Once that
      reaches ``subscription_circuit_threshold`` the subscription's circuit
      is opened and this event is marked ``failed`` instead.
    """
    event_id = event["id"]
    sub_id = event["subscription_id"]
    attempts = (event["delivery_attempts"] or 0) + 1
    stamp = datetime.now(UTC).isoformat()

    if not success:
        delay_s = min(2**attempts, 300)
        retry_at = datetime.fromtimestamp(time.time() + delay_s, UTC).isoformat()

        with db() as conn:
            # Hand the claim back as 'pending' so a later sweep can retry it;
            # the circuit-breaker branch below may override this to 'failed'.
            conn.execute(
                """UPDATE subscription_events
                   SET delivery_status='pending', claimed_at=NULL,
                       delivery_attempts=?, next_retry_at=?
                   WHERE id=?""",
                (attempts, retry_at, event_id),
            )
            conn.execute(
                "UPDATE subscriptions SET consecutive_failures=consecutive_failures+1 WHERE id=?",
                (sub_id,),
            )
            streak = conn.execute(
                "SELECT consecutive_failures FROM subscriptions WHERE id=?",
                (sub_id,),
            ).fetchone()
            limit = _settings_pkg.settings.subscription_circuit_threshold
            if not streak or streak["consecutive_failures"] < limit:
                return

            # Threshold reached: stop claiming this subscription's events and
            # give up on the current one.
            conn.execute(
                "UPDATE subscriptions SET circuit_open=1 WHERE id=?",
                (sub_id,),
            )
            conn.execute(
                "UPDATE subscription_events SET delivery_status='failed' WHERE id=?",
                (event_id,),
            )
            logger.warning("Circuit breaker opened for subscription %s", sub_id)
        return

    with db() as conn:
        conn.execute(
            """UPDATE subscription_events
               SET delivered_at=?, delivery_status='delivered',
                   delivery_attempts=?, claimed_at=NULL
               WHERE id=?""",
            (stamp, attempts, event_id),
        )
        conn.execute(
            "UPDATE subscriptions SET last_delivered_at=?, consecutive_failures=0 WHERE id=?",
            (stamp, sub_id),
        )

401 

402 

def _mark_delivered(event_id: str, subscription_id: str) -> None:
    """Mark one event delivered and reset its subscription's failure streak.

    Stamps the event's ``delivered_at`` and the subscription's
    ``last_delivered_at`` with the same UTC timestamp, increments
    ``delivery_attempts`` and clears ``claimed_at``.
    """
    now = datetime.now(UTC).isoformat()
    with db() as conn:
        # COALESCE: NULL + 1 is NULL in SQL, so treat a missing count as 0 —
        # the same convention _record_result uses for delivery_attempts.
        conn.execute(
            """UPDATE subscription_events
               SET delivered_at=?,
                   delivery_status='delivered',
                   delivery_attempts=COALESCE(delivery_attempts, 0)+1,
                   claimed_at=NULL
               WHERE id=?""",
            (now, event_id),
        )
        conn.execute(
            "UPDATE subscriptions SET last_delivered_at=?, consecutive_failures=0 WHERE id=?",
            (now, subscription_id),
        )

419 

420 

421# --------------------------------------------------------------------------- 

422# Background sweep loop 

423# --------------------------------------------------------------------------- 

424 

425 

async def sweep_loop() -> None:
    """Run ``deliver_pending`` forever, pausing before each pass.

    The pause length is re-read from ``subscription_delivery_sweep_s`` on
    every iteration. Each pass runs in a worker thread via
    ``asyncio.to_thread`` so its blocking DB and HTTP work does not stall the
    event loop. A failing pass is logged with its traceback and the loop
    continues; ``asyncio.CancelledError`` (a ``BaseException``) is not
    caught, so cancelling the task ends the loop.
    """
    while True:
        interval_s = _settings_pkg.settings.subscription_delivery_sweep_s
        await asyncio.sleep(interval_s)
        try:
            await asyncio.to_thread(deliver_pending)
        except Exception as err:
            logger.exception("subscription delivery sweep failed: %s", err)