Coverage for node / src / stigmem_node / routes / federation / common.py: 79%
87 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Shared helpers and compatibility exports for federation route modules."""
3from __future__ import annotations
5import json
6import logging
7import sys
8from typing import Annotated, Any
10from fastapi import APIRouter, Depends, Header, HTTPException, Request
12from ...db import db
13from ...federation.peer_token import TokenError, verify_peer_token
14from ...federation.tls import check_peer_san
# Federation route modules share this logger; the name is fixed (rather than
# derived from __name__) so every module logs under "stigmem.federation".
logger = logging.getLogger("stigmem.federation")

# Router object for federation endpoints, tagged "federation".
router = APIRouter(tags=["federation"])
def _public_module() -> Any:
    """Look up the already-imported ``stigmem_node.routes.federation`` module.

    The module object is fetched from ``sys.modules`` on every call rather than
    being bound once at import time, so attributes that tests monkey-patch on
    the public module stay visible to the helpers in this file.

    Raises:
        KeyError: if the public federation module is not in ``sys.modules``.
    """
    module_name = "stigmem_node.routes.federation"
    return sys.modules[module_name]
def _allowed_output_scopes(peer: dict[str, Any], token_payload: dict[str, Any]) -> set[str]:
    """Return the scopes that may be sent to a peer (§5.8).

    The result is the intersection of the peer declaration's
    ``allowed_scopes`` (stored on the peer record as a JSON-encoded list) and
    the token's ``scopes`` claim, with ``"local"`` always removed and
    ``"team"`` removed unless ``settings.federation_allow_team`` is truthy.

    Args:
        peer: Peer record; ``peer["allowed_scopes"]`` must be a JSON string.
        token_payload: Decoded token claims; a missing ``scopes`` claim is
            treated as an empty list.

    Returns:
        A new set containing the permitted scope names.
    """
    declared = set(json.loads(peer["allowed_scopes"]))
    claimed = set(token_payload.get("scopes", []))

    # Scopes that must be stripped regardless of what both sides agreed on.
    excluded = {"local"}
    if not _public_module().settings.federation_allow_team:
        excluded.add("team")

    return (declared & claimed) - excluded
36# ---------------------------------------------------------------------------
37# Peer-token dependency
38# ---------------------------------------------------------------------------
def _get_mtls_peer_cert(request: Request) -> dict[str, Any]:
    """Return the TLS peer certificate dict exposed through the ASGI scope.

    The certificate is read from ``request.scope["transport"]`` via
    ``get_extra_info("ssl_object").getpeercert()``.

    Returns:
        The certificate dict, or an empty dict when the scope has no
        transport, the transport has no SSL object (tests, plaintext mode),
        or ``getpeercert()`` returns a falsy value.
    """
    transport = request.scope.get("transport")
    tls = transport.get_extra_info("ssl_object") if transport is not None else None
    if tls is None:
        return {}
    # getpeercert() may hand back None / an empty dict; normalise both to {}.
    return tls.getpeercert() or {}
def _require_peer_token(
    request: Request,
    authorization: Annotated[str | None, Header(alias="authorization")] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Verify the incoming peer token and return ``(peer_dict, token_payload)``.

    Checks, in order:

    1. An ``Authorization`` header with a ``bearer`` scheme (matched
       case-insensitively) is present.
    2. The JWT payload decodes; it is read *unverified* only to obtain ``iss``.
    3. ``iss`` names a row in ``peers`` whose ``status`` is ``"active"``.
    4. ``verify_peer_token`` accepts the token for that peer's
       ``federation_pubkey``.
    5. When ``settings.mtls_enabled`` is set and the server exposes a peer
       certificate, ``check_peer_san`` accepts it for the peer's ``node_id``.

    Rejections after the issuer lookup are recorded through the public
    module's ``write_audit_log``.

    Args:
        request: Current request; only used to reach the TLS peer certificate.
        authorization: Raw ``Authorization`` header value, if any.

    Returns:
        ``(peer, payload)`` where ``peer`` is the ``peers`` row as a dict and
        ``payload`` is whatever ``verify_peer_token`` returns.

    Raises:
        HTTPException: status 401 on every failure path; ``detail`` names the
            reason.
    """
    if authorization is None or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=401, detail="peer token required")

    raw_token = authorization[7:]  # strip the 7-character "bearer " prefix

    # Peek at the payload without signature verification, only to learn which
    # peer claims to have issued the token; nothing read here is trusted.
    import jwt as _jwt

    try:
        # exp/iat are epoch_ms per spec §3.5; disable all claim validation for this peek
        unverified: dict[str, Any] = _jwt.decode(
            raw_token,
            options={
                "verify_signature": False,
                "verify_exp": False,
                "verify_iat": False,
                "verify_nbf": False,
                "verify_aud": False,
            },
            algorithms=["EdDSA"],
        )
    except Exception as exc:
        raise HTTPException(status_code=401, detail="malformed token") from exc

    iss = unverified.get("iss", "")
    if not isinstance(iss, str):
        # `iss` is attacker-controlled at this point. A non-string value (JSON
        # object, array, ...) can never equal a node_id, and passing it as a
        # SQL parameter may make the driver raise, turning a bad token into a
        # 500. Reject it as malformed instead.
        raise HTTPException(status_code=401, detail="malformed token")

    with db() as conn:
        peer_row = conn.execute(
            "SELECT * FROM peers WHERE node_id = ?",
            (iss,),
        ).fetchone()

    if peer_row is None:
        # No peer record exists, so the claimed issuer stands in for the peer id.
        _public_module().write_audit_log(
            iss, "rejected_token", {"reason": "peer_not_found", "iss": iss}
        )
        raise HTTPException(status_code=401, detail="peer not registered")

    if peer_row["status"] != "active":
        _public_module().write_audit_log(
            peer_row["id"],
            "rejected_token",
            {"reason": "peer_not_approved", "iss": iss, "status": peer_row["status"]},
        )
        raise HTTPException(status_code=401, detail="peer_not_approved")

    peer = dict(peer_row)

    try:
        payload = verify_peer_token(raw_token, peer["federation_pubkey"], peer["id"])
    except TokenError as exc:
        # A reused nonce is audited under its own event name.
        event = "replay_attempt" if exc.kind == "nonce_already_seen" else "rejected_token"
        _public_module().write_audit_log(peer["id"], event, {"reason": exc.kind})
        raise HTTPException(status_code=401, detail=exc.kind) from exc

    # §22.1.2.4 — bind TLS cert identity to JWT iss; rejects cert-swapping attacks.
    if _public_module().settings.mtls_enabled:
        peer_cert = _get_mtls_peer_cert(request)
        if peer_cert and not check_peer_san(peer_cert, peer["node_id"]):
            _public_module().write_audit_log(
                peer["id"], "san_mismatch", {"node_id": peer["node_id"]}
            )
            raise HTTPException(
                status_code=401,
                detail="peer certificate URI SAN does not match node_id",
            )
        if not peer_cert:
            # The certificate is not visible here, so the SAN binding cannot be
            # checked; the request is allowed through with a warning.
            logger.warning(
                "mTLS peer certificate was not exposed by the ASGI server; "
                "falling back to TLS-layer client certificate verification for %s",
                peer["node_id"],
            )

    return peer, payload
