Coverage for node / src / stigmem_node / routes / _federation_impl.py: 69%
232 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Federation route implementations extracted from routes/federation.py.
3These functions are the original route handler bodies; they are imported back
4into ``routes.federation`` and invoked from thin ``@router``-decorated wrappers.
5No behavioural changes — code was moved verbatim from federation.py.
7Tests monkey-patch attributes on the ``routes.federation`` module
8(``settings``, ``write_audit_log``). To honour those patches, this module
9looks those names up via ``routes.federation`` lazily inside the function
10bodies rather than binding them at import time.
11"""
13from __future__ import annotations
15import contextlib
16import hashlib
17import json
18import logging
19import uuid
20from datetime import UTC, datetime
21from typing import Any
23import httpx
24from fastapi import BackgroundTasks, HTTPException, Request, status
26from ..auth import Identity
27from ..db import db
28from ..federation.peer_token import verify_declaration_sig
29from ..federation.tls import check_peer_san
30from ..identity.capability import CapabilityTokenError, verify_token
31from ..identity.manifest import ManifestError, manifest_from_dict, verify_manifest
32from ..identity.transparency_log import LogEntry, TransparencyLogUnavailable, make_transparency_log
33from ..identity.trust_store import get_peer_manifest, store_peer_manifest
34from ..models.federation import (
35 PeerApprovalResponse,
36 PeerRegisterRequest,
37 PeerRegisterResponse,
38)
39from ..models.tombstones import (
40 TombstoneRecord,
41 TombstoneRevocationRecord,
42)
43from ..net_util import assert_safe_url
44from ..plugins import Deny, TenantContext, get_registry
46logger = logging.getLogger("stigmem.federation")
49def peer_pubkey_fingerprint(pubkey: str) -> str:
50 """Return the operator-verifiable fingerprint for a pinned peer public key."""
51 return f"sha256:{hashlib.sha256(pubkey.encode()).hexdigest()}"
54def _make_federation_client() -> httpx.AsyncClient:
55 from . import federation as _fed_mod
57 if _fed_mod.settings.mtls_enabled: 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true
58 from ..federation.tls import build_client_ssl_context
60 ssl_ctx = build_client_ssl_context(
61 _fed_mod.settings.tls_cert_path,
62 _fed_mod.settings.tls_key_path,
63 _fed_mod.settings.tls_ca_bundle,
64 )
65 return httpx.AsyncClient(verify=ssl_ctx, trust_env=False)
66 return httpx.AsyncClient(trust_env=False)
69async def register_peer_impl(
70 req: PeerRegisterRequest,
71 background_tasks: BackgroundTasks,
72 identity: Identity,
73) -> PeerRegisterResponse:
74 """Register a peer. Fetches its well-known doc and verifies declaration_sig (§5.6)."""
75 if not identity.can_federate(): 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true
76 raise HTTPException(status_code=403, detail="federate permission required")
77 decision = get_registry().fire_voting(
78 "federation_peer_authenticate",
79 req=req,
80 identity=identity,
81 tenant=TenantContext(
82 tenant_id=identity.tenant_id,
83 metadata={"tenant_context_source": "hook"},
84 ),
85 )
86 if isinstance(decision, Deny): 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true
87 raise HTTPException(status_code=403, detail=decision.reason)
89 peer_id = str(uuid.uuid4())
90 allowed_scopes_json = json.dumps(sorted(req.allowed_scopes))
92 with db() as conn:
93 existing = conn.execute(
94 "SELECT id, status FROM peers WHERE node_id = ?", (req.node_id,)
95 ).fetchone()
96 if existing: 96 ↛ 97line 96 didn't jump to line 97 because the condition on line 96 was never true
97 raise HTTPException(
98 status_code=409,
99 detail=f"peer already registered (status={existing['status']})",
100 )
101 conn.execute(
102 """INSERT INTO peers
103 (id, node_id, node_url, federation_pubkey, allowed_scopes,
104 status, established_at, declaration_sig, signed_at)
105 VALUES (?,?,?,?,?,?,?,?,?)""",
106 (
107 peer_id,
108 req.node_id,
109 req.node_url,
110 req.federation_pubkey,
111 allowed_scopes_json,
112 "pending_verification",
113 None,
114 req.declaration_sig,
115 req.signed_at,
116 ),
117 )
119 # Fetch peer's /.well-known/stigmem to retrieve their published pubkey (§5.6 step 1–3)
120 fetched_pubkey: str | None = None
121 try:
122 async with _make_federation_client() as client:
123 wk_resp = await client.get(f"{req.node_url}/.well-known/stigmem")
124 if wk_resp.status_code == 200: 124 ↛ 129line 124 didn't jump to line 129 because the condition on line 124 was always true
125 fetched_pubkey = wk_resp.json().get("federation_pubkey")
126 except Exception as exc: # nosec B110 — fetched_pubkey stays None → rejected below
127 logger.debug("peer .well-known fetch failed: %s", exc)
129 final_status = "rejected"
130 verified_at: str | None = None
132 if fetched_pubkey and fetched_pubkey == req.federation_pubkey: 132 ↛ 144line 132 didn't jump to line 144 because the condition on line 132 was always true
133 # Signed fields = everything except declaration_sig (spec §6.1 struct "above fields")
134 signed_fields: dict[str, Any] = {
135 "allowed_scopes": req.allowed_scopes,
136 "federation_pubkey": req.federation_pubkey,
137 "node_id": req.node_id,
138 "node_url": req.node_url,
139 "signed_at": req.signed_at,
140 }
141 if verify_declaration_sig(signed_fields, req.declaration_sig, fetched_pubkey):
142 final_status = "pending_approval"
144 with db() as conn:
145 conn.execute(
146 "UPDATE peers SET status = ?, established_at = ? WHERE id = ?",
147 (final_status, verified_at, peer_id),
148 )
150 return PeerRegisterResponse(peer_id=peer_id, status=final_status, verified_at=verified_at)
153def approve_peer_impl(
154 peer_id: str,
155 pubkey_fingerprint: str,
156 background_tasks: BackgroundTasks,
157 identity: Identity,
158) -> PeerApprovalResponse:
159 """Approve a verified peer after operator out-of-band key confirmation."""
160 if not identity.can_admin_federation(): 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true
161 raise HTTPException(status_code=403, detail="admin:federation required")
163 now = datetime.now(UTC).isoformat()
164 fingerprint_mismatch_peer: dict[str, Any] | None = None
165 with db() as conn:
166 peer = conn.execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
167 if peer is None: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 raise HTTPException(status_code=404, detail="peer not found")
169 if peer["status"] != "pending_approval": 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true
170 raise HTTPException(
171 status_code=409,
172 detail=f"peer is not pending approval (status={peer['status']})",
173 )
175 expected = peer_pubkey_fingerprint(peer["federation_pubkey"])
176 if pubkey_fingerprint != expected:
177 fingerprint_mismatch_peer = dict(peer)
178 else:
179 fingerprint_mismatch_peer = None
181 conn.execute(
182 "UPDATE peers SET status = 'active', established_at = ? WHERE id = ?",
183 (now, peer["id"]),
184 )
186 if fingerprint_mismatch_peer is not None:
187 from . import federation as _fed_mod
189 _fed_mod.write_audit_log(
190 fingerprint_mismatch_peer["id"],
191 "peer_approval_failed",
192 {"node_id": fingerprint_mismatch_peer["node_id"], "reason": "fingerprint_mismatch"},
193 )
194 raise HTTPException(status_code=400, detail="fingerprint mismatch")
196 from . import federation as _fed_mod
198 _fed_mod.write_audit_log(
199 peer_id,
200 "peer_approved",
201 {"node_id": peer["node_id"], "approved_by": identity.entity_uri},
202 )
203 background_tasks.add_task(
204 _check_tl_inclusion_for_peer,
205 peer["node_id"],
206 peer["node_url"],
207 peer_id,
208 )
209 return PeerApprovalResponse(
210 peer_id=peer_id,
211 node_id=peer["node_id"],
212 status="active",
213 approved_at=now,
214 )
217async def _check_tl_inclusion_for_peer(node_id: str, node_url: str, peer_id: str) -> None:
218 """Check TL inclusion proof for a newly registered peer (§19.2.3).
220 trust_mode=strict (enforce): no proof → downgrade peer to pending_tl_proof
221 trust_mode=relaxed (warn): no proof → accept + audit warning
222 trust_mode=off: skip entirely
223 """
224 # Lazy lookup: tests monkey-patch ``federation.write_audit_log`` —
225 # accessing via the module preserves those patches.
226 from typing import cast as _cast
228 from . import federation as _fed_mod
230 _fed = _cast(Any, _fed_mod)
232 trust_mode = _fed.settings.trust_mode
233 if trust_mode == "off": 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true
234 return
236 # Try to fetch the peer's manifest from their well-known endpoint
237 manifest_obj = None
238 try:
239 assert_safe_url(node_url, allow_schemes=frozenset({"https", "http"}))
240 async with httpx.AsyncClient(timeout=10.0) as client:
241 resp = await client.get(
242 f"{node_url}/.well-known/stigmem-manifest.json",
243 follow_redirects=False,
244 )
245 if resp.status_code == 200:
246 try:
247 manifest_obj = manifest_from_dict(resp.json())
248 verify_manifest(manifest_obj, trust_mode=trust_mode)
249 except ManifestError as exc:
250 logger.warning("peer manifest from %s failed verification: %s", node_url, exc)
251 manifest_obj = None
252 except ValueError as exc:
253 logger.warning("peer manifest from %s was not valid JSON: %s", node_url, exc)
254 manifest_obj = None
255 except Exception as exc:
256 logger.warning("failed to fetch peer manifest from %s: %s", node_url, exc)
257 manifest_obj = None
259 has_tl_proof = False
260 if manifest_obj is not None: 260 ↛ 262line 260 didn't jump to line 262 because the condition on line 260 was never true
261 # Check whether the manifest has a TL entry recorded
262 existing = get_peer_manifest(manifest_obj.entity_uri, refresh_if_expired=False)
263 if existing is None:
264 with contextlib.suppress(ManifestError):
265 store_peer_manifest(manifest_obj.entity_uri, manifest_obj, trust_mode=trust_mode)
267 # Try to verify TL inclusion
268 try:
269 tl = make_transparency_log()
270 from ..db import db as _db
272 with _db() as conn:
273 row = conn.execute(
274 "SELECT log_entry_json FROM federation_manifests WHERE entity_uri = ?",
275 (manifest_obj.entity_uri,),
276 ).fetchone()
277 if row and row["log_entry_json"]:
278 import json as _json
280 le_data = _json.loads(row["log_entry_json"])
281 le = LogEntry(
282 log_id=le_data.get("log_id", ""),
283 leaf_hash=le_data.get("leaf_hash", ""),
284 log_index=le_data.get("log_index", -1),
285 integrated_time=le_data.get("integrated_time", 0),
286 inclusion_proof=le_data.get("inclusion_proof", {}),
287 )
288 tl.verify_inclusion(le)
289 has_tl_proof = True
290 except TransparencyLogUnavailable as exc:
291 logger.debug("transparency log unavailable for TL inclusion check: %s", exc)
292 except Exception as exc: # nosec B110 — TL inclusion check is best-effort
293 logger.debug("TL inclusion check failed: %s", exc)
295 if not has_tl_proof: 295 ↛ exitline 295 didn't return from function '_check_tl_inclusion_for_peer' because the condition on line 295 was always true
296 if trust_mode == "strict": 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true
297 _fed.write_audit_log(
298 peer_id,
299 "tl_proof_missing",
300 {"node_id": node_id, "action": "downgraded_to_pending_tl_proof"},
301 )
302 from ..db import db as _db
304 with _db() as conn:
305 conn.execute(
306 "UPDATE peers SET status = 'pending_tl_proof' WHERE id = ?",
307 (peer_id,),
308 )
309 else:
310 _fed.write_audit_log(
311 peer_id,
312 "tl_proof_missing",
313 {"node_id": node_id, "action": "accepted_with_warning", "trust_mode": trust_mode},
314 )
317def _authenticate_tombstone_caller(
318 request: Request,
319 authorization: str | None,
320 x_stigmem_capability: str | None,
321 try_peer_token_auth: Any,
322 get_mtls_peer_cert: Any,
323 fed_settings: Any,
324) -> None:
325 """F-1 fix: caller must present a valid peer-JWT OR a tombstone-write capability token.
327 Raises HTTPException on any auth failure. Returns None on success.
328 """
329 peer_auth = try_peer_token_auth(authorization)
330 if peer_auth is not None:
331 if fed_settings.mtls_enabled and request is not None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true
332 peer_cert = get_mtls_peer_cert(request)
333 if not check_peer_san(peer_cert, peer_auth[0]["node_id"]):
334 raise HTTPException(
335 status_code=401,
336 detail="peer certificate URI SAN does not match node_id",
337 )
338 return
340 if x_stigmem_capability is None:
341 raise HTTPException(status_code=401, detail="peer token or capability token required")
343 try:
344 verify_token(
345 x_stigmem_capability,
346 lambda uri: get_peer_manifest(
347 uri, refresh_if_expired=True, trust_mode=fed_settings.trust_mode
348 ),
349 trust_mode=fed_settings.trust_mode,
350 )
351 except CapabilityTokenError as exc:
352 raise HTTPException(status_code=401, detail=f"capability token invalid: {exc}") from exc
353 try:
354 cap_token = json.loads(x_stigmem_capability)
355 except json.JSONDecodeError as exc:
356 raise HTTPException(
357 status_code=400, detail=f"malformed capability token JSON: {exc}"
358 ) from exc
359 if cap_token.get("verb", "") not in ("tombstone:write", "write"): 359 ↛ exitline 359 didn't return from function '_authenticate_tombstone_caller' because the condition on line 359 was always true
360 raise HTTPException(status_code=403, detail="capability token missing tombstone:write verb")
363def _verify_signed_artifact_or_400(
364 *,
365 record: Any,
366 key_id: str,
367 artifact_label: str, # "tombstone" or "revocation"
368 missing_manifest_detail: str, # exact wire-error string for the unknown-signer 401
369 signer_uri: str,
370 verifier: Any, # verify_tombstone_signature or verify_revocation_signature
371 on_failure: Any | None = None, # callable(record, reason) emitted on bad signature
372) -> None:
373 """Look up the signer manifest, resolve the signing key, and verify the signature.
375 Raises HTTPException on any verification failure (no-key-id / unknown-signer /
376 key-id-not-in-manifest / signature-mismatch). ``missing_manifest_detail`` is
377 parameterised because the existing wire contract uses different wording for
378 tombstones vs revocations.
379 """
380 if not key_id:
381 _audit_tombstone_ingest_rejected(record, artifact_label, f"{artifact_label}_missing_key_id")
382 raise HTTPException(
383 status_code=status.HTTP_400_BAD_REQUEST,
384 detail=f"{artifact_label} missing key_id",
385 )
386 manifest = get_peer_manifest(signer_uri)
387 if manifest is None:
388 _audit_tombstone_ingest_rejected(record, artifact_label, "signer_manifest_missing")
389 raise HTTPException(status_code=401, detail=missing_manifest_detail)
390 pubkey_b64 = _resolve_pubkey_for_key_id(manifest, key_id)
391 if pubkey_b64 is None: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true
392 _audit_tombstone_ingest_rejected(record, artifact_label, "key_id_not_in_signer_manifest")
393 raise HTTPException(status_code=401, detail="key_id not in signer manifest")
394 try:
395 verifier(record, pubkey_b64)
396 except ValueError as exc:
397 if on_failure is not None:
398 on_failure(record, str(exc))
399 _audit_tombstone_ingest_rejected(record, artifact_label, str(exc))
400 raise HTTPException(
401 status_code=status.HTTP_400_BAD_REQUEST,
402 detail=f"{artifact_label}_verification_failed: {exc}",
403 ) from exc
406def _ingest_revocation(payload: dict[str, Any], fed_settings: Any) -> dict[str, Any]:
407 """Parse + verify + apply an inbound revocation. Returns the success response dict."""
408 from ..lifecycle.tombstone_signing import verify_revocation_signature
409 from ..lifecycle.tombstones import apply_inbound_revocation
411 try:
412 rev = TombstoneRevocationRecord(**payload)
413 except Exception as exc:
414 _audit_tombstone_payload_rejected(payload, "revocation", str(exc))
415 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
417 _verify_signed_artifact_or_400(
418 record=rev,
419 key_id=rev.key_id or "",
420 artifact_label="revocation",
421 missing_manifest_detail="no manifest for revocation signer",
422 signer_uri=rev.signed_by,
423 verifier=verify_revocation_signature,
424 )
426 apply_inbound_revocation(rev)
427 return {"status": "ok", "type": "revocation"}
430def _ingest_tombstone(payload: dict[str, Any], fed_settings: Any) -> dict[str, Any]:
431 """Parse + verify + apply an inbound tombstone. Returns the success response dict."""
432 from ..lifecycle.tombstone_signing import verify_tombstone_signature
433 from ..lifecycle.tombstones import apply_inbound_tombstone
435 try:
436 record = TombstoneRecord(**payload)
437 except Exception as exc:
438 _audit_tombstone_payload_rejected(payload, "tombstone", str(exc))
439 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
441 _verify_signed_artifact_or_400(
442 record=record,
443 key_id=record.key_id or "",
444 artifact_label="tombstone",
445 missing_manifest_detail="no manifest for signer",
446 signer_uri=record.signed_by,
447 verifier=verify_tombstone_signature,
448 on_failure=_emit_tombstone_verification_failed,
449 )
451 written = apply_inbound_tombstone(record)
452 return {"status": "ok", "written": written}
455def federation_ingest_tombstone_impl(
456 request: Request,
457 payload: dict[str, Any],
458 authorization: str | None,
459 x_stigmem_capability: str | None,
460 try_peer_token_auth: Any,
461 get_mtls_peer_cert: Any,
462) -> dict[str, Any]:
463 """Inbound tombstone push from a federation peer (§23.4.2).
465 Auth: peer JWT or capability token with tombstone:write verb (mirrors push_facts).
466 Verifies signature against org manifest, writes to local tombstones table.
467 """
468 # Lazy lookup: tests monkey-patch ``federation.settings``.
469 from typing import cast as _cast
471 from . import federation as _fed_mod
473 fed_settings = _cast(Any, _fed_mod).settings
475 _authenticate_tombstone_caller(
476 request,
477 authorization,
478 x_stigmem_capability,
479 try_peer_token_auth,
480 get_mtls_peer_cert,
481 fed_settings,
482 )
484 if "tombstone_id" in payload:
485 return _ingest_revocation(payload, fed_settings)
486 return _ingest_tombstone(payload, fed_settings)
489def _resolve_pubkey_for_key_id(manifest: Any, key_id: str) -> str | None:
490 """Return base64url public key from manifest matching key_id, or None."""
491 if manifest.key_id == key_id: 491 ↛ 494line 491 didn't jump to line 494 because the condition on line 491 was always true
492 pk: str = manifest.public_key
493 return pk
494 for evt in getattr(manifest, "rotation_events", []):
495 if getattr(evt, "new_key_id", None) == key_id:
496 new_pk: str | None = getattr(evt, "new_public_key", None)
497 return new_pk
498 return None
501def _emit_tombstone_verification_failed(record: TombstoneRecord, reason: str) -> None:
502 import logging as _logging
504 _logging.getLogger("stigmem.tombstones.ingest").error(
505 "tombstone_verification_failed: tombstone_id=%s entity=%s reason=%s",
506 record.id,
507 record.entity_uri,
508 reason,
509 )
512def _audit_tombstone_ingest_rejected(record: Any, artifact_label: str, reason: str) -> None:
513 from ..observability.audit_event import emit_nofail
515 artifact_id = str(getattr(record, "id", "") or getattr(record, "tombstone_id", ""))
516 signer_uri = str(getattr(record, "signed_by", "") or "system:federation")
517 target_entity_uri = str(
518 getattr(record, "entity_uri", "") or getattr(record, "tombstone_id", "") or artifact_id
519 )
520 emit_nofail(
521 "tombstone_federation_rejected",
522 entity_uri=signer_uri,
523 fact_id=artifact_id or None,
524 source="federation",
525 detail={
526 "artifact": artifact_label,
527 "artifact_id": artifact_id,
528 "target_entity_uri": target_entity_uri,
529 "key_id": str(getattr(record, "key_id", "") or ""),
530 "reason": reason,
531 },
532 )
535def _audit_tombstone_payload_rejected(
536 payload: dict[str, Any],
537 artifact_label: str,
538 reason: str,
539) -> None:
540 from ..observability.audit_event import emit_nofail
542 artifact_id = str(payload.get("id") or payload.get("tombstone_id") or "")
543 signer_uri = str(payload.get("signed_by") or "system:federation")
544 target_entity_uri = str(payload.get("entity_uri") or payload.get("tombstone_id") or artifact_id)
545 emit_nofail(
546 "tombstone_federation_rejected",
547 entity_uri=signer_uri,
548 fact_id=artifact_id or None,
549 source="federation",
550 detail={
551 "artifact": artifact_label,
552 "artifact_id": artifact_id,
553 "target_entity_uri": target_entity_uri,
554 "key_id": str(payload.get("key_id") or ""),
555 "reason": reason,
556 },
557 )