Coverage for node / src / stigmem_node / routes / subscriptions.py: 87%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Subscription CRUD and replay-window routes — spec §20.3, §20.5. 

2 

3POST /v1/subscriptions — create (subscriber_identity = caller; BOLA §20.3.2) 

4GET /v1/subscriptions — list caller's subscriptions 

5GET /v1/subscriptions/{id} — get one (BOLA: 404 if not owner) 

6DELETE /v1/subscriptions/{id} — cancel (BOLA: 404 if not owner) 

7GET /v1/subscriptions/{id}/events — replay window (§20.5) 

8""" 

9 

10from __future__ import annotations 

11 

12import json 

13import uuid 

14from datetime import UTC, datetime, timedelta 

15from typing import Annotated, Any 

16 

17from fastapi import APIRouter, Depends, HTTPException, Query, status 

18 

19from .. import settings as _settings_pkg 

20from ..auth import Identity, resolve_identity 

21from ..db import db 

22from ..garden_acl import get_garden_by_garden_uri, require_garden_read 

23from ..models.constants import VALID_SCOPES 

24from ..models.subscriptions import ( 

25 SubscriptionCreateRequest, 

26 SubscriptionEventRecord, 

27 SubscriptionEventsResponse, 

28 SubscriptionListResponse, 

29 SubscriptionRecord, 

30) 

31from ..subscription_delivery import _sanitize_payload as _sanitize_event_payload 

32 

33router = APIRouter(prefix="/v1/subscriptions", tags=["subscriptions"]) 

34 

35 

def _target_kind(target: str) -> str:
    """Classify a subscription target as ``"scope"`` or ``"entity"``.

    A target that is a member of ``VALID_SCOPES`` is a scope; every other
    value is treated as an entity.
    """
    if target in VALID_SCOPES:
        return "scope"
    return "entity"

38 

39 

def _row_to_record(row: Any) -> SubscriptionRecord:
    """Build a ``SubscriptionRecord`` from a ``subscriptions`` row.

    ``row`` must support lookup by column name.  Columns are copied through
    unchanged except ``circuit_open``, which is coerced to ``bool``.
    """
    passthrough = (
        "id",
        "subscriber_identity",
        "target",
        "target_kind",
        "on_change",
        "delivery_address",
        "idempotency_key",
        "created_at",
        "last_delivered_at",
    )
    fields = {column: row[column] for column in passthrough}
    fields["circuit_open"] = bool(row["circuit_open"])
    fields["consecutive_failures"] = row["consecutive_failures"]
    return SubscriptionRecord(**fields)

54 

55 

def _event_row_to_record(row: Any) -> SubscriptionEventRecord:
    """Build a ``SubscriptionEventRecord`` from a ``subscription_events`` row.

    The ``payload`` column holds JSON text and is decoded with ``json.loads``;
    every other column is copied through by name.
    """
    before_payload = ("id", "subscription_id", "event_type", "entity_uri", "fact_id")
    after_payload = ("created_at", "delivered_at", "delivery_status", "delivery_attempts")

    fields = {column: row[column] for column in before_payload}
    fields["payload"] = json.loads(row["payload"])
    fields.update((column, row[column]) for column in after_payload)
    return SubscriptionEventRecord(**fields)

69 

70 

def _event_row_to_record_with_payload(row: Any, payload_json: str) -> SubscriptionEventRecord:
    """Build an event record whose payload comes from ``payload_json``.

    Same as ``_event_row_to_record`` except that the row's own ``payload``
    column is ignored: the caller supplies an already-sanitized JSON string,
    which is decoded and used in its place.
    """
    fields: dict[str, Any] = {
        column: row[column]
        for column in ("id", "subscription_id", "event_type", "entity_uri", "fact_id")
    }
    fields["payload"] = json.loads(payload_json)
    for column in ("created_at", "delivered_at", "delivery_status", "delivery_attempts"):
        fields[column] = row[column]
    return SubscriptionEventRecord(**fields)

85 

86 

87# --------------------------------------------------------------------------- 

88# CRUD 

89# --------------------------------------------------------------------------- 

90 

91 

@router.post("", response_model=SubscriptionRecord, status_code=status.HTTP_201_CREATED)
def create_subscription(
    req: SubscriptionCreateRequest,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> SubscriptionRecord:
    """Create a subscription owned by the caller (Spec-X7-Subscriptions).

    The subscriber is always the authenticated caller (BOLA); the request body
    cannot nominate another identity.  No new row is inserted when the caller
    already has a subscription with the same idempotency key, or with the same
    (target, on_change, delivery_address) — the existing row is returned.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when the
            target looks like a garden URI that does not resolve within the
            caller's tenant.  ``require_garden_read`` is also invoked for
            garden targets and presumably raises when the caller is not a
            member (its behavior is defined outside this module).
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    # §17 garden ACL: a garden-URI target requires the caller to be a member.
    targets_garden = req.target.startswith("stigmem://") and "/garden/" in req.target
    if targets_garden:
        garden = get_garden_by_garden_uri(req.target, tenant_id=identity.tenant_id)
        if garden is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found")
        require_garden_read(garden, identity)

    owner, tenant = identity.entity_uri, identity.tenant_id

    # Idempotency replay.  The lookup is scoped to this subscriber and tenant
    # so that a key collision never exposes another entity's subscription (R2).
    if req.idempotency_key:
        with db() as conn:
            replayed = conn.execute(
                """SELECT * FROM subscriptions
                   WHERE idempotency_key=? AND tenant_id=? AND subscriber_identity=?""",
                (req.idempotency_key, tenant, owner),
            ).fetchone()
        if replayed is not None:
            return _row_to_record(replayed)

    # Natural dedup on (subscriber, target, on_change, delivery_address).
    # NOTE(review): this check and the INSERT below use separate db() contexts,
    # so two concurrent identical requests may both pass it — confirm whether a
    # unique index backs this up.
    natural_key = (owner, req.target, req.on_change, req.delivery_address, tenant)
    with db() as conn:
        duplicate = conn.execute(
            """SELECT * FROM subscriptions
               WHERE subscriber_identity=? AND target=? AND on_change=?
                 AND delivery_address=? AND tenant_id=?""",
            natural_key,
        ).fetchone()
    if duplicate is not None:
        return _row_to_record(duplicate)

    new_id = str(uuid.uuid4())
    created_at = datetime.now(UTC).isoformat()
    new_row = (
        new_id,
        owner,
        req.target,
        _target_kind(req.target),
        req.on_change,
        req.delivery_address,
        req.idempotency_key,
        created_at,
        tenant,
    )

    with db() as conn:
        conn.execute(
            """INSERT INTO subscriptions
               (id, subscriber_identity, target, target_kind, on_change,
                delivery_address, idempotency_key, created_at, tenant_id)
               VALUES (?,?,?,?,?,?,?,?,?)""",
            new_row,
        )
        # Read the row back so columns filled in by the database are included.
        inserted = conn.execute("SELECT * FROM subscriptions WHERE id=?", (new_id,)).fetchone()

    return _row_to_record(inserted)

