Coverage for node / src / stigmem_node / routes / _federation_impl.py: 69%

232 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Federation route implementations extracted from routes/federation.py. 

2 

3These functions are the original route handler bodies; they are imported back 

4into ``routes.federation`` and invoked from thin ``@router``-decorated wrappers. 

5No behavioural changes — code was moved verbatim from federation.py. 

6 

7Tests monkey-patch attributes on the ``routes.federation`` module 

8(``settings``, ``write_audit_log``). To honour those patches, this module 

9looks those names up via ``routes.federation`` lazily inside the function 

10bodies rather than binding them at import time. 

11""" 

12 

13from __future__ import annotations 

14 

15import contextlib 

16import hashlib 

17import json 

18import logging 

19import uuid 

20from datetime import UTC, datetime 

21from typing import Any 

22 

23import httpx 

24from fastapi import BackgroundTasks, HTTPException, Request, status 

25 

26from ..auth import Identity 

27from ..db import db 

28from ..federation.peer_token import verify_declaration_sig 

29from ..federation.tls import check_peer_san 

30from ..identity.capability import CapabilityTokenError, verify_token 

31from ..identity.manifest import ManifestError, manifest_from_dict, verify_manifest 

32from ..identity.transparency_log import LogEntry, TransparencyLogUnavailable, make_transparency_log 

33from ..identity.trust_store import get_peer_manifest, store_peer_manifest 

34from ..models.federation import ( 

35 PeerApprovalResponse, 

36 PeerRegisterRequest, 

37 PeerRegisterResponse, 

38) 

39from ..models.tombstones import ( 

40 TombstoneRecord, 

41 TombstoneRevocationRecord, 

42) 

43from ..net_util import assert_safe_url 

44from ..plugins import Deny, TenantContext, get_registry 

45 

# Module-level logger used by the federation route implementations below
# (peer registration and the transparency-log inclusion check).
logger = logging.getLogger("stigmem.federation")

47 

48 

def peer_pubkey_fingerprint(pubkey: str) -> str:
    """Compute the fingerprint an operator compares when pinning a peer key.

    The fingerprint is the hex SHA-256 digest of the key string's default
    (UTF-8) encoding, prefixed with ``sha256:``.
    """
    digest_hex = hashlib.sha256(pubkey.encode()).hexdigest()
    return "sha256:" + digest_hex

52 

53 

def _make_federation_client() -> httpx.AsyncClient:
    """Build the HTTP client used for outbound federation requests.

    Settings are read through ``routes.federation`` at call time so that test
    monkey-patches of that module's ``settings`` attribute are honoured.
    Environment-derived configuration is disabled (``trust_env=False``) in
    both branches. When mTLS is enabled the client verifies with an SSL
    context built from the configured cert, key and CA bundle.
    """
    from . import federation as _fed_mod

    cfg = _fed_mod.settings
    if not cfg.mtls_enabled:
        return httpx.AsyncClient(trust_env=False)

    from ..federation.tls import build_client_ssl_context

    mtls_context = build_client_ssl_context(
        cfg.tls_cert_path,
        cfg.tls_key_path,
        cfg.tls_ca_bundle,
    )
    return httpx.AsyncClient(verify=mtls_context, trust_env=False)

67 

68 

async def register_peer_impl(
    req: PeerRegisterRequest,
    background_tasks: BackgroundTasks,
    identity: Identity,
) -> PeerRegisterResponse:
    """Register a federation peer and verify its signed declaration (§5.6).

    Steps, in order:

    1. Require the caller's federate permission and let the
       ``federation_peer_authenticate`` plugin hook veto the request.
    2. Insert the peer row with status ``pending_verification`` (409 if a row
       with the same ``node_id`` already exists).
    3. Fetch the peer's ``/.well-known/stigmem`` document and read its
       ``federation_pubkey``. Any fetch/parse failure is logged at debug level
       and leaves the key unset, which results in rejection.
    4. If the published key equals the submitted key and the declaration
       signature verifies, the peer becomes ``pending_approval``; otherwise
       it is ``rejected``.

    ``background_tasks`` is accepted for interface compatibility and is not
    used here. ``verified_at`` is never assigned in this function, so the
    response and the ``established_at`` column always carry ``None``.

    Raises:
        HTTPException: 403 when the permission check or the plugin vote
            fails; 409 when the peer is already registered.
    """
    if not identity.can_federate():
        raise HTTPException(status_code=403, detail="federate permission required")

    registry = get_registry()
    tenant_ctx = TenantContext(
        tenant_id=identity.tenant_id,
        metadata={"tenant_context_source": "hook"},
    )
    verdict = registry.fire_voting(
        "federation_peer_authenticate",
        req=req,
        identity=identity,
        tenant=tenant_ctx,
    )
    if isinstance(verdict, Deny):
        raise HTTPException(status_code=403, detail=verdict.reason)

    peer_id = str(uuid.uuid4())
    scopes_json = json.dumps(sorted(req.allowed_scopes))

    new_row = (
        peer_id,
        req.node_id,
        req.node_url,
        req.federation_pubkey,
        scopes_json,
        "pending_verification",
        None,
        req.declaration_sig,
        req.signed_at,
    )
    with db() as conn:
        duplicate = conn.execute(
            "SELECT id, status FROM peers WHERE node_id = ?", (req.node_id,)
        ).fetchone()
        if duplicate:
            raise HTTPException(
                status_code=409,
                detail=f"peer already registered (status={duplicate['status']})",
            )
        conn.execute(
            """INSERT INTO peers
               (id, node_id, node_url, federation_pubkey, allowed_scopes,
                status, established_at, declaration_sig, signed_at)
               VALUES (?,?,?,?,?,?,?,?,?)""",
            new_row,
        )

    # §5.6 steps 1–3: retrieve the key the peer publishes about itself.
    published_pubkey: str | None = None
    try:
        async with _make_federation_client() as client:
            well_known = await client.get(f"{req.node_url}/.well-known/stigmem")
            if well_known.status_code == 200:
                published_pubkey = well_known.json().get("federation_pubkey")
    except Exception as exc:  # nosec B110 — published_pubkey stays None → rejected below
        logger.debug("peer .well-known fetch failed: %s", exc)

    outcome = "rejected"
    verified_at: str | None = None

    if published_pubkey and published_pubkey == req.federation_pubkey:
        # The signature covers every declaration field except declaration_sig
        # itself (spec §6.1 struct "above fields").
        declaration: dict[str, Any] = {
            "allowed_scopes": req.allowed_scopes,
            "federation_pubkey": req.federation_pubkey,
            "node_id": req.node_id,
            "node_url": req.node_url,
            "signed_at": req.signed_at,
        }
        if verify_declaration_sig(declaration, req.declaration_sig, published_pubkey):
            outcome = "pending_approval"

    with db() as conn:
        conn.execute(
            "UPDATE peers SET status = ?, established_at = ? WHERE id = ?",
            (outcome, verified_at, peer_id),
        )

    return PeerRegisterResponse(peer_id=peer_id, status=outcome, verified_at=verified_at)

151 

152 

def approve_peer_impl(
    peer_id: str,
    pubkey_fingerprint: str,
    background_tasks: BackgroundTasks,
    identity: Identity,
) -> PeerApprovalResponse:
    """Approve a verified peer after operator out-of-band key confirmation.

    The peer is switched to ``active`` only when ``pubkey_fingerprint``
    equals the fingerprint computed from the stored ``federation_pubkey``.
    On a mismatch no row is modified; a ``peer_approval_failed`` audit entry
    is written and a 400 is raised.

    On success a ``peer_approved`` audit entry is written and a background
    transparency-log inclusion check is scheduled for the peer.

    Args:
        peer_id: Primary key of the row in ``peers``.
        pubkey_fingerprint: Fingerprint supplied by the operator, in the
            format produced by ``peer_pubkey_fingerprint``.
        background_tasks: Receives the follow-up TL inclusion check.
        identity: Caller identity; must pass ``can_admin_federation()``.

    Raises:
        HTTPException: 403 without the admin permission, 404 for an unknown
            peer, 409 when the peer is not ``pending_approval``, 400 on a
            fingerprint mismatch.
    """
    if not identity.can_admin_federation():
        raise HTTPException(status_code=403, detail="admin:federation required")

    # Lazy lookup: tests monkey-patch ``routes.federation.write_audit_log``;
    # resolving it through the module at call time preserves those patches.
    from . import federation as _fed_mod

    now = datetime.now(UTC).isoformat()
    fingerprint_mismatch_peer: dict[str, Any] | None = None
    with db() as conn:
        peer = conn.execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
        if peer is None:
            raise HTTPException(status_code=404, detail="peer not found")
        if peer["status"] != "pending_approval":
            raise HTTPException(
                status_code=409,
                detail=f"peer is not pending approval (status={peer['status']})",
            )

        expected = peer_pubkey_fingerprint(peer["federation_pubkey"])
        if pubkey_fingerprint == expected:
            # Activation is strictly gated on the fingerprint match.
            conn.execute(
                "UPDATE peers SET status = 'active', established_at = ? WHERE id = ?",
                (now, peer["id"]),
            )
        else:
            # Remember the row and reject after the DB context has exited
            # cleanly. NOTE(review): presumably this keeps the audit write
            # out of the open connection context — confirm.
            fingerprint_mismatch_peer = dict(peer)

    if fingerprint_mismatch_peer is not None:
        _fed_mod.write_audit_log(
            fingerprint_mismatch_peer["id"],
            "peer_approval_failed",
            {"node_id": fingerprint_mismatch_peer["node_id"], "reason": "fingerprint_mismatch"},
        )
        raise HTTPException(status_code=400, detail="fingerprint mismatch")

    _fed_mod.write_audit_log(
        peer_id,
        "peer_approved",
        {"node_id": peer["node_id"], "approved_by": identity.entity_uri},
    )
    background_tasks.add_task(
        _check_tl_inclusion_for_peer,
        peer["node_id"],
        peer["node_url"],
        peer_id,
    )
    return PeerApprovalResponse(
        peer_id=peer_id,
        node_id=peer["node_id"],
        status="active",
        approved_at=now,
    )

215 

216 

async def _check_tl_inclusion_for_peer(node_id: str, node_url: str, peer_id: str) -> None:
    """Check the transparency-log inclusion proof for a peer (§19.2.3).

    Behaviour by ``settings.trust_mode``:

    * ``off``: return immediately.
    * ``strict``: without a verified proof, write a ``tl_proof_missing``
      audit entry and set the peer's status to ``pending_tl_proof``.
    * any other value: without a verified proof, write a
      ``tl_proof_missing`` audit entry marked ``accepted_with_warning``.

    Fetching the manifest, storing it and verifying the proof are all
    best-effort: failures are logged and treated as "no proof".
    """
    # Lazy lookup: tests monkey-patch ``federation.settings`` and
    # ``federation.write_audit_log`` — going through the module keeps them.
    from typing import cast as _cast

    from . import federation as _fed_mod

    fed = _cast(Any, _fed_mod)

    mode = fed.settings.trust_mode
    if mode == "off":
        return

    from ..db import db as _db

    # Step 1: try to fetch and verify the peer's published manifest.
    peer_manifest = None
    try:
        assert_safe_url(node_url, allow_schemes=frozenset({"https", "http"}))
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(
                f"{node_url}/.well-known/stigmem-manifest.json",
                follow_redirects=False,
            )
            if resp.status_code == 200:
                try:
                    peer_manifest = manifest_from_dict(resp.json())
                    verify_manifest(peer_manifest, trust_mode=mode)
                except ManifestError as exc:
                    logger.warning("peer manifest from %s failed verification: %s", node_url, exc)
                    peer_manifest = None
                except ValueError as exc:
                    logger.warning("peer manifest from %s was not valid JSON: %s", node_url, exc)
                    peer_manifest = None
    except Exception as exc:
        logger.warning("failed to fetch peer manifest from %s: %s", node_url, exc)
        peer_manifest = None

    # Step 2: with a manifest in hand, look for a recorded TL entry and verify it.
    proof_verified = False
    if peer_manifest is not None:
        uri = peer_manifest.entity_uri
        if get_peer_manifest(uri, refresh_if_expired=False) is None:
            with contextlib.suppress(ManifestError):
                store_peer_manifest(uri, peer_manifest, trust_mode=mode)

        try:
            tl = make_transparency_log()
            with _db() as conn:
                row = conn.execute(
                    "SELECT log_entry_json FROM federation_manifests WHERE entity_uri = ?",
                    (peer_manifest.entity_uri,),
                ).fetchone()
                if row and row["log_entry_json"]:
                    raw_entry = json.loads(row["log_entry_json"])
                    entry = LogEntry(
                        log_id=raw_entry.get("log_id", ""),
                        leaf_hash=raw_entry.get("leaf_hash", ""),
                        log_index=raw_entry.get("log_index", -1),
                        integrated_time=raw_entry.get("integrated_time", 0),
                        inclusion_proof=raw_entry.get("inclusion_proof", {}),
                    )
                    tl.verify_inclusion(entry)
                    proof_verified = True
        except TransparencyLogUnavailable as exc:
            logger.debug("transparency log unavailable for TL inclusion check: %s", exc)
        except Exception as exc:  # nosec B110 — TL inclusion check is best-effort
            logger.debug("TL inclusion check failed: %s", exc)

    if proof_verified:
        return

    # Step 3: no proof — react according to the trust mode.
    if mode == "strict":
        fed.write_audit_log(
            peer_id,
            "tl_proof_missing",
            {"node_id": node_id, "action": "downgraded_to_pending_tl_proof"},
        )
        with _db() as conn:
            conn.execute(
                "UPDATE peers SET status = 'pending_tl_proof' WHERE id = ?",
                (peer_id,),
            )
        return

    fed.write_audit_log(
        peer_id,
        "tl_proof_missing",
        {"node_id": node_id, "action": "accepted_with_warning", "trust_mode": mode},
    )

315 

316 

317def _authenticate_tombstone_caller( 

318 request: Request, 

319 authorization: str | None, 

320 x_stigmem_capability: str | None, 

321 try_peer_token_auth: Any, 

322 get_mtls_peer_cert: Any, 

323 fed_settings: Any, 

324) -> None: 

325 """F-1 fix: caller must present a valid peer-JWT OR a tombstone-write capability token. 

