Coverage for node / src / stigmem_node / routes / quarantine.py: 80%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Quarantine admin API — spec §19.5.
3GET /v1/quarantine — list quarantined facts (all or by garden)
4POST /v1/quarantine/{fact_id}/admit — shorthand: promote fact to main fabric
5POST /v1/quarantine/{fact_id}/reject — shorthand: reject a quarantined fact
7These endpoints are convenience wrappers over the garden-level promote/reject
8endpoints (POST /v1/gardens/:id/promote|reject). They operate node-globally:
9the caller addresses a fact by ID without needing to know its quarantine garden.
10Admit and reject require node admin (write) permission; listing requires read permission.
11"""
13from __future__ import annotations
15import uuid
16from datetime import UTC, datetime
17from typing import Annotated, Any
19from fastapi import APIRouter, Depends, HTTPException, Query, status
21from ..audit_event import INSTRUCTION_PROMOTED, emit_instruction_event_if_applicable
22from ..auth import Identity, resolve_identity
23from ..db import db
24from ..garden_acl import get_garden_by_slug_or_id, require_quarantine_moderator_or_admin
25from ..lifecycle.immutability import (
26 set_fact_garden_membership,
27 set_fact_quarantine_status,
28 set_fact_validity_override,
29)
30from ..models.constants import QUARANTINE_PENDING
31from ..models.gardens import (
32 QuarantineListResponse,
33 QuarantineRecord,
34)
# Router for the node-global quarantine endpoints; every route below hangs off /v1/quarantine.
router = APIRouter(tags=["quarantine"], prefix="/v1/quarantine")
def _require_write(identity: Identity) -> None:
    """Reject the request with HTTP 403 unless *identity* has write permission.

    Returns ``None`` when ``identity.can_write()`` is true; otherwise raises
    ``HTTPException`` with status 403 and detail ``"write permission required"``.
    """
    if identity.can_write():
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN, detail="write permission required"
    )
46# ---------------------------------------------------------------------------
47# List quarantined facts
48# ---------------------------------------------------------------------------
@router.get("", response_model=QuarantineListResponse)
def list_quarantined_facts(
    identity: Annotated[Identity, Depends(resolve_identity)],
    garden_id: str | None = Query(None, description="Filter by quarantine garden UUID or slug"),
    quarantine_status: str | None = Query(
        None, description="Filter by status: pending, promoted, rejected"
    ),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
) -> QuarantineListResponse:
    """List facts in the quarantine system (Spec-08-Quarantine-Garden).

    Callers for whom ``identity.can_write()`` is true are treated as node
    admins and see all quarantined facts across all gardens. Other callers see
    facts only in quarantine gardens where they have a ``garden_members`` row,
    and this restriction applies whether or not ``garden_id`` is supplied.

    Args:
        identity: Resolved caller identity.
        garden_id: Optional quarantine garden UUID or slug to filter by.
        quarantine_status: Optional exact status to filter by; when omitted,
            any fact with a non-NULL quarantine status is returned.
        limit: Page size (1-1000).
        offset: Number of rows to skip.

    Returns:
        The requested page of records plus the total number of matching facts.

    Raises:
        HTTPException: 403 when the caller lacks read permission; 404 when
            ``garden_id`` does not resolve to a garden in the caller's tenant.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )

    # Quarantine state is read from the fact_quarantine_status projection,
    # falling back to the columns on facts when no projection row exists.
    projection_joins = (
        " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id"
        " LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id"
    )
    quarantine_garden_expr = "COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id)"
    quarantine_status_expr = "COALESCE(fqs.quarantine_status, f.quarantine_status)"
    filters: list[str] = [f"{quarantine_garden_expr} IS NOT NULL"]
    params: list[Any] = []

    if quarantine_status:
        filters.append(f"{quarantine_status_expr} = ?")
        params.append(quarantine_status)
    else:
        filters.append(f"{quarantine_status_expr} IS NOT NULL")

    if garden_id:
        # Resolve slug to UUID
        garden = get_garden_by_slug_or_id(garden_id, tenant_id=identity.tenant_id)
        if garden is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found")
        filters.append(f"{quarantine_garden_expr} = ?")
        params.append(garden["id"])

    if not identity.can_write():
        # Non-admins: only see facts in gardens they're members of. This must
        # not be skipped when garden_id is supplied, otherwise naming a garden
        # explicitly would bypass the membership restriction.
        filters.append(
            f"{quarantine_garden_expr} IN ("  # nosec B608
            " SELECT gm.garden_id FROM garden_members gm"
            " WHERE gm.entity_uri = ?"
            ")"
        )
        params.append(identity.entity_uri)

    where_clause = " AND ".join(filters)

    with db() as conn:
        # Only fixed SQL fragments are interpolated; all caller-supplied values
        # are bound through `params`.
        count_row = conn.execute(
            f"SELECT COUNT(*) FROM facts f {projection_joins} WHERE {where_clause}",  # nosec B608
            params,
        ).fetchone()
        total: int = count_row[0] if count_row else 0

        rows = conn.execute(
            f"""SELECT f.id, f.entity, f.relation, f.source,
                       {quarantine_status_expr} AS quarantine_status,
                       {quarantine_garden_expr} AS quarantine_garden_id,
                       COALESCE(fqs.quarantine_reason, f.quarantine_reason) AS quarantine_reason,
                       COALESCE(fqs.quarantine_acted_by, f.quarantine_acted_by)
                           AS quarantine_acted_by,
                       COALESCE(fqs.quarantine_acted_at, f.quarantine_acted_at)
                           AS quarantine_acted_at,
                       f.source_trust, f.received_from, f.timestamp
                FROM facts f {projection_joins}
                WHERE {where_clause}
                ORDER BY f.timestamp DESC
                LIMIT ? OFFSET ?""",  # nosec B608
            [*params, limit, offset],
        ).fetchall()

    items = [
        QuarantineRecord(
            fact_id=r["id"],
            entity=r["entity"],
            relation=r["relation"],
            source=r["source"],
            quarantine_status=r["quarantine_status"] or "",
            quarantine_garden_id=r["quarantine_garden_id"],
            quarantine_reason=r["quarantine_reason"],
            quarantine_acted_by=r["quarantine_acted_by"],
            quarantine_acted_at=r["quarantine_acted_at"],
            source_trust=float(r["source_trust"]) if r["source_trust"] is not None else None,
            received_from=r["received_from"],
            timestamp=r["timestamp"],
        )
        for r in rows
    ]

    return QuarantineListResponse(items=items, total=total)
