Coverage for node / src / stigmem_node / routes / tombstones.py: 82%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""RTBF tombstone admin API — spec §23.6. 

2 

3Routes: 

4 POST /v1/tombstones — issue a tombstone (admin) 

5 GET /v1/tombstones/{entity_uri_encoded} — check tombstone status (admin) 

6 POST /v1/tombstones/{tombstone_id}/revoke — revoke a tombstone (admin) 

7 

8All endpoints require an admin API key. 

9""" 

10 

11from __future__ import annotations 

12 

13import urllib.parse 

14import uuid 

15from datetime import UTC, datetime 

16from typing import Annotated 

17 

18from fastapi import APIRouter, Depends, HTTPException, status 

19 

20from ..auth import Identity, resolve_identity 

21from ..lifecycle.tombstone_signing import get_node_key_id, sign_revocation, sign_tombstone 

22from ..lifecycle.tombstones import ( 

23 create_tombstone, 

24 get_tombstone_status, 

25) 

26from ..lifecycle.tombstones import ( 

27 revoke_tombstone as _revoke_tombstone, 

28) 

29from ..models.tombstones import ( 

30 TombstoneCreateRequest, 

31 TombstoneRecord, 

32 TombstoneRevocationRecord, 

33 TombstoneRevokeRequest, 

34 TombstoneStatusResponse, 

35) 

36 

# Router that carries every tombstone admin route; all paths declared on it
# are mounted under the /v1/tombstones prefix.
router = APIRouter(
    prefix="/v1/tombstones",
    tags=["tombstones"],
)

# Scope values accepted by the tombstone-issuing endpoint; "*" is accepted
# alongside the named scopes.
_VALID_SCOPES = {
    "local",
    "team",
    "company",
    "public",
    "*",
}

40 

41 

def _require_admin(identity: Identity) -> None:
    """Reject the request unless the resolved identity is an admin.

    Args:
        identity: The caller's resolved identity.

    Raises:
        HTTPException: 403 with detail ``"admin API key required"`` when
            ``identity.is_admin()`` is falsy.
    """
    if identity.is_admin():
        return
    raise HTTPException(
        detail="admin API key required",
        status_code=status.HTTP_403_FORBIDDEN,
    )

48 

49 

50# --------------------------------------------------------------------------- 

51# POST /v1/tombstones — issue a tombstone 

52# --------------------------------------------------------------------------- 

53 

54 

@router.post("", status_code=status.HTTP_201_CREATED, response_model=TombstoneRecord)
def issue_tombstone(
    req: TombstoneCreateRequest,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> TombstoneRecord:
    """Issue a signed tombstone for an entity and start its federation rebroadcast.

    The caller must be an admin. After validating ``req.scope`` and
    ``req.entity_uri``, a draft record is built with a fresh ``tomb_<uuid4>``
    id and a UTC ISO-8601 timestamp, signed with ``sign_tombstone``, stored
    through ``create_tombstone`` and then handed to
    ``_enqueue_tombstone_rebroadcast``.

    Args:
        req: Creation request carrying ``entity_uri``, ``scope``, ``reason``
            and ``legal_hold``.
        identity: Resolved caller identity; its ``entity_uri`` is recorded as
            ``signed_by``.

    Returns:
        The record returned by ``create_tombstone``.

    Raises:
        HTTPException: 403 if the caller is not an admin; 400
            ``tombstone_invalid_scope`` for an unknown scope; 400
            ``tombstone_entity_uri_invalid`` for a URI with no scheme-like
            marker; 409 ``tombstone_already_exists`` when storage reports a
            duplicate.
        Exception: Any other error raised by ``create_tombstone`` propagates
            unchanged.
    """
    _require_admin(identity)

    if req.scope not in _VALID_SCOPES:
        raise HTTPException(
            detail="tombstone_invalid_scope",
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    # Accept anything that carries a scheme-like marker: "://", a "urn:"
    # prefix, or any ":" at all.
    target_uri = req.entity_uri
    looks_like_uri = (
        "://" in target_uri
        or target_uri.startswith("urn:")
        or ":" in target_uri
    )
    if not looks_like_uri:
        raise HTTPException(
            detail="tombstone_entity_uri_invalid",
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    # Fall back to an empty key id when the node reports none.
    node_key_id = get_node_key_id() or ""

    new_id = "tomb_" + str(uuid.uuid4())
    issued_at = datetime.now(UTC).isoformat()
    unsigned = TombstoneRecord(
        id=new_id,
        entity_uri=req.entity_uri,
        scope=req.scope,
        reason=req.reason,
        signed_by=identity.entity_uri,
        key_id=node_key_id,
        signature="",  # filled in by sign_tombstone below
        created_at=issued_at,
        legal_hold=req.legal_hold,
    )

    signed = sign_tombstone(unsigned)

    try:
        stored = create_tombstone(
            entity_uri=signed.entity_uri,
            scope=signed.scope,
            reason=signed.reason,
            signed_by=signed.signed_by,
            key_id=signed.key_id,
            signature=signed.signature,
            legal_hold=signed.legal_hold,
            tombstone_id=signed.id,
            created_at=signed.created_at,
        )
    except Exception as exc:
        # Duplicates are recognised by message text; everything else is
        # re-raised untouched.
        message = str(exc)
        is_duplicate = "already exists" in message or "UNIQUE" in message
        if not is_duplicate:
            raise
        raise HTTPException(
            detail="tombstone_already_exists",
            status_code=status.HTTP_409_CONFLICT,
        ) from exc

    # Enqueue outbound federation rebroadcast (§23.4.1) — best-effort background
    _enqueue_tombstone_rebroadcast(stored)

    return stored

120 

121 

122# --------------------------------------------------------------------------- 

123# GET /v1/tombstones/{entity_uri_encoded} — check tombstone status 

124# --------------------------------------------------------------------------- 

125 

126 

@router.get("/{entity_uri_encoded}", response_model=TombstoneStatusResponse)
def check_tombstone_status(
    entity_uri_encoded: str,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> TombstoneStatusResponse:
    """Report the tombstone status of an entity (admin only).

    Args:
        entity_uri_encoded: Percent-encoded entity URI taken from the path.
        identity: Resolved caller identity.

    Returns:
        Whatever ``get_tombstone_status`` returns for the decoded URI.

    Raises:
        HTTPException: 403 if the caller is not an admin.
    """
    _require_admin(identity)
    # NOTE(review): the path parameter is percent-decoded here explicitly;
    # presumably clients encode the URI so it fits one path segment — confirm.
    decoded_uri = urllib.parse.unquote(entity_uri_encoded)
    status_response = get_tombstone_status(decoded_uri)
    return status_response

135 

136 

137# --------------------------------------------------------------------------- 

138# POST /v1/tombstones/{tombstone_id}/revoke — revoke a tombstone 

139# --------------------------------------------------------------------------- 

140 

141 

@router.post("/{tombstone_id}/revoke", response_model=TombstoneRevocationRecord)
def revoke_tombstone_endpoint(
    tombstone_id: str,
    req: TombstoneRevokeRequest,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> TombstoneRevocationRecord:
    """Revoke a tombstone and return the signed revocation record (admin only).

    The revocation is first created with the placeholder signature
    ``"pending"``, then signed with ``sign_revocation``, and finally the
    ``tombstone_revocations`` row is updated with the real signature and
    key id.

    Args:
        tombstone_id: Id of the tombstone to revoke, taken from the path.
        req: Revocation request carrying the ``reason``.
        identity: Resolved caller identity; its ``entity_uri`` is recorded as
            ``signed_by``.

    Returns:
        The record returned by ``sign_revocation``.

    Raises:
        HTTPException: 403 if the caller is not an admin; 500 when no node
            signing key id is available; 404 ``tombstone_not_found`` on
            ``KeyError``; 409 ``tombstone_already_revoked`` on a
            ``ValueError`` whose text contains ``already_revoked``.
        ValueError: Any other ``ValueError`` from the revocation call
            propagates unchanged.
    """
    _require_admin(identity)

    node_key_id = get_node_key_id()
    if node_key_id is None:
        raise HTTPException(
            detail="node signing key not configured",
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    try:
        revocation = _revoke_tombstone(
            tombstone_id=tombstone_id,
            reason=req.reason,
            signed_by=identity.entity_uri,
            key_id=node_key_id,
            signature="pending",  # placeholder, replaced after signing below
        )
    except KeyError as exc:
        raise HTTPException(
            detail="tombstone_not_found",
            status_code=status.HTTP_404_NOT_FOUND,
        ) from exc
    except ValueError as exc:
        if "already_revoked" not in str(exc):
            raise
        raise HTTPException(
            detail="tombstone_already_revoked",
            status_code=status.HTTP_409_CONFLICT,
        ) from exc

    signed_revocation = sign_revocation(revocation)

    # Replace the placeholder signature on the stored row.
    # NOTE(review): if signing or this UPDATE fails, the row presumably keeps
    # the "pending" signature — confirm whether that needs cleanup.
    from ..db import db

    update_sql = "UPDATE tombstone_revocations SET signature = ?, key_id = ? WHERE id = ?"
    update_args = (
        signed_revocation.signature,
        signed_revocation.key_id,
        signed_revocation.id,
    )
    with db() as connection:
        connection.execute(update_sql, update_args)
    return signed_revocation

186 

187 

188# --------------------------------------------------------------------------- 

189# Background federation rebroadcast 

190# --------------------------------------------------------------------------- 

191 

192 

def _enqueue_tombstone_rebroadcast(record: TombstoneRecord) -> None:
    """Start a daemon thread that pushes ``record`` to federation peers (§23.4.1).

    The call returns as soon as the thread is started; the push itself runs in
    ``_push_tombstone_to_peers`` and its outcome is not reported back here.
    """
    from threading import Thread

    worker = Thread(
        target=_push_tombstone_to_peers,
        args=(record,),
        daemon=True,
    )
    worker.start()

199 

200 

def _push_tombstone_to_peers(record: TombstoneRecord) -> None:
    """POST ``record`` to every peer whose status is ``'active'``.

    Peers are read from the ``peers`` table. Each one gets its own peer token
    and a POST to ``<node_url>/v1/federation/tombstones/ingest`` with a
    10-second timeout. A failure to read the peer list is logged and ends the
    push; a failure for one peer is logged as a warning and the loop moves on
    to the next peer.
    """
    import json
    import logging

    import httpx

    from ..db import db
    from ..federation.peer_token import create_peer_token

    log = logging.getLogger("stigmem.tombstones.federation")

    peer_query = "SELECT node_id, node_url, allowed_scopes FROM peers WHERE status = 'active'"
    try:
        with db() as connection:
            active_peers = connection.execute(peer_query).fetchall()
    except Exception:
        log.exception("Failed to fetch peers for tombstone rebroadcast")
        return

    body = record.model_dump()
    for row in active_peers:
        try:
            # allowed_scopes is stored as JSON text and decoded per peer.
            scopes = json.loads(row["allowed_scopes"])
            bearer = create_peer_token(row["node_id"], scopes)
            endpoint = f"{row['node_url'].rstrip('/')}/v1/federation/tombstones/ingest"
            reply = httpx.post(
                endpoint,
                json=body,
                headers={"Authorization": f"Bearer {bearer}"},
                timeout=10.0,
            )
            # 200, 201 and 409 are accepted silently; anything else is
            # logged as a warning.
            if reply.status_code in (200, 201, 409):
                continue
            log.warning(
                "Peer %s returned %s on tombstone push", row["node_url"], reply.status_code
            )
        except Exception:
            log.warning("Failed to push tombstone to peer %s", row["node_url"], exc_info=True)

237 log.warning("Failed to push tombstone to peer %s", peer["node_url"], exc_info=True)