Coverage for node / src / stigmem_node / routes / subscriptions.py: 87%
105 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Subscription CRUD and replay-window routes — spec §20.3, §20.5.
3POST /v1/subscriptions — create (subscriber_identity = caller; BOLA §20.3.2)
4GET /v1/subscriptions — list caller's subscriptions
5GET /v1/subscriptions/{id} — get one (BOLA: 404 if not owner)
6DELETE /v1/subscriptions/{id} — cancel (BOLA: 404 if not owner)
7GET /v1/subscriptions/{id}/events — replay window (§20.5)
8"""
10from __future__ import annotations
12import json
13import uuid
14from datetime import UTC, datetime, timedelta
15from typing import Annotated, Any
17from fastapi import APIRouter, Depends, HTTPException, Query, status
19from .. import settings as _settings_pkg
20from ..auth import Identity, resolve_identity
21from ..db import db
22from ..garden_acl import get_garden_by_garden_uri, require_garden_read
23from ..models.constants import VALID_SCOPES
24from ..models.subscriptions import (
25 SubscriptionCreateRequest,
26 SubscriptionEventRecord,
27 SubscriptionEventsResponse,
28 SubscriptionListResponse,
29 SubscriptionRecord,
30)
31from ..subscription_delivery import _sanitize_payload as _sanitize_event_payload
# Every route declared in this module is mounted under /v1/subscriptions.
router = APIRouter(
    prefix="/v1/subscriptions",
    tags=["subscriptions"],
)
def _target_kind(target: str) -> str:
    """Classify a subscription target as ``"scope"`` or ``"entity"``.

    A target that is a member of ``VALID_SCOPES`` is a scope; every other
    value is treated as an entity.
    """
    if target in VALID_SCOPES:
        return "scope"
    return "entity"
def _row_to_record(row: Any) -> SubscriptionRecord:
    """Build a ``SubscriptionRecord`` from a ``subscriptions`` table row.

    ``row`` only needs to support lookup by column name.
    """
    columns = (
        "id",
        "subscriber_identity",
        "target",
        "target_kind",
        "on_change",
        "delivery_address",
        "idempotency_key",
        "created_at",
        "last_delivered_at",
        "circuit_open",
        "consecutive_failures",
    )
    fields: dict[str, Any] = {column: row[column] for column in columns}
    # The stored flag is coerced to a real bool before it reaches the model.
    fields["circuit_open"] = bool(fields["circuit_open"])
    return SubscriptionRecord(**fields)
def _event_row_to_record(row: Any) -> SubscriptionEventRecord:
    """Build a ``SubscriptionEventRecord`` from a ``subscription_events`` row.

    The ``payload`` column holds a JSON string and is decoded; every other
    column is copied through unchanged.
    """
    columns = (
        "id",
        "subscription_id",
        "event_type",
        "entity_uri",
        "fact_id",
        "payload",
        "created_at",
        "delivered_at",
        "delivery_status",
        "delivery_attempts",
    )
    fields: dict[str, Any] = {}
    for column in columns:
        value = row[column]
        fields[column] = json.loads(value) if column == "payload" else value
    return SubscriptionEventRecord(**fields)
def _event_row_to_record_with_payload(row: Any, payload_json: str) -> SubscriptionEventRecord:
    """Build a ``SubscriptionEventRecord`` using a caller-supplied payload.

    Same as ``_event_row_to_record`` except that the row's own ``payload``
    column is never read: ``payload_json`` (a pre-sanitized JSON string) is
    decoded and used in its place.
    """
    row_columns = (
        "id",
        "subscription_id",
        "event_type",
        "entity_uri",
        "fact_id",
        "payload",
        "created_at",
        "delivered_at",
        "delivery_status",
        "delivery_attempts",
    )
    fields: dict[str, Any] = {}
    for column in row_columns:
        if column == "payload":
            fields[column] = json.loads(payload_json)
        else:
            fields[column] = row[column]
    return SubscriptionEventRecord(**fields)
87# ---------------------------------------------------------------------------
88# CRUD
89# ---------------------------------------------------------------------------
@router.post("", response_model=SubscriptionRecord, status_code=status.HTTP_201_CREATED)
def create_subscription(
    req: SubscriptionCreateRequest,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> SubscriptionRecord:
    """Create a subscription owned by the caller (Spec-X7-Subscriptions).

    The subscriber is always the authenticated caller (BOLA); the request
    cannot name a different subscriber.

    An existing subscription is returned instead of inserting a new row when
    either the caller already used the same idempotency key, or the caller
    already has a subscription with the same target, on_change and
    delivery_address.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when
            the target looks like a garden URI but no such garden exists.
            ``require_garden_read`` performs the garden membership check and
            presumably raises on denial — confirm in ``garden_acl``.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    # §17 garden ACL: a garden-URI target requires the caller to be a member.
    targets_garden = req.target.startswith("stigmem://") and "/garden/" in req.target
    if targets_garden:
        garden = get_garden_by_garden_uri(req.target, tenant_id=identity.tenant_id)
        if garden is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found")
        require_garden_read(garden, identity)

    # Idempotency replay: the lookup is scoped to this subscriber and tenant so
    # a key can never surface another entity's subscription metadata (R2).
    if req.idempotency_key:
        idempotency_params = (req.idempotency_key, identity.tenant_id, identity.entity_uri)
        with db() as conn:
            keyed_row = conn.execute(
                """SELECT * FROM subscriptions
                   WHERE idempotency_key=? AND tenant_id=? AND subscriber_identity=?""",
                idempotency_params,
            ).fetchone()
            if keyed_row is not None:
                return _row_to_record(keyed_row)

    # Natural dedup on (subscriber, target, on_change, delivery_address).
    dedup_params = (
        identity.entity_uri,
        req.target,
        req.on_change,
        req.delivery_address,
        identity.tenant_id,
    )
    with db() as conn:
        duplicate_row = conn.execute(
            """SELECT * FROM subscriptions
               WHERE subscriber_identity=? AND target=? AND on_change=?
                 AND delivery_address=? AND tenant_id=?""",
            dedup_params,
        ).fetchone()
        if duplicate_row is not None:
            return _row_to_record(duplicate_row)

    # Nothing to reuse: insert a fresh row and read it back in full.
    new_id = str(uuid.uuid4())
    created_at = datetime.now(UTC).isoformat()
    insert_params = (
        new_id,
        identity.entity_uri,
        req.target,
        _target_kind(req.target),
        req.on_change,
        req.delivery_address,
        req.idempotency_key,
        created_at,
        identity.tenant_id,
    )
    with db() as conn:
        conn.execute(
            """INSERT INTO subscriptions
               (id, subscriber_identity, target, target_kind, on_change,
                delivery_address, idempotency_key, created_at, tenant_id)
               VALUES (?,?,?,?,?,?,?,?,?)""",
            insert_params,
        )
        inserted_row = conn.execute(
            "SELECT * FROM subscriptions WHERE id=?", (new_id,)
        ).fetchone()

    return _row_to_record(inserted_row)
