Coverage for node / src / stigmem_node / routes / identity.py: 80%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Org-identity API routes — spec §19.1–§19.3, §19.5. 

2 

3Routes: 

4 PUT /v1/federation/manifest — publish/update org manifest 

5 GET /v1/federation/manifest/{entity_uri_encoded} — resolve manifest 

6 POST /v1/federation/capability-tokens — issue capability token 

7 POST /v1/federation/capability-tokens/verify — verify a capability token (CLI + peers) 

8 POST /v1/federation/capability-tokens/{token_id}/revoke — revoke token 

9 POST /v1/gardens/{id}/promote — quarantine moderation (garden-scoped) 

10 POST /v1/gardens/{id}/reject — quarantine moderation (garden-scoped) 

11 

12Security requirements enforced here: 

13 C1: token subject MUST appear in issuer's manifest entities list. 

14 H1: expired manifest → reject token issuance (handled via get_peer_manifest). 

15 H2: TL unavailable in strict mode → HTTP 503 (not silent fallback). 

16 Rate-limit manifest PUT: ≤ 10 per entity_uri per hour. 

17 Nonce: 64-char lowercase hex (secrets.token_hex(32)). 

18 Cleanup: expired tokens pruned opportunistically on issuance. 

19""" 

20 

21from __future__ import annotations 

22 

23import json 

24import logging 

25import secrets 

26import time 

27import uuid 

28from datetime import UTC, datetime 

29from typing import Annotated, Any 

30from urllib.parse import unquote 

31 

32from fastapi import APIRouter, Depends, HTTPException, status 

33 

34from ..auth import Identity, resolve_identity 

35from ..db import db 

36from ..identity.capability import ( 

37 CapabilityTokenError, 

38 load_node_private_key, 

39 sign_revocation_event, 

40 sign_token, 

41 verify_token, 

42) 

43from ..identity.manifest import ( 

44 ManifestError, 

45 manifest_from_dict, 

46 manifest_to_dict, 

47 verify_manifest, 

48) 

49from ..identity.transparency_log import TransparencyLogUnavailable, make_transparency_log 

50from ..identity.trust_store import ( 

51 cleanup_expired_tokens, 

52 get_peer_manifest, 

53 store_peer_manifest, 

54) 

55from ..settings import settings 

56 

57router = APIRouter(tags=["identity"]) 

58logger = logging.getLogger("stigmem.routes.identity") 

59 

60# --------------------------------------------------------------------------- 

61# Manifest-PUT rate limiter: { entity_uri: [epoch_s, ...] } (in-process) 

62# --------------------------------------------------------------------------- 

63 

64_MANIFEST_SUBMIT_WINDOW_S = 3600 

65_MANIFEST_SUBMIT_LIMIT = 10 

66_manifest_submit_log: dict[str, list[float]] = {} 

67 

68 

69def _check_manifest_rate_limit(entity_uri: str) -> None: 

70 now = time.monotonic() 

71 window_start = now - _MANIFEST_SUBMIT_WINDOW_S 

72 timestamps = [t for t in _manifest_submit_log.get(entity_uri, []) if t > window_start] 

73 if len(timestamps) >= _MANIFEST_SUBMIT_LIMIT: 

74 raise HTTPException( 

75 status_code=status.HTTP_429_TOO_MANY_REQUESTS, 

76 detail=f"manifest PUT rate limit: max {_MANIFEST_SUBMIT_LIMIT} per hour per entity_uri", 

77 ) 

78 timestamps.append(now) 

79 _manifest_submit_log[entity_uri] = timestamps 

80 

81 

82# --------------------------------------------------------------------------- 

83# PUT /v1/federation/manifest 

84# --------------------------------------------------------------------------- 

85 

86 

87@router.put("/v1/federation/manifest", status_code=status.HTTP_200_OK) 

88async def put_manifest( 

89 body: dict[str, Any], 

90 identity: Annotated[Identity, Depends(resolve_identity)], 

91) -> dict[str, Any]: 

92 """Publish or update an org manifest. 

93 

94 Rate-limited to 10 submissions per entity_uri per hour. 

95 rotation_events arrays with > 100 entries are rejected. 

96 TL submission is attempted; in strict mode TL failure → 503. 