167 

168 

@router.get("", response_model=SubscriptionListResponse)
def list_subscriptions(
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> SubscriptionListResponse:
    """Return the caller's own subscriptions, newest first (BOLA: own only).

    Raises:
        HTTPException: 403 when the caller lacks read permission.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    owner_filter = (identity.entity_uri, identity.tenant_id)
    with db() as conn:
        owned_rows = conn.execute(
            """SELECT * FROM subscriptions
               WHERE subscriber_identity=? AND tenant_id=?
               ORDER BY created_at DESC""",
            owner_filter,
        ).fetchall()

    records = list(map(_row_to_record, owned_rows))
    return SubscriptionListResponse(subscriptions=records, total=len(records))

191 

192 

@router.get("/{subscription_id}", response_model=SubscriptionRecord)
def get_subscription(
    subscription_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> SubscriptionRecord:
    """Fetch one of the caller's subscriptions by id.

    A subscription that exists but belongs to someone else is reported as
    404, exactly like a missing one, so ids cannot be probed (BOLA).

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when the
            subscription is absent from the caller's tenant or not owned by
            the caller.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        found = conn.execute(
            "SELECT * FROM subscriptions WHERE id=? AND tenant_id=?",
            (subscription_id, identity.tenant_id),
        ).fetchone()

    if found is not None and found["subscriber_identity"] == identity.entity_uri:
        return _row_to_record(found)

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="subscription not found")

214 

215 

