Coverage for node/src/stigmem_node/routes/tombstones.py: 82%
87 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""RTBF tombstone admin API — spec §23.6.
3Routes:
4 POST /v1/tombstones — issue a tombstone (admin)
5 GET /v1/tombstones/{entity_uri_encoded} — check tombstone status (admin)
6 POST /v1/tombstones/{tombstone_id}/revoke — revoke a tombstone (admin)
8All endpoints require an admin API key.
9"""
11from __future__ import annotations
13import urllib.parse
14import uuid
15from datetime import UTC, datetime
16from typing import Annotated
18from fastapi import APIRouter, Depends, HTTPException, status
20from ..auth import Identity, resolve_identity
21from ..lifecycle.tombstone_signing import get_node_key_id, sign_revocation, sign_tombstone
22from ..lifecycle.tombstones import (
23 create_tombstone,
24 get_tombstone_status,
25)
26from ..lifecycle.tombstones import (
27 revoke_tombstone as _revoke_tombstone,
28)
29from ..models.tombstones import (
30 TombstoneCreateRequest,
31 TombstoneRecord,
32 TombstoneRevocationRecord,
33 TombstoneRevokeRequest,
34 TombstoneStatusResponse,
35)
# Router for the RTBF tombstone admin endpoints; every route registered on it
# lives under the /v1/tombstones prefix.
router = APIRouter(
    prefix="/v1/tombstones",
    tags=["tombstones"],
)

# Scope values accepted when issuing a tombstone; anything else is rejected
# with a 400 by issue_tombstone.
_VALID_SCOPES = {"*", "company", "local", "public", "team"}
def _require_admin(identity: Identity) -> None:
    """Reject the request unless *identity* reports itself as an admin.

    Raises:
        HTTPException: 403 with detail ``admin API key required`` when
            ``identity.is_admin()`` is falsy.
    """
    if identity.is_admin():
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="admin API key required",
    )
50# ---------------------------------------------------------------------------
51# POST /v1/tombstones — issue a tombstone
52# ---------------------------------------------------------------------------
@router.post("", status_code=status.HTTP_201_CREATED, response_model=TombstoneRecord)
def issue_tombstone(
    req: TombstoneCreateRequest,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> TombstoneRecord:
    """Issue, sign, persist, and rebroadcast a tombstone (admin only).

    Validates the requested scope and entity URI, builds a draft record with a
    fresh ``tomb_<uuid4>`` id and a UTC ISO-8601 timestamp, signs it, stores
    it, and then kicks off a best-effort federation push.

    Raises:
        HTTPException: 403 when the caller is not an admin; 400 with detail
            ``tombstone_invalid_scope`` or ``tombstone_entity_uri_invalid``
            for bad input; 409 with detail ``tombstone_already_exists`` when
            the store raises an error whose message contains
            ``already exists`` or ``UNIQUE``.
        Exception: any other error raised while storing is re-raised as-is.
    """
    _require_admin(identity)

    if req.scope not in _VALID_SCOPES:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="tombstone_invalid_scope",
        )

    # Every accepted form contains a colon, so in practice this only rejects
    # URIs that have no ":" at all.
    uri = req.entity_uri
    looks_like_uri = "://" in uri or uri.startswith("urn:") or ":" in uri
    if not looks_like_uri:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="tombstone_entity_uri_invalid",
        )

    # NOTE(review): a missing node key becomes an empty key id here, whereas
    # the revoke endpoint answers 500 in that situation — confirm intended.
    node_key_id = get_node_key_id() or ""

    draft = TombstoneRecord(
        id=f"tomb_{uuid.uuid4()}",
        entity_uri=uri,
        scope=req.scope,
        reason=req.reason,
        signed_by=identity.entity_uri,
        key_id=node_key_id,
        signature="",  # filled in by sign_tombstone below
        created_at=datetime.now(UTC).isoformat(),
        legal_hold=req.legal_hold,
    )
    signed = sign_tombstone(draft)

    try:
        stored = create_tombstone(
            entity_uri=signed.entity_uri,
            scope=signed.scope,
            reason=signed.reason,
            signed_by=signed.signed_by,
            key_id=signed.key_id,
            signature=signed.signature,
            legal_hold=signed.legal_hold,
            tombstone_id=signed.id,
            created_at=signed.created_at,
        )
    except Exception as exc:
        # Duplicates are recognised by the error message text.
        message = str(exc)
        if "already exists" in message or "UNIQUE" in message:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="tombstone_already_exists",
            ) from exc
        raise

    # Outbound federation rebroadcast (§23.4.1) — best-effort, in the background.
    _enqueue_tombstone_rebroadcast(stored)

    return stored
122# ---------------------------------------------------------------------------
123# GET /v1/tombstones/{entity_uri_encoded} — check tombstone status
124# ---------------------------------------------------------------------------
@router.get("/{entity_uri_encoded}", response_model=TombstoneStatusResponse)
def check_tombstone_status(
    entity_uri_encoded: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> TombstoneStatusResponse:
    """Return the tombstone status for a percent-encoded entity URI (admin only).

    The path segment is percent-decoded with ``urllib.parse.unquote`` before
    the status lookup.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    _require_admin(identity)
    return get_tombstone_status(urllib.parse.unquote(entity_uri_encoded))
