Coverage for node/src/stigmem_node/routes/federation/common.py: 79%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Shared helpers and compatibility exports for federation route modules.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import logging 

7import sys 

8from typing import Annotated, Any 

9 

10from fastapi import APIRouter, Depends, Header, HTTPException, Request 

11 

12from ...db import db 

13from ...federation.peer_token import TokenError, verify_peer_token 

14from ...federation.tls import check_peer_san 

15 

# Module-level logger registered under the "stigmem.federation" name.
logger = logging.getLogger("stigmem.federation")

# FastAPI router whose routes are all tagged "federation".
router = APIRouter(tags=["federation"])

19 

20 

def _public_module() -> Any:
    """Look up the public federation package in ``sys.modules``.

    Attributes are resolved through the live module object on every call so
    that monkey-patches applied by tests to the public module stay visible.

    Raises:
        KeyError: if ``stigmem_node.routes.federation`` has not been imported.
    """
    module_name = "stigmem_node.routes.federation"
    return sys.modules[module_name]

24 

def _allowed_output_scopes(peer: dict[str, Any], token_payload: dict[str, Any]) -> set[str]:
    """Intersection of peer's declaration allowed_scopes and token's scopes claim (§5.8).

    Args:
        peer: Peer record; ``peer["allowed_scopes"]`` is a JSON-encoded
            collection of scope names.
        token_payload: Decoded peer-token claims; the optional ``scopes``
            claim lists the scopes the token asks for.

    Returns:
        The scopes present in both sources, with ``"local"`` always removed
        and ``"team"`` removed unless ``settings.federation_allow_team`` is
        truthy on the public federation module.
    """
    peer_scopes = set(json.loads(peer["allowed_scopes"]))

    # The claim comes from the remote peer, so be defensive about its shape:
    # a null claim would raise TypeError in set(), a bare string would be
    # split into single characters, and unhashable entries would raise too.
    # Anything that is not a collection of strings grants no scopes.
    claimed = token_payload.get("scopes", [])
    if not isinstance(claimed, (list, tuple, set, frozenset)):
        claimed = []
    token_scopes = {scope for scope in claimed if isinstance(scope, str)}

    combined = peer_scopes & token_scopes
    combined.discard("local")
    if not _public_module().settings.federation_allow_team:
        combined.discard("team")
    return combined

34 

35 

36# --------------------------------------------------------------------------- 

37# Peer-token dependency 

38# --------------------------------------------------------------------------- 

39 

40 

def _get_mtls_peer_cert(request: Request) -> dict[str, Any]:
    """Return the TLS peer certificate dict exposed through the ASGI scope.

    The certificate is read from the ``ssl_object`` extra info of the
    ``transport`` entry in ``request.scope``.

    Returns an empty dict when there is no transport, no SSL object, or no
    peer certificate (tests, plaintext mode).
    """
    transport = request.scope.get("transport")
    ssl_object = None if transport is None else transport.get_extra_info("ssl_object")
    if ssl_object is None:
        return {}

    certificate = ssl_object.getpeercert()
    return certificate if certificate else {}

53 

54 

