Coverage for node / src / stigmem_node / subscription_delivery.py: 88%
160 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Subscription delivery engine — spec §20.4.

Responsibilities:

1. ``fan_out`` — when a fact is written, find matching subscriptions and insert
   pending ``subscription_events`` rows (fast; called synchronously on every write).
2. ``deliver_pending`` — sweep: attempt delivery for all pending/due-for-retry events.
3. ``sweep_loop`` — asyncio task run in the app lifespan; calls deliver_pending periodically.

Delivery types:

- ``webhook`` — HTTP POST to delivery_address; failed deliveries are retried with
  exponential backoff (2 s, 4 s, 8 s … capped at 300 s).
- ``wake`` — structured JSON emitted to stderr; platform-specific integration hook.

Garden ACL (§17) and content sanitizer (§19) are applied at delivery time.
"""
16from __future__ import annotations
18import asyncio
19import json
20import logging
21import sys
22import threading
23import time
24import uuid
25from datetime import UTC, datetime
26from typing import Any
28import httpx
30from . import settings as _settings_pkg
31from .auth import Identity
32from .db import db
33from .garden_acl import get_member_role
34from .lifecycle.tombstone_cache import is_tombstoned as _is_tombstoned
35from .memory_garden_acl_gate import recall_filter_enabled
36from .recall.recall_pipeline import apply_recall_pipeline
38logger = logging.getLogger("stigmem.subscriptions")
39_DELIVER_PENDING_LOCK = threading.Lock()
42# ---------------------------------------------------------------------------
43# Fan-out — called from routes/facts.py after each successful fact write
44# ---------------------------------------------------------------------------
def fan_out(
    fact_id: str,
    entity: str,
    scope: str,
    garden_id: str | None,
    tenant_id: str,
    fact_payload_json: str,
) -> None:
    """Queue a ``fact_asserted`` delivery event for every matching subscription.

    A subscription matches when all of these hold:

    - it belongs to ``tenant_id``;
    - its circuit breaker is closed (``circuit_open=0``);
    - it targets either this fact's ``scope`` (``target_kind='scope'``) or its
      ``entity`` (``target_kind='entity'``).

    Each match gets one ``subscription_events`` row in ``'pending'`` state
    carrying ``fact_payload_json`` verbatim. Actual delivery, including the ACL
    and sanitizer checks, happens later in ``deliver_pending``.

    ``garden_id`` is part of the caller-facing signature but is not consulted
    here. The delivery-time garden ACL reads ``garden_id`` from the payload.
    """
    created_at = datetime.now(UTC).isoformat()

    with db() as conn:
        # Scope subscribers first, then entity subscribers. A row has a single
        # target_kind, so no subscription can appear in both result sets.
        matching = [
            *conn.execute(
                """SELECT * FROM subscriptions
                   WHERE target_kind='scope' AND target=? AND tenant_id=? AND circuit_open=0""",
                (scope, tenant_id),
            ).fetchall(),
            *conn.execute(
                """SELECT * FROM subscriptions
                   WHERE target_kind='entity' AND target=? AND tenant_id=? AND circuit_open=0""",
                (entity, tenant_id),
            ).fetchall(),
        ]

        for sub in matching:
            conn.execute(
                """INSERT INTO subscription_events
                   (id, subscription_id, event_type, entity_uri, fact_id,
                    payload, created_at, delivery_status)
                   VALUES (?,?,?,?,?,?,?,'pending')""",
                (
                    str(uuid.uuid4()),
                    sub["id"],
                    "fact_asserted",
                    entity,
                    fact_id,
                    fact_payload_json,
                    created_at,
                ),
            )
82# ---------------------------------------------------------------------------
83# Delivery sweep — called by sweep_loop every N seconds
84# ---------------------------------------------------------------------------
def deliver_pending() -> None:
    """Attempt delivery for all pending events that are due.

    Uses an atomic claim (``pending → delivering``) so that the background
    ``sweep_loop`` and any concurrent caller (admin replay endpoints,
    tests, future per-tenant workers) cannot deliver the same event twice.
    See issue #47 and migration 028 for the rationale.

    Concurrency model
    -----------------
    0. A process-local lock is acquired without blocking. If another drain is
       already running in this process, this call returns immediately.
    1. Recover stale claims: any row in ``delivering`` with
       ``claimed_at`` older than ``subscription_claim_timeout_s`` is reset
       to ``pending`` so a crashed worker cannot strand events.
    2. Atomically claim up to 100 due rows via
       ``UPDATE … WHERE id IN (SELECT … LIMIT 100) RETURNING …``. SQLite
       serializes writes, so concurrent claimers see disjoint row sets.
    3. For each claimed row, attempt delivery and transition to
       ``delivered`` (success) or back to ``pending`` with a fresh
       ``next_retry_at`` (failure).

    Error handling
    --------------
    If delivering one event fails (bad JSON payload, handler error), the error
    is logged with its traceback and recorded as a failed attempt.

    If *recording* an outcome fails, that is logged and the rest of the batch
    still proceeds. That event stays ``delivering`` until the stale-claim
    recovery in step 1 returns it to ``pending``.
    """
    if not _DELIVER_PENDING_LOCK.acquire(blocking=False):
        logger.debug("subscription delivery already in progress; skipping concurrent drain")
        return

    try:
        # Steps 1-2 run in their own db() block. The (possibly slow) delivery
        # I/O below therefore happens after the claiming connection is closed.
        for event in _claim_due_events():
            try:
                payload = json.loads(event["payload"])
                success = _deliver_one(event, payload)
            except Exception as exc:
                # logger.exception logs at ERROR (same level and message as
                # before) and additionally attaches the traceback.
                logger.exception("Delivery error for event %s: %s", event["id"], exc)
                success = False

            try:
                _record_result(event, success)
            except Exception:
                # Don't abort the batch: the remaining claimed events would
                # otherwise sit in 'delivering' until the claim timeout.
                logger.exception(
                    "Failed to record delivery result for event %s; it will be"
                    " re-queued by stale-claim recovery",
                    event["id"],
                )
    finally:
        _DELIVER_PENDING_LOCK.release()


def _claim_due_events() -> list[Any]:
    """Recover stale claims, claim up to 100 due events, and return them joined with subscription columns.

    Returns an empty list when nothing is due.

    NOTE(review): this assumes ``db()`` commits when its block exits cleanly,
    so the claim is visible to other workers before delivery starts. Confirm.
    """
    now = datetime.now(UTC).isoformat()
    claim_timeout_s = _settings_pkg.settings.subscription_claim_timeout_s
    stale_cutoff = datetime.fromtimestamp(
        time.time() - claim_timeout_s,
        UTC,
    ).isoformat()

    with db() as conn:
        # 1. Recover stale claims left behind by a crashed worker. We do NOT
        #    reset delivery_attempts — the next attempt counts as a retry.
        conn.execute(
            """UPDATE subscription_events
               SET delivery_status='pending', claimed_at=NULL
               WHERE delivery_status='delivering'
                 AND claimed_at IS NOT NULL
                 AND claimed_at < ?""",
            (stale_cutoff,),
        )

        # 2. Atomic claim. The inner SELECT picks due pending events whose
        #    subscription's circuit is closed. The outer UPDATE … RETURNING
        #    returns the claimed ids in a single round-trip; no other caller
        #    can claim them until they are released.
        claimed = conn.execute(
            """UPDATE subscription_events
               SET delivery_status='delivering', claimed_at=?
               WHERE id IN (
                   SELECT e.id
                   FROM subscription_events e
                   JOIN subscriptions s ON e.subscription_id = s.id
                   WHERE e.delivery_status = 'pending'
                     AND s.circuit_open = 0
                     AND (e.next_retry_at IS NULL OR e.next_retry_at <= ?)
                   ORDER BY e.created_at ASC
                   LIMIT 100
               )
               RETURNING id""",
            (now, now),
        ).fetchall()

        if not claimed:
            return []

        claimed_ids = [row["id"] for row in claimed]
        # ``placeholders`` is a fixed "?,?,?…" string whose length comes from
        # ``claimed_ids`` (UUIDs we just emitted into our own table). No user
        # input flows into the SQL text, so the concatenation is safe.
        placeholders = ",".join("?" * len(claimed_ids))
        _base = (
            "SELECT e.id, e.subscription_id, e.event_type, e.entity_uri, e.fact_id,"
            " e.payload, e.created_at, e.delivery_attempts,"
            " s.on_change, s.delivery_address, s.subscriber_identity, s.tenant_id,"
            " s.circuit_open"
            " FROM subscription_events e"
            " JOIN subscriptions s ON e.subscription_id = s.id"
            " WHERE e.id IN ("
        )
        select_sql = _base + placeholders + ")"  # noqa: S608 — fixed "?,…" string
        return conn.execute(select_sql, claimed_ids).fetchall()
186# ---------------------------------------------------------------------------
187# Delivery helpers
188# ---------------------------------------------------------------------------
def _deliver_one(event: Any, payload: dict[str, Any]) -> bool:
    """Dispatch one claimed event to the handler for its ``on_change`` type.

    Returns the handler's success flag. An unrecognised ``on_change`` value is
    logged and reported as ``False``, which ``_record_result`` treats as a
    failed attempt.
    """
    mode = event["on_change"]
    handlers = {"webhook": _deliver_webhook, "wake": _deliver_wake}
    handler = handlers.get(mode)
    if handler is None:
        logger.error("Unknown on_change %r for event %s", mode, event["id"])
        return False
    return handler(event, payload)
def _subscriber_identity(entity_uri: str, tenant_id: str) -> Identity:
    """Build the read-only identity used to run the recall pipeline as the subscriber."""
    read_only = ["read"]
    return Identity(tenant_id=tenant_id, entity_uri=entity_uri, permissions=read_only)
def _subscriber_has_active_key(entity_uri: str, tenant_id: str) -> bool:
    """Report whether the subscriber still holds a usable API key.

    When ``auth_required`` is off (single-operator nodes), this always returns
    True and the database is not touched.

    Otherwise it returns True iff some ``api_keys`` row for
    (``entity_uri``, ``tenant_id``) has no ``expires_at`` or one later than
    now. The comparison is on ISO-8601 strings.
    """
    if _settings_pkg.settings.auth_required:
        cutoff = datetime.now(UTC).isoformat()
        with db() as conn:
            hit = conn.execute(
                "SELECT id FROM api_keys WHERE entity_uri=? AND tenant_id=?"
                " AND (expires_at IS NULL OR expires_at > ?) LIMIT 1",
                (entity_uri, tenant_id, cutoff),
            ).fetchone()
        return hit is not None
    return True
def _sanitize_payload(event: Any, payload: dict[str, Any]) -> dict[str, Any] | None:
    """Apply §17 garden ACL and §19 sanitizer to a fact payload before delivery.

    Gates run in order; the first that fails suppresses delivery:

    1. the subscriber still has an active read credential (only enforced when
       auth is required);
    2. the subscriber is still a garden member (only checked when the payload
       carries ``garden_id`` and the recall filter is enabled);
    3. the fact's entity is not tombstoned.

    The fact is then run through the recall pipeline as the subscriber.

    Returns:
        ``None`` to suppress delivery.

        ``{"fact_id": ..., "redacted": True}`` when the sanitizer redacted the
        fact, or when the sanitizer step could not be completed (fail closed).

        Otherwise, a shallow copy of ``payload`` with ``value_v`` replaced by
        the stringified sanitized value, plus ``sanitizer_warnings`` when
        present.
    """
    from .models.facts import FactRecord, FactValue

    subscriber = event["subscriber_identity"]
    tenant_id = event["tenant_id"]

    # S2 fix: re-check that the subscriber still holds an active read credential.
    # Prevents continued delivery after API key revocation for scope/entity subscriptions.
    if not _subscriber_has_active_key(subscriber, tenant_id):
        return None

    # §17 garden ACL re-check: skip delivery if subscriber no longer a member
    garden_uuid = payload.get("garden_id")
    if garden_uuid and recall_filter_enabled():
        role = get_member_role(garden_uuid, subscriber)
        if role is None:
            return None

    # §23.3.3 r.2: drop event immediately when entity has an active tombstone.
    # Uses the in-process cache (§23.3.3 r.4); 60-second leak window is acceptable.
    entity_for_tombstone = payload.get("entity")
    if entity_for_tombstone and _is_tombstoned(entity_for_tombstone, tenant_id):
        return None

    try:
        record = FactRecord(
            id=payload["id"],
            entity=payload["entity"],
            relation=payload["relation"],
            value=FactValue(type=payload["value_type"], v=payload.get("value_v")),
            source=payload["source"],
            timestamp=payload["timestamp"],
            confidence=float(payload.get("confidence", 1.0)),
            scope=payload.get("scope", "local"),
        )
        identity = _subscriber_identity(subscriber, tenant_id)
        results = apply_recall_pipeline([record], identity=identity, include_low_trust=True)
        if not results:
            return None
        sanitized = results[0]
        if sanitized.sanitizer_redacted:
            return {"fact_id": payload["id"], "redacted": True}
        out = dict(payload)
        out["value_v"] = str(sanitized.value.v) if sanitized.value.v is not None else None
        if sanitized.sanitizer_warnings:
            out["sanitizer_warnings"] = sanitized.sanitizer_warnings
        return out
    except Exception as exc:
        # Fail closed. Previously the raw payload was returned here, so any
        # malformed payload or sanitizer crash shipped unsanitized content to
        # the subscriber.
        # Send the content-free stub used for sanitizer redactions instead.
        # The subscriber still learns the fact changed, but sees none of its
        # value. ``payload.get`` is used because a missing "id" may itself be
        # the cause of the error.
        logger.warning("Sanitizer error for fact %s: %s", payload.get("id"), exc)
        return {"fact_id": payload.get("id"), "redacted": True}
def _deliver_webhook(event: Any, payload: dict[str, Any]) -> bool:
    """POST one event to the subscription's webhook URL.

    Returns ``True`` when the event should be treated as done:

    - a 2xx/3xx response;
    - 410 Gone, which also deletes the subscription;
    - delivery suppressed by the ACL/sanitizer gate.

    Returns ``False`` on 5xx, 429, any other 4xx, or a transport error.
    ``deliver_pending`` passes the result to ``_record_result``, which
    reschedules every ``False`` with backoff. Non-429 4xx responses are
    therefore retried too, until the subscription's circuit breaker opens.
    """
    sanitized = _sanitize_payload(event, payload)
    if sanitized is None:
        # ACL/sanitizer blocked — mark delivered, don't retry
        _mark_delivered(event["id"], event["subscription_id"])
        return True

    body = {
        "event_id": event["id"],
        # The event id doubles as the idempotency key so receivers can dedupe
        # a redelivery (e.g. after stale-claim recovery).
        "idempotency_key": event["id"],
        "subscription_id": event["subscription_id"],
        "event_type": event["event_type"],
        "fact": sanitized,
    }

    try:
        with httpx.Client(timeout=10.0) as client:
            resp = client.post(
                event["delivery_address"],
                json=body,
                headers={
                    "Content-Type": "application/json",
                    "X-Stigmem-Event-Id": event["id"],
                },
            )
    except httpx.TransportError as exc:
        # TransportError is the common base of TimeoutException, ConnectError
        # and RemoteProtocolError (the three previously caught). It also
        # covers ReadError/WriteError/CloseError, LocalProtocolError,
        # ProxyError and UnsupportedProtocol, which used to escape to
        # deliver_pending's catch-all. The outcome is the same — the attempt
        # fails and is retried — but it is now handled and logged here.
        logger.warning(
            "Webhook transport error for event %s (subscription %s): %s",
            event["id"],
            event["subscription_id"],
            exc,
        )
        return False

    if resp.status_code == 410:
        # Webhook endpoint gone — cancel the subscription
        with db() as conn:
            conn.execute("DELETE FROM subscriptions WHERE id=?", (event["subscription_id"],))
        logger.info("Subscription %s cancelled (410 Gone)", event["subscription_id"])
        return True

    # 2xx/3xx → success. 5xx / 429 → failure.
    # Any other 4xx is also reported as failure; _record_result retries it
    # with backoff until the circuit breaker opens.
    if resp.status_code >= 500 or resp.status_code == 429:
        return False
    return resp.status_code < 400
def _deliver_wake(event: Any, payload: dict[str, Any]) -> bool:
    """Emit a single ``{"stigmem_wake": {...}}`` JSON line on stderr.

    Always returns True. When the ACL/sanitizer gate suppresses the event,
    nothing is printed and the event still counts as delivered.
    """
    fact = _sanitize_payload(event, payload)
    if fact is not None:
        # P1 note: wake delivery writes sanitized fact payloads to stderr for
        # operator/platform pickup. Any process with stderr access sees ALL
        # wake events from ALL subscribers.
        # This is intentional for the operator integration use-case, but it
        # means the platform operator is implicitly trusted with the content
        # of every wake-delivered fact. Operators that need per-subscriber
        # isolation should run one node per subscriber.
        wake_record = {
            "event_id": event["id"],
            "subscription_id": event["subscription_id"],
            "subscriber_identity": event["subscriber_identity"],
            "delivery_address": event["delivery_address"],
            "event_type": event["event_type"],
            "fact": fact,
            "ts": datetime.now(UTC).isoformat(),
        }
        print(json.dumps({"stigmem_wake": wake_record}), file=sys.stderr)
    return True
def _record_result(event: Any, success: bool) -> None:
    """Persist the outcome of one delivery attempt for a claimed event.

    On success:

    - the event becomes ``delivered`` (claim cleared, attempt count bumped);
    - the subscription's ``last_delivered_at`` is stamped;
    - its ``consecutive_failures`` counter is reset to 0.

    On failure:

    - the claim is released back to ``pending``;
    - ``next_retry_at`` is set ``min(2**attempts, 300)`` seconds ahead;
    - the subscription's ``consecutive_failures`` is incremented.

    Once that counter reaches ``subscription_circuit_threshold``, the
    subscription's circuit is opened and this event is marked ``failed``.
    """
    attempts = (event["delivery_attempts"] or 0) + 1
    event_id = event["id"]
    subscription_id = event["subscription_id"]

    if not success:
        delay_s = min(2**attempts, 300)
        retry_at = datetime.fromtimestamp(time.time() + delay_s, UTC).isoformat()
        with db() as conn:
            # Hand the event back to the queue. The circuit-breaker check
            # below may override the status to 'failed'.
            conn.execute(
                """UPDATE subscription_events
                   SET delivery_status='pending', claimed_at=NULL,
                       delivery_attempts=?, next_retry_at=?
                   WHERE id=?""",
                (attempts, retry_at, event_id),
            )
            conn.execute(
                "UPDATE subscriptions SET consecutive_failures=consecutive_failures+1 WHERE id=?",
                (subscription_id,),
            )
            counter = conn.execute(
                "SELECT consecutive_failures FROM subscriptions WHERE id=?",
                (subscription_id,),
            ).fetchone()
            threshold = _settings_pkg.settings.subscription_circuit_threshold
            if counter and counter["consecutive_failures"] >= threshold:
                conn.execute(
                    "UPDATE subscriptions SET circuit_open=1 WHERE id=?",
                    (subscription_id,),
                )
                conn.execute(
                    "UPDATE subscription_events SET delivery_status='failed' WHERE id=?",
                    (event_id,),
                )
                logger.warning("Circuit breaker opened for subscription %s", subscription_id)
        return

    delivered_at = datetime.now(UTC).isoformat()
    with db() as conn:
        conn.execute(
            """UPDATE subscription_events
               SET delivered_at=?, delivery_status='delivered',
                   delivery_attempts=?, claimed_at=NULL
               WHERE id=?""",
            (delivered_at, attempts, event_id),
        )
        conn.execute(
            "UPDATE subscriptions SET last_delivered_at=?, consecutive_failures=0 WHERE id=?",
            (delivered_at, subscription_id),
        )
def _mark_delivered(event_id: str, subscription_id: str) -> None:
    """Mark an event delivered and reset its subscription's failure streak.

    The event's attempt count is incremented in SQL, and its claim is cleared.
    The subscription gets ``last_delivered_at`` stamped and
    ``consecutive_failures`` reset to 0.
    """
    stamp = datetime.now(UTC).isoformat()
    statements = (
        (
            """UPDATE subscription_events
               SET delivered_at=?,
                   delivery_status='delivered',
                   delivery_attempts=delivery_attempts+1,
                   claimed_at=NULL
               WHERE id=?""",
            (stamp, event_id),
        ),
        (
            "UPDATE subscriptions SET last_delivered_at=?, consecutive_failures=0 WHERE id=?",
            (stamp, subscription_id),
        ),
    )
    with db() as conn:
        for sql, params in statements:
            conn.execute(sql, params)
421# ---------------------------------------------------------------------------
422# Background sweep loop
423# ---------------------------------------------------------------------------
async def sweep_loop() -> None:
    """Drain pending subscription events forever.

    Each iteration sleeps for ``subscription_delivery_sweep_s`` seconds (the
    setting is re-read every time), then runs ``deliver_pending`` in a worker
    thread via ``asyncio.to_thread``. Any ``Exception`` from a drain is logged
    and the loop continues. ``asyncio.CancelledError`` derives from
    ``BaseException``, so it is not caught and cancelling the task stops it.
    """
    while True:
        interval_s = _settings_pkg.settings.subscription_delivery_sweep_s
        await asyncio.sleep(interval_s)
        try:
            await asyncio.to_thread(deliver_pending)
        except Exception as exc:
            logger.exception("subscription delivery sweep failed: %s", exc)