Coverage for node / src / stigmem_node / routes / quarantine.py: 80%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Quarantine admin API — spec §19.5. 

2 

3GET /v1/quarantine — list quarantined facts (all or by garden) 

4POST /v1/quarantine/{fact_id}/admit — shorthand: promote fact to main fabric 

5POST /v1/quarantine/{fact_id}/reject — shorthand: reject a quarantined fact 

6 

7These endpoints are convenience wrappers over the garden-level promote/reject 

8endpoints (POST /v1/gardens/:id/promote|reject). They operate node-globally: 

9the caller addresses a fact by ID without needing to know its quarantine garden. 

10Requires node admin (write) permission. 

11""" 

12 

13from __future__ import annotations 

14 

15import uuid 

16from datetime import UTC, datetime 

17from typing import Annotated, Any 

18 

19from fastapi import APIRouter, Depends, HTTPException, Query, status 

20 

21from ..audit_event import INSTRUCTION_PROMOTED, emit_instruction_event_if_applicable 

22from ..auth import Identity, resolve_identity 

23from ..db import db 

24from ..garden_acl import get_garden_by_slug_or_id, require_quarantine_moderator_or_admin 

25from ..lifecycle.immutability import ( 

26 set_fact_garden_membership, 

27 set_fact_quarantine_status, 

28 set_fact_validity_override, 

29) 

30from ..models.constants import QUARANTINE_PENDING 

31from ..models.gardens import ( 

32 QuarantineListResponse, 

33 QuarantineRecord, 

34) 

35 

36router = APIRouter(prefix="/v1/quarantine", tags=["quarantine"]) 

37 

38 

39def _require_write(identity: Identity) -> None: 

40 if not identity.can_write(): 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true

41 raise HTTPException( 

42 status_code=status.HTTP_403_FORBIDDEN, detail="write permission required" 

43 ) 

44 

45 

46# --------------------------------------------------------------------------- 

47# List quarantined facts 

48# --------------------------------------------------------------------------- 

49 

50 

51@router.get("", response_model=QuarantineListResponse) 

52def list_quarantined_facts( 

53 identity: Annotated[Identity, Depends(resolve_identity)], 

54 garden_id: str | None = Query(None, description="Filter by quarantine garden UUID or slug"), 

55 quarantine_status: str | None = Query( 

56 None, description="Filter by status: pending, promoted, rejected" 

57 ), 

58 limit: int = Query(100, ge=1, le=1000), 

59 offset: int = Query(0, ge=0), 

60) -> QuarantineListResponse: 

61 """List facts in the quarantine system (Spec-08-Quarantine-Garden). 

62 

63 Node admins see all quarantined facts across all gardens. 

64 Other callers see facts only in quarantine gardens where they hold a member role. 

65 """ 

66 if not identity.can_read(): 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true

67 raise HTTPException( 

68 status_code=status.HTTP_403_FORBIDDEN, detail="read permission required" 

69 ) 

70 

71 projection_joins = ( 

72 " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id" 

73 " LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id" 

74 ) 

75 quarantine_garden_expr = "COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id)" 

76 quarantine_status_expr = "COALESCE(fqs.quarantine_status, f.quarantine_status)" 

77 filters: list[str] = [f"{quarantine_garden_expr} IS NOT NULL"] 

78 params: list[Any] = [] 

79 

80 if quarantine_status: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 filters.append(f"{quarantine_status_expr} = ?") 

82 params.append(quarantine_status) 

83 else: 

84 filters.append(f"{quarantine_status_expr} IS NOT NULL") 

85 

86 if garden_id: 

87 # Resolve slug to UUID 

88 garden = get_garden_by_slug_or_id(garden_id, tenant_id=identity.tenant_id) 

89 if garden is None: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found") 

91 filters.append(f"{quarantine_garden_expr} = ?") 

92 params.append(garden["id"]) 

93 elif not identity.can_write(): 93 ↛ 95line 93 didn't jump to line 95 because the condition on line 93 was never true

94 # Non-admins: only see facts in gardens they're members of 

95 filters.append( 

96 f"{quarantine_garden_expr} IN (" # nosec B608 

97 " SELECT gm.garden_id FROM garden_members gm" 

98 " WHERE gm.entity_uri = ?" 

99 ")" 

100 ) 

101 params.append(identity.entity_uri) 

102 

103 where_clause = " AND ".join(filters) 

104 

105 with db() as conn: 

106 count_row = conn.execute( 

107 f"SELECT COUNT(*) FROM facts f {projection_joins} WHERE {where_clause}", # nosec B608 

108 params, 

109 ).fetchone() 

110 total: int = count_row[0] if count_row else 0 

111 

112 rows = conn.execute( 

113 f"""SELECT f.id, f.entity, f.relation, f.source, 