def _require_peer_token(
    request: Request,
    authorization: Annotated[str | None, Header(alias="authorization")] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Verify incoming peer token. Returns (peer_dict, token_payload) or raises 401.

    Steps:
      1. Require an ``Authorization: Bearer <token>`` header.
      2. Peek at the *unverified* claims to learn the claimed issuer (``iss``).
      3. Look the issuer up in the ``peers`` table; it must exist and be
         ``active``.
      4. Verify the token against the peer's ``federation_pubkey``.
      5. When mTLS is enabled and a peer certificate is available, require the
         certificate SAN to match the peer's ``node_id``.

    Rejections after step 2 are recorded through ``write_audit_log`` on the
    public federation module.

    Args:
        request: Incoming request; only used to reach the TLS peer certificate.
        authorization: Raw ``Authorization`` header value, if any.

    Returns:
        ``(peer, payload)`` where ``peer`` is the matching ``peers`` row as a
        dict and ``payload`` is what ``verify_peer_token`` returned.

    Raises:
        HTTPException: 401 for a missing/non-bearer header, an undecodable
            token, a non-string ``iss``, an unknown or non-active peer, a
            token rejected by ``verify_peer_token``, or a SAN mismatch.
    """
    if authorization is None or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=401, detail="peer token required")

    raw_token = authorization[7:]

    # Read the claims without signature verification, only to find out which
    # peer claims to have issued the token. Nothing here is trusted until
    # verify_peer_token accepts the token below.
    import jwt as _jwt

    try:
        # exp/iat are epoch_ms per spec §3.5; disable all claim validation for this peek
        unverified: dict[str, Any] = _jwt.decode(
            raw_token,
            options={
                "verify_signature": False,
                "verify_exp": False,
                "verify_iat": False,
                "verify_nbf": False,
                "verify_aud": False,
            },
            algorithms=["EdDSA"],
        )
    except Exception as exc:
        raise HTTPException(status_code=401, detail="malformed token") from exc

    iss = unverified.get("iss", "")
    # `iss` is attacker-controlled at this point. RFC 7519 requires it to be a
    # string; a JSON object/array here could make the DB driver fail while
    # binding the query parameter and surface as a 500 instead of a 401.
    if not isinstance(iss, str):
        raise HTTPException(status_code=401, detail="malformed token")

    with db() as conn:
        peer_row = conn.execute(
            "SELECT * FROM peers WHERE node_id = ?",
            (iss,),
        ).fetchone()

    if peer_row is None:
        # No peer row exists, so the claimed issuer is used as the audit subject.
        _public_module().write_audit_log(
            iss, "rejected_token", {"reason": "peer_not_found", "iss": iss}
        )
        raise HTTPException(status_code=401, detail="peer not registered")

    if peer_row["status"] != "active":
        _public_module().write_audit_log(
            peer_row["id"],
            "rejected_token",
            {"reason": "peer_not_approved", "iss": iss, "status": peer_row["status"]},
        )
        raise HTTPException(status_code=401, detail="peer_not_approved")

    peer = dict(peer_row)

    try:
        payload = verify_peer_token(raw_token, peer["federation_pubkey"], peer["id"])
    except TokenError as exc:
        # A reused nonce is logged as its own event so replays stand out.
        event = "replay_attempt" if exc.kind == "nonce_already_seen" else "rejected_token"
        _public_module().write_audit_log(peer["id"], event, {"reason": exc.kind})
        raise HTTPException(status_code=401, detail=exc.kind) from exc

    # §22.1.2.4 — bind TLS cert identity to JWT iss; rejects cert-swapping attacks.
    if _public_module().settings.mtls_enabled:
        peer_cert = _get_mtls_peer_cert(request)
        if peer_cert and not check_peer_san(peer_cert, peer["node_id"]):
            _public_module().write_audit_log(
                peer["id"], "san_mismatch", {"node_id": peer["node_id"]}
            )
            raise HTTPException(
                status_code=401,
                detail="peer certificate URI SAN does not match node_id",
            )
        if not peer_cert:
            # No certificate visible at the application layer: the request is
            # still accepted and only a warning is emitted.
            logger.warning(
                "mTLS peer certificate was not exposed by the ASGI server; "
                "falling back to TLS-layer client certificate verification for %s",
                peer["node_id"],
            )

    return peer, payload

134 

135 

# Dependency alias for route signatures: resolves to the (peer, token_payload)
# tuple returned by _require_peer_token.
PeerTokenDep = Annotated[
    tuple[dict[str, Any], dict[str, Any]],
    Depends(_require_peer_token),
]

137 

138 

def _try_peer_token_auth(
    authorization: str | None,
) -> tuple[dict[str, Any], dict[str, Any]] | None:
    """Soft peer-JWT auth: returns (peer, payload) on success, None on failure.

    Unlike _require_peer_token, absent or invalid credentials yield None
    instead of an HTTP 401 — used so push_facts can fall through to the
    capability-token path when peer JWT is absent or invalid.

    Args:
        authorization: Raw ``Authorization`` header value, if any.

    Returns:
        ``(peer, payload)`` when the header carries a bearer token whose
        issuer is an ``active`` row in ``peers`` and ``verify_peer_token``
        accepts it; otherwise ``None``.
    """
    # NOTE(review): unlike _require_peer_token, this path performs no mTLS SAN
    # check and writes no audit-log entries — confirm that is intended.
    if authorization is None or not authorization.lower().startswith("bearer "):
        return None

    raw_token = authorization[7:]

    import jwt as _jwt

    try:
        # Unverified peek, only to discover the claimed issuer.
        unverified: dict[str, Any] = _jwt.decode(
            raw_token,
            options={
                "verify_signature": False,
                "verify_exp": False,
                "verify_iat": False,
                "verify_nbf": False,
                "verify_aud": False,
            },
            algorithms=["EdDSA"],
        )
    except Exception:
        return None

    iss = unverified.get("iss", "")
    # `iss` is attacker-controlled. A non-string value (JSON object/array)
    # could make the DB driver raise while binding the parameter, which would
    # break the "return None on invalid credentials" contract.
    if not isinstance(iss, str):
        return None

    with db() as conn:
        peer_row = conn.execute(
            "SELECT * FROM peers WHERE node_id = ?",
            (iss,),
        ).fetchone()

    if peer_row is None or peer_row["status"] != "active":
        return None

    peer = dict(peer_row)
    try:
        payload = verify_peer_token(raw_token, peer["federation_pubkey"], peer["id"])
    except TokenError:
        return None

    return peer, payload

186 

187 

def _cap_token_covers_scope(token_object: str, scope: str) -> bool:
    """Tell whether a capability token's object grants access to ``scope`` (H-SEC-2).

    Two object forms are accepted:
      * ``stigmem://facts`` — wildcard, covers every scope;
      * ``stigmem://facts/scope:X`` — covers exactly scope ``X``.
    """
    wildcard_object = "stigmem://facts"  # nosec B105 — URI scheme constant, not a password
    scoped_object = f"stigmem://facts/scope:{scope}"
    return token_object == wildcard_object or token_object == scoped_object