150# ---------------------------------------------------------------------------
151# Admit (promote) a fact from quarantine to the main fabric
152# ---------------------------------------------------------------------------
@router.post("/{fact_id}/admit", status_code=status.HTTP_200_OK)
def admit_fact(
    fact_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
    target_garden_id: str | None = Query(None, description="Target garden UUID or slug"),
    reason: str = Query("", description="Reason for admission"),
) -> dict[str, Any]:
    """Promote a pending quarantined fact to the main fabric or a target garden.

    The fact's garden membership is set to the resolved target garden (``None``
    when no target is given), its quarantine status becomes ``"promoted"``, an
    audit row is written, and an instruction-promoted event is emitted when
    applicable. All writes share one ``db()`` connection.

    NOTE(review): ``_require_write`` runs first, so callers without write
    permission get a 403 before the garden-role check inside
    ``_get_quarantined_fact`` can apply. Confirm this matches the intended
    moderator policy.

    Raises:
        HTTPException: 403 without write permission; 404 when the fact, its
            quarantine garden, or the target garden cannot be found; 409 when
            the fact is not pending.
    """
    _require_write(identity)

    fact, quarantine_garden = _get_quarantined_fact(fact_id, identity)
    acted_at = datetime.now(UTC).isoformat()
    effective_reason = reason or "admitted via admin API"

    # Resolve the optional destination garden to its database id.
    destination_id: str | None = None
    if target_garden_id:
        destination = get_garden_by_slug_or_id(target_garden_id, tenant_id=identity.tenant_id)
        if destination is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="target garden not found"
            )
        destination_id = destination["id"]

    with db() as conn:
        set_fact_garden_membership(
            conn,
            fact_id=fact_id,
            garden_id=destination_id,
            updated_by=identity.entity_uri,
        )
        set_fact_quarantine_status(
            conn,
            fact_id=fact_id,
            quarantine_garden_id=quarantine_garden["id"],
            quarantine_status="promoted",
            quarantine_reason=effective_reason,
            quarantine_acted_by=identity.entity_uri,
            quarantine_acted_at=acted_at,
        )
        _write_quarantine_audit(conn, fact_id, "quarantine_promote", identity, acted_at)
        event_detail = {
            "reason": effective_reason,
            "quarantine_garden_id": quarantine_garden["id"],
            "target_garden_id": destination_id,
        }
        emit_instruction_event_if_applicable(
            INSTRUCTION_PROMOTED,
            fact_id=fact_id,
            fact_entity=fact["entity"],
            fact_relation=fact["relation"],
            fact_interpret_as=fact["interpret_as"],
            actor_uri=identity.entity_uri,
            tenant_id=identity.tenant_id,
            oidc_sub=identity.oidc_sub,
            source=identity.entity_uri,
            detail=event_detail,
            conn=conn,
        )

    response: dict[str, Any] = {
        "fact_id": fact_id,
        "action": "admitted",
        "target_garden_id": destination_id,
        "acted_by": identity.entity_uri,
        "acted_at": acted_at,
    }
    return response