97 """ 

98 if not identity.can_write(): 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true

99 raise HTTPException(status_code=403, detail="write permission required") 

100 

101 entity_uri = body.get("entity_uri", "") 

102 if not entity_uri: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true

103 raise HTTPException(status_code=422, detail="entity_uri is required") 

104 

105 _check_manifest_rate_limit(entity_uri) 

106 

107 rotation_events = body.get("rotation_events", []) 

108 if len(rotation_events) > 100: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 raise HTTPException( 

110 status_code=422, 

111 detail="rotation_events must not exceed 100 entries", 

112 ) 

113 

114 try: 

115 manifest = manifest_from_dict(body) 

116 except (KeyError, ValueError) as exc: 

117 raise HTTPException(status_code=422, detail=f"invalid manifest: {exc}") from exc 

118 

119 try: 

120 verify_manifest(manifest, trust_mode=settings.trust_mode) 

121 except ManifestError as exc: 

122 raise HTTPException(status_code=422, detail=str(exc)) from exc 

123 

124 # Attempt transparency-log submission 

125 tl = make_transparency_log() 

126 log_entry = None 

127 tl_error: str | None = None 

128 

129 try: 

130 log_entry = tl.submit(manifest_to_dict(manifest)) 

131 except TransparencyLogUnavailable as exc: 

132 tl_error = str(exc) 

133 if settings.trust_mode == "strict": 

134 raise HTTPException( 

135 status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 

136 detail=f"transparency log unavailable (strict mode rejects): {exc}", 

137 ) from exc 

138 logger.warning("TL unavailable during manifest PUT (warn mode): %s", exc) 

139 

140 try: 

141 store_peer_manifest(entity_uri, manifest, log_entry, trust_mode=settings.trust_mode) 

142 except ManifestError as exc: 

143 raise HTTPException(status_code=409, detail=str(exc)) from exc 

144 

145 result: dict[str, Any] = { 

146 "entity_uri": entity_uri, 

147 "key_id": manifest.key_id, 

148 "issued_at": manifest.issued_at, 

149 "expires_at": manifest.expires_at, 

150 } 

151 if log_entry is not None: 151 ↛ 152line 151 didn't jump to line 152 because the condition on line 151 was never true

152 result["log_entry"] = { 

153 "log_id": log_entry.log_id, 

154 "log_index": log_entry.log_index, 

155 "leaf_hash": log_entry.leaf_hash, 

156 "integrated_time": log_entry.integrated_time, 

157 } 

158 if tl_error is not None: 158 ↛ 160line 158 didn't jump to line 160 because the condition on line 158 was always true

159 result["tl_warning"] = tl_error 

160 return result 

161 

162 

163# --------------------------------------------------------------------------- 

164# GET /v1/federation/manifest/{entity_uri_encoded} 

165# --------------------------------------------------------------------------- 

166 

167 

168@router.get("/v1/federation/manifest/{entity_uri_encoded:path}") 

169def get_manifest( 

170 entity_uri_encoded: str, 

171 identity: Annotated[Identity, Depends(resolve_identity)], 

172) -> dict[str, Any]: 

173 """Resolve a peer manifest by entity_uri (URL-encoded or raw path).""" 

174 if not identity.can_read(): 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true

175 raise HTTPException(status_code=403, detail="read permission required") 

176 

177 entity_uri = unquote(entity_uri_encoded) 

178 manifest = get_peer_manifest( 

179 entity_uri, refresh_if_expired=True, trust_mode=settings.trust_mode 

180 ) 

181 if manifest is None: 

182 raise HTTPException(status_code=404, detail="manifest not found or expired") 

183 

184 with db() as conn: 

185 row = conn.execute( 

186 "SELECT log_entry_json FROM federation_manifests WHERE entity_uri = ?", 

187 (entity_uri,), 

188 ).fetchone() 

189 

190 result = manifest_to_dict(manifest) 

191 if row and row["log_entry_json"]: 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true

192 result["log_entry"] = json.loads(row["log_entry_json"]) 

193 return result 

194 

195 

196# --------------------------------------------------------------------------- 

197# POST /v1/federation/capability-tokens 

198# --------------------------------------------------------------------------- 

199 

200 

201@router.post( 

202 "/v1/federation/capability-tokens", 

203 status_code=status.HTTP_201_CREATED, 

204) 

205def issue_capability_token( 

206 body: dict[str, Any], 

207 identity: Annotated[Identity, Depends(resolve_identity)], 

208) -> dict[str, Any]: 

209 """Issue a capability token. 