114 {quarantine_status_expr} AS quarantine_status, 

115 {quarantine_garden_expr} AS quarantine_garden_id, 

116 COALESCE(fqs.quarantine_reason, f.quarantine_reason) AS quarantine_reason, 

117 COALESCE(fqs.quarantine_acted_by, f.quarantine_acted_by) 

118 AS quarantine_acted_by, 

119 COALESCE(fqs.quarantine_acted_at, f.quarantine_acted_at) 

120 AS quarantine_acted_at, 

121 f.source_trust, f.received_from, f.timestamp 

122 FROM facts f {projection_joins} 

123 WHERE {where_clause} 

124 ORDER BY f.timestamp DESC 

125 LIMIT ? OFFSET ?""", # nosec B608 

126 [*params, limit, offset], 

127 ).fetchall() 

128 

129 items = [ 

130 QuarantineRecord( 

131 fact_id=r["id"], 

132 entity=r["entity"], 

133 relation=r["relation"], 

134 source=r["source"], 

135 quarantine_status=r["quarantine_status"] or "", 

136 quarantine_garden_id=r["quarantine_garden_id"], 

137 quarantine_reason=r["quarantine_reason"], 

138 quarantine_acted_by=r["quarantine_acted_by"], 

139 quarantine_acted_at=r["quarantine_acted_at"], 

140 source_trust=float(r["source_trust"]) if r["source_trust"] is not None else None, 

141 received_from=r["received_from"], 

142 timestamp=r["timestamp"], 

143 ) 

144 for r in rows 

145 ] 

146 

147 return QuarantineListResponse(items=items, total=total) 

148 

149 

150# --------------------------------------------------------------------------- 

151# Admit (promote) a fact from quarantine to the main fabric 

152# --------------------------------------------------------------------------- 

153 

154 

155@router.post("/{fact_id}/admit", status_code=status.HTTP_200_OK) 

156def admit_fact( 

157 fact_id: str, 

158 identity: Annotated[Identity, Depends(resolve_identity)], 

159 target_garden_id: str | None = Query(None, description="Target garden UUID or slug"), 

160 reason: str = Query("", description="Reason for admission"), 

161) -> dict[str, Any]: 

162 """Promote a quarantined fact to the main fabric (or a specific target garden). 

163 

164 Node admins may admit quarantined facts as last-resort moderation 

165 authority. Other callers require quarantine:moderator or admin role in the 

166 fact's quarantine garden. 

167 """ 

168 _require_write(identity) 

169 

170 fact_row, garden = _get_quarantined_fact(fact_id, identity) 

171 now = datetime.now(UTC).isoformat() 

172 

173 # Resolve target garden 

174 target_db_id: str | None = None 

175 if target_garden_id: 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 tg = get_garden_by_slug_or_id(target_garden_id, tenant_id=identity.tenant_id) 

177 if tg is None: 

178 raise HTTPException( 

179 status_code=status.HTTP_404_NOT_FOUND, detail="target garden not found" 

180 ) 

181 target_db_id = tg["id"] 

182 

183 with db() as conn: 

184 set_fact_garden_membership( 

185 conn, 

186 fact_id=fact_id, 

187 garden_id=target_db_id, 

188 updated_by=identity.entity_uri, 

189 ) 

190 set_fact_quarantine_status( 

191 conn, 

192 fact_id=fact_id, 

193 quarantine_garden_id=garden["id"], 

194 quarantine_status="promoted", 

195 quarantine_reason=reason or "admitted via admin API", 

196 quarantine_acted_by=identity.entity_uri, 

197 quarantine_acted_at=now, 

198 ) 

199 _write_quarantine_audit(conn, fact_id, "quarantine_promote", identity, now) 

200 emit_instruction_event_if_applicable( 

201 INSTRUCTION_PROMOTED, 

202 fact_id=fact_id, 

203 fact_entity=fact_row["entity"], 

204 fact_relation=fact_row["relation"], 

205 fact_interpret_as=fact_row["interpret_as"], 

206 actor_uri=identity.entity_uri, 

207 tenant_id=identity.tenant_id, 

208 oidc_sub=identity.oidc_sub, 

209 source=identity.entity_uri, 

210 detail={ 

211 "reason": reason or "admitted via admin API", 

212 "quarantine_garden_id": garden["id"], 

213 "target_garden_id": target_db_id, 

214 }, 

215 conn=conn, 

216 ) 

217 

218 return { 

219 "fact_id": fact_id, 

220 "action": "admitted", 

221 "target_garden_id": target_db_id, 

222 "acted_by": identity.entity_uri, 

223 "acted_at": now, 

224 } 

225 

226 

227# --------------------------------------------------------------------------- 

228# Reject a quarantined fact 

229# --------------------------------------------------------------------------- 

230 

231 

232@router.post("/{fact_id}/reject", status_code=status.HTTP_200_OK) 

233def reject_fact( 

234 fact_id: str, 

235 identity: Annotated[Identity, Depends(resolve_identity)], 

236 reason: str = Query("", description="Reason for rejection"), 

237) -> dict[str, Any]: 

238 """Permanently reject a quarantined fact. 