227# ---------------------------------------------------------------------------
228# Reject a quarantined fact
229# ---------------------------------------------------------------------------
@router.post("/{fact_id}/reject", status_code=status.HTTP_200_OK)
def reject_fact(
    fact_id: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
    reason: str = Query("", description="Reason for rejection"),
) -> dict[str, Any]:
    """Permanently reject a pending quarantined fact.

    On one ``db()`` connection this applies a validity override with
    ``confidence=0.0``, sets the quarantine status to ``"rejected"``, appends a
    row to ``fact_retractions``, and writes a ``quarantine_reject`` audit entry.

    NOTE(review): ``_require_write`` runs first, so callers without write
    permission get a 403 before the garden-role check inside
    ``_get_quarantined_fact`` can apply. Confirm this matches the intended
    moderator policy.

    Raises:
        HTTPException: 403 without write permission; 404 when the fact or its
            quarantine garden cannot be found; 409 when the fact is not pending.
    """
    _require_write(identity)

    # Only the garden is needed here; the fact row itself is unused.
    _, quarantine_garden = _get_quarantined_fact(fact_id, identity)
    acted_at = datetime.now(UTC).isoformat()
    effective_reason = reason or "rejected via admin API"

    with db() as conn:
        set_fact_validity_override(
            conn,
            fact_id=fact_id,
            confidence=0.0,
            reason=effective_reason,
            updated_by=identity.entity_uri,
        )
        set_fact_quarantine_status(
            conn,
            fact_id=fact_id,
            quarantine_garden_id=quarantine_garden["id"],
            quarantine_status="rejected",
            quarantine_reason=effective_reason,
            quarantine_acted_by=identity.entity_uri,
            quarantine_acted_at=acted_at,
        )
        # Append-only retraction log (§24.2.1 c.3)
        retraction_row = (str(uuid.uuid4()), fact_id, acted_at, identity.entity_uri)
        conn.execute(
            "INSERT INTO fact_retractions"
            " (id, fact_id, retracted_at, retracted_by) VALUES (?,?,?,?)",
            retraction_row,
        )
        _write_quarantine_audit(conn, fact_id, "quarantine_reject", identity, acted_at)

    response: dict[str, Any] = {
        "fact_id": fact_id,
        "action": "rejected",
        "acted_by": identity.entity_uri,
        "acted_at": acted_at,
    }
    return response
283# ---------------------------------------------------------------------------
284# Helpers
285# ---------------------------------------------------------------------------
def _get_quarantined_fact(
    fact_id: str, identity: Identity
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Load a pending quarantined fact together with its quarantine garden.

    Quarantine columns are read from ``fact_quarantine_status`` when a row
    exists there, falling back to the columns on ``facts``.

    Returns:
        ``(fact_row, garden_row)``; the fact row is a plain ``dict`` that also
        carries the ``projected_quarantine_*`` columns.

    Raises:
        HTTPException: 404 when no quarantined fact with this id exists or its
            quarantine garden cannot be resolved; 409 when the projected status
            is not ``QUARANTINE_PENDING``.

    Callers with write permission skip the garden-role check on purpose: node
    admins are the last-resort moderation authority. Everyone else must pass
    ``require_quarantine_moderator_or_admin`` for the fact's quarantine garden.
    """
    lookup_sql = """SELECT f.*,
                  COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id)
                      AS projected_quarantine_garden_id,
                  COALESCE(fqs.quarantine_status, f.quarantine_status)
                      AS projected_quarantine_status,
                  COALESCE(fqs.quarantine_reason, f.quarantine_reason)
                      AS projected_quarantine_reason
           FROM facts f
           LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id
           WHERE f.id = ?
             AND COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) IS NOT NULL"""
    with db() as conn:
        fact = conn.execute(lookup_sql, (fact_id,)).fetchone()

    if fact is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="quarantined fact not found"
        )

    is_pending = fact["projected_quarantine_status"] == QUARANTINE_PENDING
    if not is_pending:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="fact_not_quarantine_pending",
        )

    # NOTE(review): unlike the other garden lookups in this module, no
    # tenant_id is passed here. Presumably deliberate for node-global
    # addressing; confirm.
    quarantine_garden = get_garden_by_slug_or_id(fact["projected_quarantine_garden_id"])
    if quarantine_garden is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="quarantine garden not found"
        )

    caller_is_node_admin = identity.can_write()
    if not caller_is_node_admin:
        require_quarantine_moderator_or_admin(quarantine_garden, identity)

    return dict(fact), quarantine_garden
def _write_quarantine_audit(
    conn: Any, fact_id: str, event_type: str, identity: Identity, now: str
) -> None:
    """Insert one row into ``fact_audit_log`` for a quarantine action.

    The row gets a fresh UUID as its id. The caller's ``entity_uri`` is stored
    both as ``entity_uri`` and as ``source``; ``attested_key_id`` is always
    written as NULL; ``now`` becomes the ``ts`` value. The insert runs on the
    caller's connection and this function does not commit.
    """
    insert_sql = (
        "INSERT INTO fact_audit_log"
        " (id, fact_id, event_type, entity_uri, oidc_sub, source,"
        " attested_key_id, ts)"
        " VALUES (?,?,?,?,?,?,?,?)"
    )
    values = (
        str(uuid.uuid4()),    # id
        fact_id,              # fact_id
        event_type,           # event_type
        identity.entity_uri,  # entity_uri
        identity.oidc_sub,    # oidc_sub
        identity.entity_uri,  # source
        None,                 # attested_key_id
        now,                  # ts
    )
    conn.execute(insert_sql, values)