210 

211 C1: token subject must appear in the issuer's manifest entities list. 

212 H1: issuer manifest must not be expired. 

213 Nonce: 64-char lowercase hex enforced by DB CHECK constraint. 

214 """ 

215 if not identity.can_write(): 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true

216 raise HTTPException(status_code=403, detail="write permission required") 

217 

218 issuer = body.get("issuer", "") 

219 subject = body.get("subject", "") 

220 verb = body.get("verb", "") 

221 obj = body.get("object", "") 

222 ttl_seconds: int = int(body.get("ttl_seconds", 3600)) 

223 

224 if not all([issuer, subject, verb, obj]): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 raise HTTPException( 

226 status_code=422, detail="issuer, subject, verb, and object are required" 

227 ) 

228 

229 # M-SEC-1: enforce 90-day maximum TTL (spec §19.3.2) 

230 _MAX_TOKEN_TTL_SECONDS = 90 * 86400 

231 if ttl_seconds > _MAX_TOKEN_TTL_SECONDS: 

232 raise HTTPException( 

233 status_code=422, 

234 detail=f"ttl_seconds exceeds 90-day maximum ({_MAX_TOKEN_TTL_SECONDS}s)", 

235 ) 

236 

237 # BOLA guard (H-SEC-1): prevent a caller from forging tokens under a different org's identity 

238 if issuer != identity.entity_uri: 

239 raise HTTPException( 

240 status_code=status.HTTP_403_FORBIDDEN, 

241 detail="issuer must match the authenticated caller entity_uri", 

242 ) 

243 

244 # H1: resolve issuer manifest (checks expiry, refreshes if needed) 

245 issuer_manifest = get_peer_manifest( 

246 issuer, refresh_if_expired=True, trust_mode=settings.trust_mode 

247 ) 

248 if issuer_manifest is None: 

249 raise HTTPException( 

250 status_code=422, 

251 detail=f"issuer manifest not found or expired for {issuer!r} " 

252 "(H1: cannot issue tokens from expired manifests)", 

253 ) 

254 

255 # C1: subject must be in issuer's entities list 

256 if subject not in issuer_manifest.entities: 

257 raise HTTPException( 

258 status_code=403, 

259 detail=f"subject {subject!r} is not in issuer {issuer!r} entities list " 

260 "(C1: external-entity delegation not permitted)", 

261 ) 

262 

263 now = datetime.now(UTC) 

264 issued_at = now.isoformat() 

265 expiry = datetime.fromtimestamp(now.timestamp() + ttl_seconds, tz=UTC).isoformat() 

266 token_id = str(uuid.uuid4()) 

267 nonce = secrets.token_hex(32) # 64 lowercase hex chars — satisfies DB CHECK 

268 

269 token_body: dict[str, Any] = { 

270 "token_id": token_id, 

271 "token_version": 1, # nosec B105 — integer version field, not a password 

272 "issuer": issuer, 

273 "subject": subject, 

274 "verb": verb, 

275 "object": obj, 

276 "issued_at": issued_at, 

277 "expiry": expiry, 

278 "nonce": nonce, 

279 } 

280 

281 # Sign token body if node_private_key is configured (C-SEC-1 / spec §19.3.2) 

282 if load_node_private_key() is not None: 

283 token_body["signature"] = sign_token(token_body) 

284 else: 

285 logger.warning( 

286 "STIGMEM_NODE_PRIVATE_KEY not set; issuing unsigned capability token " 

287 "(set the env var to enable spec-compliant signing)" 

288 ) 

289 

290 import canonicaljson 

291 

292 token_json = canonicaljson.encode_canonical_json(token_body).decode() 

293 

294 created_at = now.isoformat() 

295 with db() as conn: 

296 try: 

297 conn.execute( 

298 """INSERT INTO capability_tokens 

299 (id, token_json, issuer, subject, verb, object, 

300 issued_at, expiry, nonce, created_at) 