@router.delete("/{subscription_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_subscription(
    subscription_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> None:
    """Cancel one of the caller's subscriptions (BOLA: only the owner may delete).

    The ownership check and the delete are a single statement scoped to the
    caller's tenant and identity, so there is no window between "verify owner"
    and "delete" and the DELETE can never touch a row the caller does not own.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when no
            subscription with this id is owned by the caller in their tenant.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        # NOTE(review): relies on conn.execute() returning a DB-API cursor
        # whose rowcount reflects the DELETE (true for sqlite3) — confirm for db().
        deleted = conn.execute(
            "DELETE FROM subscriptions WHERE id=? AND tenant_id=? AND subscriber_identity=?",
            (subscription_id, identity.tenant_id, identity.entity_uri),
        ).rowcount

    # Missing and not-owned are indistinguishable to the caller by design.
    if deleted == 0:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="subscription not found")

238 

239 

240# --------------------------------------------------------------------------- 

241# Replay window 

242# --------------------------------------------------------------------------- 

243 

244 

def _parse_event_cursor(cursor: str | None) -> int | None:
    """Turn the opaque ``cursor`` query value into a rowid lower bound.

    Returns ``None`` when no cursor was supplied (``None`` or empty string).

    Raises:
        HTTPException: 400 when the cursor is not an integer, or does not fit
            in a signed 64-bit value and so could not be bound as a rowid.
    """
    if not cursor:
        return None
    try:
        rowid = int(cursor)
    except ValueError:
        rowid = None
    if rowid is None or not -(2**63) <= rowid < 2**63:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid cursor")
    return rowid


def _resolve_replay_since(since: str | None, replay_s: float) -> str:
    """Return the ISO 8601 UTC lower bound for the replay query.

    The bound is the later of the caller's ``since`` and "now minus
    ``replay_s`` seconds", so a caller can narrow the replay window but never
    widen it.  ``since`` is parsed and normalised to UTC before comparing, so
    a ``Z`` suffix or a non-UTC offset is handled chronologically rather than
    lexicographically; a value without an offset is taken to be UTC.

    Raises:
        HTTPException: 400 when ``since`` is not a parseable ISO 8601 timestamp.
    """
    cutoff = datetime.now(UTC) - timedelta(seconds=replay_s)
    if not since:
        return cutoff.isoformat()
    try:
        requested = datetime.fromisoformat(since)
        if requested.tzinfo is None:
            requested = requested.replace(tzinfo=UTC)
        requested = requested.astimezone(UTC)
    except (ValueError, OverflowError):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="invalid since timestamp"
        ) from None
    return max(requested, cutoff).isoformat()


@router.get("/{subscription_id}/events", response_model=SubscriptionEventsResponse)
def list_subscription_events(
    subscription_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
    since: str | None = Query(
        None,
        description="ISO 8601 timestamp; return events created at or after this time",
    ),
    cursor: str | None = Query(None, description="Opaque pagination cursor (event id)"),
    limit: int = Query(50, ge=1, le=500),
) -> SubscriptionEventsResponse:
    """Replay window: return delivery events for a subscription (Spec-X7-Subscriptions).

    Results are bounded to the configured replay window (default 24 h) and are
    paged in ascending rowid order.  ``cursor`` in the response is set only
    when more events remain; pass it back to fetch the next page.  ``total`` is
    the number of events in this page.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when the
            subscription is absent from the caller's tenant or not owned by
            the caller; 400 when ``cursor`` or ``since`` is malformed.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    with db() as conn:
        sub_row = conn.execute(
            "SELECT id, subscriber_identity FROM subscriptions WHERE id=? AND tenant_id=?",
            (subscription_id, identity.tenant_id),
        ).fetchone()

    if sub_row is None or sub_row["subscriber_identity"] != identity.entity_uri:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="subscription not found")

    # Validate caller-supplied paging inputs only after the ownership check so
    # that non-owners always see 404, never a validation error.
    after_rowid = _parse_event_cursor(cursor)
    effective_since = _resolve_replay_since(
        since, _settings_pkg.settings.subscription_replay_s
    )

    if after_rowid is not None:
        sql = (
            "SELECT rowid, * FROM subscription_events "
            "WHERE subscription_id = ? AND created_at >= ? AND rowid > ? "
            "ORDER BY rowid ASC LIMIT ?"
        )
        params: list[Any] = [subscription_id, effective_since, after_rowid]
    else:
        sql = (
            "SELECT rowid, * FROM subscription_events "
            "WHERE subscription_id = ? AND created_at >= ? "
            "ORDER BY rowid ASC LIMIT ?"
        )
        params = [subscription_id, effective_since]
    # Fetch one extra row purely to learn whether another page exists.
    params.append(limit + 1)

    with db() as conn:
        rows = conn.execute(sql, params).fetchall()

    has_more = len(rows) > limit
    rows = rows[:limit]
    next_cursor = str(rows[-1]["rowid"]) if has_more and rows else None

    # S3 fix: re-apply §17 garden ACL and §19 sanitizer at replay time.
    # subscription_events.payload stores the raw pre-delivery payload; without this
    # re-check a subscriber could retrieve garden-scoped facts they've since been
    # removed from, or sanitizer-redacted content, via the replay window.
    event_ctx = {
        "subscriber_identity": sub_row["subscriber_identity"],
        "tenant_id": identity.tenant_id,
    }
    safe_events: list[SubscriptionEventRecord] = []
    for r in rows:
        raw_payload = json.loads(r["payload"])
        sanitized = _sanitize_event_payload(event_ctx, raw_payload)
        if sanitized is None:
            # ACL/sanitizer blocked — return redacted placeholder.  A stored
            # payload that is not a JSON object has no "id" to expose.
            fact_id = raw_payload.get("id") if isinstance(raw_payload, dict) else None
            safe_payload = json.dumps({"fact_id": fact_id, "redacted": True})
        else:
            safe_payload = json.dumps(sanitized)
        safe_events.append(_event_row_to_record_with_payload(r, safe_payload))

    return SubscriptionEventsResponse(
        events=safe_events,
        total=len(safe_events),
        cursor=next_cursor,
    )

324 )