@router.get("", response_model=SubscriptionListResponse)
def list_subscriptions(
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> SubscriptionListResponse:
    """Return the caller's own subscriptions, newest first (BOLA: own only).

    Raises:
        HTTPException: 403 when the caller lacks read permission.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    owner_filter = (identity.entity_uri, identity.tenant_id)
    with db() as conn:
        owned_rows = conn.execute(
            """SELECT * FROM subscriptions
               WHERE subscriber_identity=? AND tenant_id=?
               ORDER BY created_at DESC""",
            owner_filter,
        ).fetchall()

    records = [_row_to_record(owned) for owned in owned_rows]
    return SubscriptionListResponse(subscriptions=records, total=len(owned_rows))
@router.get("/{subscription_id}", response_model=SubscriptionRecord)
def get_subscription(
    subscription_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> SubscriptionRecord:
    """Fetch one subscription by id.

    BOLA: a subscription the caller does not own is reported exactly like a
    missing one (404), so ids cannot be probed for existence.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when
            the subscription is absent from the caller's tenant or belongs
            to a different subscriber.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        found = conn.execute(
            "SELECT * FROM subscriptions WHERE id=? AND tenant_id=?",
            (subscription_id, identity.tenant_id),
        ).fetchone()

    caller_owns_it = found is not None and found["subscriber_identity"] == identity.entity_uri
    if not caller_owns_it:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="subscription not found")

    return _row_to_record(found)