239 

240 Sets confidence = 0.0 and quarantine_status = 'rejected'. 

241 Node admins may reject quarantined facts as last-resort moderation 

242 authority. Other callers require quarantine:moderator or admin role in the 

243 fact's quarantine garden. 

244 """ 

245 _require_write(identity) 

246 

247 _fact_row, garden = _get_quarantined_fact(fact_id, identity) 

248 now = datetime.now(UTC).isoformat() 

249 

250 with db() as conn: 

251 set_fact_validity_override( 

252 conn, 

253 fact_id=fact_id, 

254 confidence=0.0, 

255 reason=reason or "rejected via admin API", 

256 updated_by=identity.entity_uri, 

257 ) 

258 set_fact_quarantine_status( 

259 conn, 

260 fact_id=fact_id, 

261 quarantine_garden_id=garden["id"], 

262 quarantine_status="rejected", 

263 quarantine_reason=reason or "rejected via admin API", 

264 quarantine_acted_by=identity.entity_uri, 

265 quarantine_acted_at=now, 

266 ) 

267 # Append-only retraction log (§24.2.1 c.3) 

268 conn.execute( 

269 "INSERT INTO fact_retractions" 

270 " (id, fact_id, retracted_at, retracted_by) VALUES (?,?,?,?)", 

271 (str(uuid.uuid4()), fact_id, now, identity.entity_uri), 

272 ) 

273 _write_quarantine_audit(conn, fact_id, "quarantine_reject", identity, now) 

274 

275 return { 

276 "fact_id": fact_id, 

277 "action": "rejected", 

278 "acted_by": identity.entity_uri, 

279 "acted_at": now, 

280 } 

281 

282 

283# --------------------------------------------------------------------------- 

284# Helpers 

285# --------------------------------------------------------------------------- 

286 

287 

288def _get_quarantined_fact( 

289 fact_id: str, identity: Identity 

290) -> tuple[dict[str, Any], dict[str, Any]]: 

291 """Return (fact_row, garden_row) for a pending quarantined fact. 

292 

293 Raises 404 or 409 as appropriate. Node-admin bypass is intentional because 

294 node admins are the system's last-resort moderation authority. Garden-scoped 

295 moderators must hold quarantine:moderator or admin role in the fact's 

296 quarantine garden. 

297 """ 

298 with db() as conn: 

299 row = conn.execute( 

300 """SELECT f.*, 

301 COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) 

302 AS projected_quarantine_garden_id, 

303 COALESCE(fqs.quarantine_status, f.quarantine_status) 

304 AS projected_quarantine_status, 

305 COALESCE(fqs.quarantine_reason, f.quarantine_reason) 

306 AS projected_quarantine_reason 

307 FROM facts f 

308 LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id 

309 WHERE f.id = ? 

310 AND COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) IS NOT NULL""", 

311 (fact_id,), 

312 ).fetchone() 

313 

314 if row is None: 

315 raise HTTPException( 

316 status_code=status.HTTP_404_NOT_FOUND, detail="quarantined fact not found" 

317 ) 

318 

319 if row["projected_quarantine_status"] != QUARANTINE_PENDING: 

320 raise HTTPException( 

321 status_code=status.HTTP_409_CONFLICT, 

322 detail="fact_not_quarantine_pending", 

323 ) 

324 

325 garden = get_garden_by_slug_or_id(row["projected_quarantine_garden_id"]) 

326 if garden is None: 326 ↛ 327line 326 didn't jump to line 327 because the condition on line 326 was never true

327 raise HTTPException( 

328 status_code=status.HTTP_404_NOT_FOUND, detail="quarantine garden not found" 

329 ) 

330 

331 if not identity.can_write(): 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 require_quarantine_moderator_or_admin(garden, identity) 

333 

334 return dict(row), garden 

335 

336 

337def _write_quarantine_audit( 

338 conn: Any, fact_id: str, event_type: str, identity: Identity, now: str 

339) -> None: 

340 audit_id = str(uuid.uuid4()) 

341 conn.execute( 

342 "INSERT INTO fact_audit_log" 

343 " (id, fact_id, event_type, entity_uri, oidc_sub, source," 

344 " attested_key_id, ts)" 

345 " VALUES (?,?,?,?,?,?,?,?)", 

346 ( 

347 audit_id, 

348 fact_id, 

349 event_type, 

350 identity.entity_uri, 

351 identity.oidc_sub, 

352 identity.entity_uri, 

353 None, 

354 now, 

355 ), 

356 )