301 VALUES (?,?,?,?,?,?,?,?,?,?)""", 

302 ( 

303 token_id, 

304 token_json, 

305 issuer, 

306 subject, 

307 verb, 

308 obj, 

309 issued_at, 

310 expiry, 

311 nonce, 

312 created_at, 

313 ), 

314 ) 

315 except Exception as exc: 

316 if "UNIQUE constraint" in str(exc): 

317 raise HTTPException(status_code=409, detail="nonce collision; retry") from exc 

318 raise 

319 

320 # M-SEC-4: audit log entry for token issuance (spec §19.3.2) 

321 conn.execute( 

322 """INSERT INTO fact_audit_log 

323 (id, fact_id, event_type, entity_uri, oidc_sub, source, attested_key_id, detail, ts) 

324 VALUES (?,?,?,?,?,?,?,?,?)""", 

325 ( 

326 str(uuid.uuid4()), 

327 token_id, # token_id as surrogate fact_id 

328 "capability_issued", 

329 issuer, 

330 None, 

331 "system:capability", 

332 None, 

333 json.dumps( 

334 { 

335 "issuer": issuer, 

336 "subject": subject, 

337 "verb": verb, 

338 "object": obj, 

339 "expiry": expiry, 

340 } 

341 ), 

342 created_at, 

343 ), 

344 ) 

345 

346 # Opportunistic cleanup of stale tokens 

347 try: 

348 cleanup_expired_tokens() 

349 except Exception: 

350 logger.exception("Best-effort expired capability token cleanup failed") 

351 

352 return { 

353 "token_id": token_id, 

354 "issuer": issuer, 

355 "subject": subject, 

356 "verb": verb, 

357 "object": obj, 

358 "issued_at": issued_at, 

359 "expiry": expiry, 

360 "nonce": nonce, 

361 "token_json": token_json, 

362 } 

363 

364 

365# --------------------------------------------------------------------------- 

366# POST /v1/federation/capability-tokens/verify 

367# --------------------------------------------------------------------------- 

368# NOTE: this static path MUST be registered before /{token_id}/revoke so that 

369# FastAPI does not swallow "verify" as a {token_id} capture. 

370 

371 

372@router.post( 

373 "/v1/federation/capability-tokens/verify", 

374 status_code=status.HTTP_200_OK, 

375) 

376def verify_capability_token_endpoint( 

377 body: dict[str, Any], 

378 identity: Annotated[Identity, Depends(resolve_identity)], 

379) -> dict[str, Any]: 

380 """Verify a capability token (Spec-06-Capability-Tokens). 

381 

382 Returns {"valid": true} on success, or {"valid": false, "reason": "..."} 

383 when the token fails any verification step (expired, bad sig, revoked, etc.). 

384 HTTP 200 in both cases; 422 only for a missing/malformed request body. 