@router.delete("/{subscription_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_subscription(
    subscription_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> None:
    """Cancel (delete) a subscription; only its owner may do so (BOLA).

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when
            the subscription is absent from the caller's tenant or belongs
            to a different subscriber.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    # Ownership lookup: only the two columns needed for the check are read.
    with db() as conn:
        target_row = conn.execute(
            "SELECT id, subscriber_identity FROM subscriptions WHERE id=? AND tenant_id=?",
            (subscription_id, identity.tenant_id),
        ).fetchone()

    caller_owns_it = (
        target_row is not None and target_row["subscriber_identity"] == identity.entity_uri
    )
    if not caller_owns_it:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="subscription not found")

    with db() as conn:
        conn.execute("DELETE FROM subscriptions WHERE id=?", (subscription_id,))
240# ---------------------------------------------------------------------------
241# Replay window
242# ---------------------------------------------------------------------------
def _parse_replay_since(since: str) -> datetime:
    """Parse the ``since`` query value into a timezone-aware UTC datetime.

    Naive timestamps are interpreted as UTC. Raises a 400 ``HTTPException``
    when the value is not an ISO 8601 timestamp.
    """
    candidates = [since]
    # An unencoded "+" in a query string arrives as a space, so
    # "...T01:00:00+00:00" shows up as "...T01:00:00 00:00". Retry with the
    # last space restored to "+" rather than rejecting such clients.
    head, sep, tail = since.rpartition(" ")
    if sep:
        candidates.append(f"{head}+{tail}")

    for candidate in candidates:
        try:
            parsed = datetime.fromisoformat(candidate)
        except ValueError:
            continue
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=UTC)
        return parsed.astimezone(UTC)

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="since must be an ISO 8601 timestamp",
    )


def _parse_event_cursor(cursor: str) -> int:
    """Convert the pagination cursor to the rowid it encodes (400 if malformed)."""
    try:
        return int(cursor)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="invalid cursor",
        ) from exc


@router.get("/{subscription_id}/events", response_model=SubscriptionEventsResponse)
def list_subscription_events(
    subscription_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
    since: str | None = Query(
        None,
        description="ISO 8601 timestamp; return events created at or after this time",
    ),
    cursor: str | None = Query(None, description="Opaque pagination cursor (event id)"),
    limit: int = Query(50, ge=1, le=500),
) -> SubscriptionEventsResponse:
    """Replay window: return delivery events for a subscription (Spec-X7-Subscriptions).

    Results are bounded to the configured replay window
    (``settings.subscription_replay_s`` seconds back from now); a ``since``
    older than the window is clamped to the window start.

    Args:
        subscription_id: Subscription whose events are listed.
        identity: Authenticated caller; must own the subscription.
        since: Optional ISO 8601 lower bound on ``created_at``. It is parsed
            and normalized to UTC before being compared, so ``Z`` suffixes
            and non-UTC offsets are handled correctly.
        cursor: The ``cursor`` value from a previous page. Internally it is
            the rowid of the last event returned.
        limit: Page size (1-500).

    Returns:
        One page of events in ascending rowid order. ``cursor`` is set only
        when more events remain.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when
            the subscription is missing or not owned by the caller (BOLA);
            400 when ``since`` or ``cursor`` is malformed.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        sub_row = conn.execute(
            "SELECT id, subscriber_identity FROM subscriptions WHERE id=? AND tenant_id=?",
            (subscription_id, identity.tenant_id),
        ).fetchone()

    if sub_row is None or sub_row["subscriber_identity"] != identity.entity_uri:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="subscription not found")

    replay_s = _settings_pkg.settings.subscription_replay_s
    window_cutoff = datetime.now(UTC) - timedelta(seconds=replay_s)
    # Compare as datetimes, not strings: a raw client string only sorts
    # correctly when it happens to share the stored format and UTC offset.
    lower_bound = window_cutoff
    if since is not None:
        lower_bound = max(_parse_replay_since(since), window_cutoff)
    # NOTE(review): assumes subscription_events.created_at is stored as a UTC
    # ``datetime.isoformat()`` string (as subscriptions.created_at is) so the
    # SQL string comparison below is chronological — confirm against the writer.
    effective_since = lower_bound.isoformat()

    sql = (
        "SELECT rowid, * FROM subscription_events "
        "WHERE subscription_id = ? AND created_at >= ? "
    )
    params: list[Any] = [subscription_id, effective_since]
    if cursor:
        # Untrusted input: a non-numeric cursor is a client error, not a 500.
        sql += "AND rowid > ? "
        params.append(_parse_event_cursor(cursor))
    sql += "ORDER BY rowid ASC LIMIT ?"
    # Fetch one extra row to learn whether another page exists.
    params.append(limit + 1)

    with db() as conn:
        rows = conn.execute(sql, params).fetchall()

    has_more = len(rows) > limit
    rows = rows[:limit]
    next_cursor = str(rows[-1]["rowid"]) if has_more and rows else None

    # S3 fix: re-apply §17 garden ACL and §19 sanitizer at replay time.
    # subscription_events.payload stores the raw pre-delivery payload; without this
    # re-check a subscriber could retrieve garden-scoped facts they've since been
    # removed from, or sanitizer-redacted content, via the replay window.
    event_ctx = {
        "subscriber_identity": sub_row["subscriber_identity"],
        "tenant_id": identity.tenant_id,
    }
    safe_events: list[SubscriptionEventRecord] = []
    for r in rows:
        raw_payload = json.loads(r["payload"])
        sanitized = _sanitize_event_payload(event_ctx, raw_payload)
        if sanitized is None:
            # ACL/sanitizer blocked — return redacted placeholder. A stored
            # payload that is not a JSON object has no "id" to echo back.
            fact_id = raw_payload.get("id") if isinstance(raw_payload, dict) else None
            safe_payload = json.dumps({"fact_id": fact_id, "redacted": True})
        else:
            safe_payload = json.dumps(sanitized)
        safe_events.append(_event_row_to_record_with_payload(r, safe_payload))

    return SubscriptionEventsResponse(
        events=safe_events,
        total=len(safe_events),
        cursor=next_cursor,
    )