326 

327 Raises HTTPException on any auth failure. Returns None on success. 

328 """ 

329 peer_auth = try_peer_token_auth(authorization) 

330 if peer_auth is not None: 

331 if fed_settings.mtls_enabled and request is not None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 peer_cert = get_mtls_peer_cert(request) 

333 if not check_peer_san(peer_cert, peer_auth[0]["node_id"]): 

334 raise HTTPException( 

335 status_code=401, 

336 detail="peer certificate URI SAN does not match node_id", 

337 ) 

338 return 

339 

340 if x_stigmem_capability is None: 

341 raise HTTPException(status_code=401, detail="peer token or capability token required") 

342 

343 try: 

344 verify_token( 

345 x_stigmem_capability, 

346 lambda uri: get_peer_manifest( 

347 uri, refresh_if_expired=True, trust_mode=fed_settings.trust_mode 

348 ), 

349 trust_mode=fed_settings.trust_mode, 

350 ) 

351 except CapabilityTokenError as exc: 

352 raise HTTPException(status_code=401, detail=f"capability token invalid: {exc}") from exc 

353 try: 

354 cap_token = json.loads(x_stigmem_capability) 

355 except json.JSONDecodeError as exc: 

356 raise HTTPException( 

357 status_code=400, detail=f"malformed capability token JSON: {exc}" 

358 ) from exc 

359 if cap_token.get("verb", "") not in ("tombstone:write", "write"): 359 ↛ exitline 359 didn't return from function '_authenticate_tombstone_caller' because the condition on line 359 was always true

360 raise HTTPException(status_code=403, detail="capability token missing tombstone:write verb") 

361 

362 

def _verify_signed_artifact_or_400(
    *,
    record: Any,
    key_id: str,
    artifact_label: str,  # "tombstone" or "revocation"
    missing_manifest_detail: str,  # exact wire-error string for the unknown-signer 401
    signer_uri: str,
    verifier: Any,  # verify_tombstone_signature or verify_revocation_signature
    on_failure: Any | None = None,  # callable(record, reason) emitted on bad signature
) -> None:
    """Resolve the signer's key and verify the signature on ``record``.

    Every rejection is audited via ``_audit_tombstone_ingest_rejected``
    before the corresponding ``HTTPException`` is raised:

    * empty ``key_id`` → 400
    * no manifest for ``signer_uri`` → 401 with ``missing_manifest_detail``
      (parameterised because tombstones and revocations use different
      wording on the wire)
    * ``key_id`` not found in the manifest → 401
    * ``verifier`` raises ``ValueError`` → 400; ``on_failure`` (if given) is
      called with the record and the error text first
    """

    def _audit(reason: str) -> None:
        _audit_tombstone_ingest_rejected(record, artifact_label, reason)

    if not key_id:
        _audit(f"{artifact_label}_missing_key_id")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"{artifact_label} missing key_id",
        )

    signer_manifest = get_peer_manifest(signer_uri)
    if signer_manifest is None:
        _audit("signer_manifest_missing")
        raise HTTPException(status_code=401, detail=missing_manifest_detail)

    signing_key = _resolve_pubkey_for_key_id(signer_manifest, key_id)
    if signing_key is None:
        _audit("key_id_not_in_signer_manifest")
        raise HTTPException(status_code=401, detail="key_id not in signer manifest")

    try:
        verifier(record, signing_key)
    except ValueError as exc:
        failure_reason = str(exc)
        if on_failure is not None:
            on_failure(record, failure_reason)
        _audit(failure_reason)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"{artifact_label}_verification_failed: {exc}",
        ) from exc

404 

405 

def _ingest_revocation(payload: dict[str, Any], fed_settings: Any) -> dict[str, Any]:
    """Validate, verify and apply an inbound tombstone revocation.

    The payload is parsed into a ``TombstoneRevocationRecord`` (any parse
    failure is audited and reported as 400), its signature is checked against
    the signer's manifest, and the revocation is applied locally.
    ``fed_settings`` is accepted for signature parity with
    ``_ingest_tombstone`` and is not read here.

    Returns:
        ``{"status": "ok", "type": "revocation"}`` on success.
    """
    from ..lifecycle.tombstone_signing import verify_revocation_signature
    from ..lifecycle.tombstones import apply_inbound_revocation

    try:
        revocation = TombstoneRevocationRecord(**payload)
    except Exception as exc:
        parse_error = str(exc)
        _audit_tombstone_payload_rejected(payload, "revocation", parse_error)
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

    _verify_signed_artifact_or_400(
        record=revocation,
        key_id=revocation.key_id or "",
        artifact_label="revocation",
        missing_manifest_detail="no manifest for revocation signer",
        signer_uri=revocation.signed_by,
        verifier=verify_revocation_signature,
    )

    apply_inbound_revocation(revocation)
    response: dict[str, Any] = {"status": "ok", "type": "revocation"}
    return response

428 

429 

def _ingest_tombstone(payload: dict[str, Any], fed_settings: Any) -> dict[str, Any]:
    """Validate, verify and apply an inbound tombstone.

    The payload is parsed into a ``TombstoneRecord`` (any parse failure is
    audited and reported as 400), its signature is checked against the
    signer's manifest — signature failures are additionally logged through
    ``_emit_tombstone_verification_failed`` — and the tombstone is applied
    locally. ``fed_settings`` is accepted for signature parity with
    ``_ingest_revocation`` and is not read here.

    Returns:
        ``{"status": "ok", "written": <result of apply_inbound_tombstone>}``.
    """
    from ..lifecycle.tombstone_signing import verify_tombstone_signature
    from ..lifecycle.tombstones import apply_inbound_tombstone

    try:
        tombstone = TombstoneRecord(**payload)
    except Exception as exc:
        parse_error = str(exc)
        _audit_tombstone_payload_rejected(payload, "tombstone", parse_error)
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

    _verify_signed_artifact_or_400(
        record=tombstone,
        key_id=tombstone.key_id or "",
        artifact_label="tombstone",
        missing_manifest_detail="no manifest for signer",
        signer_uri=tombstone.signed_by,
        verifier=verify_tombstone_signature,
        on_failure=_emit_tombstone_verification_failed,
    )

    write_result = apply_inbound_tombstone(tombstone)
    return {"status": "ok", "written": write_result}

453 

454 

def federation_ingest_tombstone_impl(
    request: Request,
    payload: dict[str, Any],
    authorization: str | None,
    x_stigmem_capability: str | None,
    try_peer_token_auth: Any,
    get_mtls_peer_cert: Any,
) -> dict[str, Any]:
    """Handle an inbound tombstone push from a federation peer (§23.4.2).

    The caller is authenticated first (peer JWT, or a capability token with
    the tombstone:write verb). The payload is then routed by shape: a payload
    containing a ``tombstone_id`` key is treated as a revocation, anything
    else as a tombstone. The chosen handler verifies the signature and
    applies the record, and its response dict is returned unchanged.
    """
    # Lazy lookup: tests monkey-patch ``federation.settings``.
    from typing import cast as _cast

    from . import federation as _fed_mod

    fed_settings = _cast(Any, _fed_mod).settings

    _authenticate_tombstone_caller(
        request,
        authorization,
        x_stigmem_capability,
        try_peer_token_auth,
        get_mtls_peer_cert,
        fed_settings,
    )

    is_revocation = "tombstone_id" in payload
    handler = _ingest_revocation if is_revocation else _ingest_tombstone
    return handler(payload, fed_settings)

487 

488 

489def _resolve_pubkey_for_key_id(manifest: Any, key_id: str) -> str | None: 

490 """Return base64url public key from manifest matching key_id, or None.""" 

491 if manifest.key_id == key_id: 491 ↛ 494line 491 didn't jump to line 494 because the condition on line 491 was always true

492 pk: str = manifest.public_key 

493 return pk 

494 for evt in getattr(manifest, "rotation_events", []): 

495 if getattr(evt, "new_key_id", None) == key_id: 

496 new_pk: str | None = getattr(evt, "new_public_key", None) 

497 return new_pk 

498 return None 

499 

500 

501def _emit_tombstone_verification_failed(record: TombstoneRecord, reason: str) -> None: 

502 import logging as _logging 

503 

504 _logging.getLogger("stigmem.tombstones.ingest").error( 

505 "tombstone_verification_failed: tombstone_id=%s entity=%s reason=%s", 

506 record.id, 

507 record.entity_uri, 

508 reason, 

509 ) 

510 

511 

def _audit_tombstone_ingest_rejected(record: Any, artifact_label: str, reason: str) -> None:
    """Emit a ``tombstone_federation_rejected`` audit event for a parsed record.

    Fields are read defensively with ``getattr`` so the helper works for any
    record object passed in:

    * artifact id: ``record.id``, else ``record.tombstone_id``, else ``""``
    * signer: ``record.signed_by``, else ``"system:federation"``
    * target entity: ``record.entity_uri``, else ``record.tombstone_id``,
      else the artifact id

    The event is sent through ``emit_nofail``.
    """
    from ..observability.audit_event import emit_nofail

    # The trailing ``or ""`` keeps a record whose id fields are both None from
    # being audited under the literal string "None" (same fallback as
    # ``_audit_tombstone_payload_rejected``).
    artifact_id = str(
        getattr(record, "id", None) or getattr(record, "tombstone_id", None) or ""
    )
    signer_uri = str(getattr(record, "signed_by", "") or "system:federation")
    target_entity_uri = str(
        getattr(record, "entity_uri", "") or getattr(record, "tombstone_id", "") or artifact_id
    )
    emit_nofail(
        "tombstone_federation_rejected",
        entity_uri=signer_uri,
        fact_id=artifact_id or None,
        source="federation",
        detail={
            "artifact": artifact_label,
            "artifact_id": artifact_id,
            "target_entity_uri": target_entity_uri,
            "key_id": str(getattr(record, "key_id", "") or ""),
            "reason": reason,
        },
    )

533 

534 

def _audit_tombstone_payload_rejected(
    payload: dict[str, Any],
    artifact_label: str,
    reason: str,
) -> None:
    """Emit a ``tombstone_federation_rejected`` audit event for a raw payload.

    Used when the payload could not be parsed into a record, so all fields
    are read straight from the dict with fallbacks:

    * artifact id: ``id``, else ``tombstone_id``, else ``""``
    * signer: ``signed_by``, else ``"system:federation"``
    * target entity: ``entity_uri``, else ``tombstone_id``, else the artifact id

    The event is sent through ``emit_nofail``.
    """
    from ..observability.audit_event import emit_nofail

    field = payload.get
    artifact_id = str(field("id") or field("tombstone_id") or "")
    signer = str(field("signed_by") or "system:federation")
    target = str(field("entity_uri") or field("tombstone_id") or artifact_id)

    event_detail = {
        "artifact": artifact_label,
        "artifact_id": artifact_id,
        "target_entity_uri": target,
        "key_id": str(field("key_id") or ""),
        "reason": reason,
    }
    emit_nofail(
        "tombstone_federation_rejected",
        entity_uri=signer,
        fact_id=artifact_id or None,
        source="federation",
        detail=event_detail,
    )

557 )