# Dependency alias: annotating a route parameter with PeerTokenDep runs
# _require_peer_token and injects its (peer, token_payload) tuple.
PeerTokenDep = Annotated[
    tuple[dict[str, Any], dict[str, Any]],
    Depends(_require_peer_token),
]
def _try_peer_token_auth(
    authorization: str | None,
) -> tuple[dict[str, Any], dict[str, Any]] | None:
    """Soft peer-JWT auth: return ``(peer, payload)`` on success, else ``None``.

    Unlike ``_require_peer_token``, authentication failures are reported as
    ``None`` instead of an exception, so ``push_facts`` can fall through to
    the capability-token path when the peer JWT is absent or invalid. This
    path writes no audit-log entries.

    NOTE(review): this function receives no ``Request`` and therefore performs
    no certificate SAN check — confirm that is intended for callers that run
    with mTLS enabled.

    Args:
        authorization: Raw ``Authorization`` header value, if any.

    Returns:
        ``(peer, payload)`` when the header carries a bearer token whose
        issuer is an ``"active"`` row in ``peers`` and ``verify_peer_token``
        accepts it; ``None`` otherwise.
    """
    if authorization is None or not authorization.lower().startswith("bearer "):
        return None

    raw_token = authorization[7:]  # strip the 7-character "bearer " prefix

    import jwt as _jwt

    try:
        # Unverified peek at the payload, only to find the claimed issuer.
        unverified: dict[str, Any] = _jwt.decode(
            raw_token,
            options={
                "verify_signature": False,
                "verify_exp": False,
                "verify_iat": False,
                "verify_nbf": False,
                "verify_aud": False,
            },
            algorithms=["EdDSA"],
        )
    except Exception:
        return None

    iss = unverified.get("iss", "")
    if not isinstance(iss, str):
        # `iss` is attacker-controlled. A non-string value can never match a
        # node_id, and binding it as a SQL parameter may raise, which would
        # break this function's soft-failure contract.
        return None

    with db() as conn:
        peer_row = conn.execute(
            "SELECT * FROM peers WHERE node_id = ?",
            (iss,),
        ).fetchone()

    if peer_row is None or peer_row["status"] != "active":
        return None

    peer = dict(peer_row)
    try:
        payload = verify_peer_token(raw_token, peer["federation_pubkey"], peer["id"])
    except TokenError:
        return None

    return peer, payload
def _cap_token_covers_scope(token_object: str, scope: str) -> bool:
    """Tell whether a capability token's object covers a fact scope (H-SEC-2).

    Two object forms grant coverage:

    * ``stigmem://facts`` — wildcard, covers every scope.
    * ``stigmem://facts/scope:X`` — covers exactly scope ``X``.

    Args:
        token_object: The ``object`` URI carried by the capability token.
        scope: The fact scope being accessed.

    Returns:
        True if ``token_object`` is the wildcard or the exact per-scope URI
        for ``scope``; False for anything else.
    """
    wildcard_object = "stigmem://facts"  # nosec B105 — URI scheme constant, not a password
    return (
        token_object == wildcard_object
        or token_object == f"stigmem://facts/scope:{scope}"
    )