137# ---------------------------------------------------------------------------
138# POST /v1/tombstones/{tombstone_id}/revoke — revoke a tombstone
139# ---------------------------------------------------------------------------
@router.post("/{tombstone_id}/revoke", response_model=TombstoneRevocationRecord)
def revoke_tombstone_endpoint(
    tombstone_id: str,
    req: TombstoneRevokeRequest,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> TombstoneRevocationRecord:
    """Revoke a tombstone and return the signed revocation record (admin only).

    The revocation is first stored with the placeholder signature
    ``"pending"``, then signed, and finally the stored row's ``signature`` and
    ``key_id`` columns are overwritten with the signed values.

    Raises:
        HTTPException: 403 when the caller is not an admin; 500 when no node
            signing key is configured; 404 (``tombstone_not_found``) when the
            revocation helper raises ``KeyError``; 409
            (``tombstone_already_revoked``) when it raises a ``ValueError``
            whose message contains ``already_revoked``.
        ValueError: any other ``ValueError`` from the helper propagates.
    """
    _require_admin(identity)

    node_key_id = get_node_key_id()
    if node_key_id is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="node signing key not configured",
        )

    try:
        revocation = _revoke_tombstone(
            tombstone_id=tombstone_id,
            reason=req.reason,
            signed_by=identity.entity_uri,
            key_id=node_key_id,
            signature="pending",  # placeholder, replaced after signing
        )
    except KeyError as exc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="tombstone_not_found"
        ) from exc
    except ValueError as exc:
        if "already_revoked" not in str(exc):
            raise
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, detail="tombstone_already_revoked"
        ) from exc

    signed_revocation = sign_revocation(revocation)

    # Swap the placeholder signature in storage for the real one.
    from ..db import db

    update_params = (
        signed_revocation.signature,
        signed_revocation.key_id,
        signed_revocation.id,
    )
    with db() as conn:
        conn.execute(
            "UPDATE tombstone_revocations SET signature = ?, key_id = ? WHERE id = ?",
            update_params,
        )
    return signed_revocation
188# ---------------------------------------------------------------------------
189# Background federation rebroadcast
190# ---------------------------------------------------------------------------
def _enqueue_tombstone_rebroadcast(record: TombstoneRecord) -> None:
    """Start a daemon thread that pushes *record* to federation peers (§23.4.1).

    The thread is started and not joined, so the caller does not wait for the
    push to complete.
    """
    import threading

    threading.Thread(
        target=_push_tombstone_to_peers,
        args=(record,),
        daemon=True,
    ).start()
def _push_tombstone_to_peers(record: TombstoneRecord) -> None:
    """POST *record* to the tombstone ingest endpoint of every active peer.

    Best-effort: if the peer list cannot be read the error is logged and
    nothing is sent. Each peer is handled inside its own ``try`` block, so a
    failure for one peer is logged as a warning and the loop moves on to the
    next. Responses with status 200, 201, or 409 are accepted silently; any
    other status is logged as a warning.
    """
    # ``json`` is imported once here instead of on every loop iteration.
    import json
    import logging

    import httpx

    from ..db import db
    from ..federation.peer_token import create_peer_token

    log = logging.getLogger("stigmem.tombstones.federation")
    try:
        with db() as conn:
            peers = conn.execute(
                "SELECT node_id, node_url, allowed_scopes FROM peers WHERE status = 'active'"
            ).fetchall()
    except Exception:
        log.exception("Failed to fetch peers for tombstone rebroadcast")
        return

    # The same serialized record is sent to every peer.
    payload = record.model_dump()
    for peer in peers:
        try:
            # allowed_scopes is stored as JSON text and decoded per peer.
            allowed_scopes = json.loads(peer["allowed_scopes"])
            token = create_peer_token(peer["node_id"], allowed_scopes)
            resp = httpx.post(
                f"{peer['node_url'].rstrip('/')}/v1/federation/tombstones/ingest",
                json=payload,
                headers={"Authorization": f"Bearer {token}"},
                timeout=10.0,
            )
            if resp.status_code not in (200, 201, 409):
                log.warning(
                    "Peer %s returned %s on tombstone push", peer["node_url"], resp.status_code
                )
        except Exception:
            # Deliberately broad: one bad peer must not stop the others.
            log.warning("Failed to push tombstone to peer %s", peer["node_url"], exc_info=True)