385 """ 

386 if not identity.can_read(): 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true

387 raise HTTPException(status_code=403, detail="read permission required") 

388 

389 token_json = body.get("token_json", "") 

390 if not token_json: 

391 raise HTTPException(status_code=422, detail="token_json is required") 

392 

393 try: 

394 verify_token( 

395 token_json, 

396 lambda uri: get_peer_manifest( 

397 uri, refresh_if_expired=True, trust_mode=settings.trust_mode 

398 ), 

399 trust_mode=settings.trust_mode, 

400 ) 

401 return {"valid": True} 

402 except CapabilityTokenError as exc: 

403 err = str(exc) 

404 if "revoked" in err: 404 ↛ 406line 404 didn't jump to line 406 because the condition on line 404 was always true

405 reason = "token_revoked" 

406 elif "expired" in err: 

407 reason = "token_expired" 

408 else: 

409 reason = "token_invalid" 

410 return {"valid": False, "reason": reason} 

411 

412 

413# --------------------------------------------------------------------------- 

414# POST /v1/federation/capability-tokens/{token_id}/revoke 

415# --------------------------------------------------------------------------- 

416 

417 

418@router.post( 

419 "/v1/federation/capability-tokens/{token_id}/revoke", 

420 status_code=status.HTTP_200_OK, 

421) 

422async def revoke_capability_token( 

423 token_id: str, 

424 body: dict[str, Any], 

425 identity: Annotated[Identity, Depends(resolve_identity)], 

426) -> dict[str, Any]: 

427 """Revoke a capability token and submit a revocation notice to the TL.""" 

428 if not identity.can_write(): 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true

429 raise HTTPException(status_code=403, detail="write permission required") 

430 

431 now = datetime.now(UTC).isoformat() 

432 reason = body.get("reason", "") 

433 

434 with db() as conn: 

435 row = conn.execute( 

436 "SELECT id, issuer, subject, revoked_at FROM capability_tokens WHERE id = ?", 

437 (token_id,), 

438 ).fetchone() 

439 

440 if row is None: 

441 raise HTTPException(status_code=404, detail="capability token not found") 

442 if row["revoked_at"] is not None: 442 ↛ 443line 442 didn't jump to line 443 because the condition on line 442 was never true

443 raise HTTPException(status_code=409, detail="token already revoked") 

444 

445 # BOLA guard: only the issuer or subject may revoke their own token. 

446 if row["issuer"] != identity.entity_uri and row["subject"] != identity.entity_uri: 

447 raise HTTPException( 

448 status_code=status.HTTP_403_FORBIDDEN, 

449 detail="not authorized to revoke this token: caller is not the issuer or subject", 

450 ) 

451 

452 revoke_event: dict[str, Any] = { 

453 "token_id": token_id, 

454 "revoked_by": identity.entity_uri, 

455 "revoked_at": now, 

456 "reason": reason, 

457 } 

458 

459 # Sign revocation event (spec §19.3.4 — best-effort when key not configured) 

460 if load_node_private_key() is not None: 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true

461 revoke_event["signature"] = sign_revocation_event(revoke_event) 

462 

463 # Submit revocation to TL (best-effort; log warning in non-strict mode) 

464 tl = make_transparency_log() 

465 tl_error: str | None = None 

466 tl_entry = None 

467 try: 

468 tl_entry = tl.submit(revoke_event) 

469 except TransparencyLogUnavailable as exc: 

470 tl_error = str(exc) 

471 if settings.trust_mode == "strict": 471 ↛ 472line 471 didn't jump to line 472 because the condition on line 471 was never true

472 raise HTTPException( 

473 status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 

474 detail=f"transparency log unavailable during revocation (strict mode): {exc}", 

475 ) from exc 

476 logger.warning("TL unavailable during token revocation (warn mode): %s", exc) 

477 

478 revoke_log_json = json.dumps( 

479 { 

480 **revoke_event, 

481 **( 

482 { 

483 "tl_entry": { 

484 "log_id": tl_entry.log_id, 

485 "log_index": tl_entry.log_index, 

486 "leaf_hash": tl_entry.leaf_hash, 

487 } 

488 } 

489 if tl_entry 

490 else {} 

491 ), 

492 } 

493 ) 

494 

495 with db() as conn: 

496 conn.execute( 

497 "UPDATE capability_tokens SET revoked_at = ?, revoke_log = ? WHERE id = ?", 

498 (now, revoke_log_json, token_id), 

499 ) 

500 # M-SEC-4: audit log entry for token revocation (spec §19.3.4) 

501 conn.execute( 

502 """INSERT INTO fact_audit_log 

503 (id, fact_id, event_type, entity_uri, oidc_sub, source, attested_key_id, detail, ts) 

504 VALUES (?,?,?,?,?,?,?,?,?)""", 

505 ( 

506 str(uuid.uuid4()), 

507 token_id, # token_id as surrogate fact_id 

508 "capability_revoked", 

509 identity.entity_uri, 

510 None, 

511 "system:capability", 

512 None, 

513 json.dumps({"reason": reason}), 

514 now, 

515 ), 

516 ) 

517 

518 result: dict[str, Any] = {"token_id": token_id, "revoked_at": now, "status": "revoked"} 

519 if tl_error: 519 ↛ 521line 519 didn't jump to line 521 because the condition on line 519 was always true

520 result["tl_warning"] = tl_error 

521 return result 

522 

523 

524# Quarantine promote/reject routes live in routes/gardens.py (registered before this 

525# router). Do NOT add duplicate promote/reject handlers here — gardens.py owns those 

526# endpoints and enforces the correct quarantine ACL (quarantine-flag check + mandatory 

527# moderator/admin role, no write-permission bypass).