Coverage for node / src / stigmem_node / identity / capability.py: 82%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Capability token signing and verification — spec §19.3.2–§19.3.3 (C-SEC-1 / M-SEC-2). 

2 

3Public surface: 

4 CapabilityTokenError — raised on any token structural or cryptographic violation 

5 load_node_private_key() -> Ed25519PrivateKey | None 

6 sign_token(token_body) -> str (base64url signature) 

7 sign_revocation_event(event) -> str (base64url signature) 

8 verify_token(token_json, get_manifest, *, trust_mode) -> bool 

9 

10Security requirements: 

11 - JCS via canonicaljson (RFC 8785) for both signing and verification 

12 - token_version must equal 1; tokens without it are rejected (M-SEC-2) 

13 - Signature covers all token fields except "signature" itself 

14 - Revocation check via DB (step 6 of §19.3.3) 

15 - Private key singleton is seed-keyed to correctly reflect settings changes in tests 

16""" 

17 

18from __future__ import annotations 

19 

20import base64 

21import hmac 

22import json 

23import logging 

24from collections.abc import Callable 

25from dataclasses import dataclass 

26from datetime import UTC, datetime, timedelta 

27from typing import Any 

28 

29import canonicaljson 

30from cryptography.exceptions import InvalidSignature 

31from cryptography.hazmat.primitives.asymmetric.ed25519 import ( 

32 Ed25519PrivateKey, 

33 Ed25519PublicKey, 

34) 

35 

36from .manifest import ManifestError, OrgManifest, verify_manifest 

37 

38logger = logging.getLogger("stigmem.identity.capability") 

39 

40_TOKEN_VERSION = 1 

41# Dual-trust window: old key stays in accept_set for this many days after rotation. 

42# Must be ≥ max token TTL (90 days per §19.3.2 / §22.2.2). 

43_DUAL_TRUST_DAYS = 90 

44 

45@dataclass 

46class _NodePrivateKeyCache: 

47 """Seed-keyed singleton invalidated automatically when settings change.""" 

48 

49 key: Ed25519PrivateKey | None = None 

50 seed: str = "" 

51 

52 

53_node_private_key_cache = _NodePrivateKeyCache() 

54 

55 

56class CapabilityTokenError(ValueError): 

57 """Raised when a capability token fails any verification step.""" 

58 

59 

60# --------------------------------------------------------------------------- 

61# Internal helpers 

62# --------------------------------------------------------------------------- 

63 

64 

65def _pad(s: str) -> str: 

66 return s + "=" * (-len(s) % 4) 

67 

68 

69def _pubkey_from_b64(b64: str) -> Ed25519PublicKey: 

70 raw = base64.urlsafe_b64decode(_pad(b64)) 

71 return Ed25519PublicKey.from_public_bytes(raw) 

72 

73 

74def _token_signing_body(token_body: dict[str, Any]) -> bytes: 

75 """JCS-canonical bytes over all token_body fields except 'signature'.""" 

76 body = {k: v for k, v in token_body.items() if k != "signature"} 

77 return canonicaljson.encode_canonical_json(body) 

78 

79 

80def _revocation_signing_body(event: dict[str, Any]) -> bytes: 

81 """JCS-canonical bytes over all event fields except 'signature'.""" 

82 body = {k: v for k, v in event.items() if k != "signature"} 

83 return canonicaljson.encode_canonical_json(body) 

84 

85 

86# --------------------------------------------------------------------------- 

87# Key management 

88# --------------------------------------------------------------------------- 

89 

90 

91def load_node_private_key() -> Ed25519PrivateKey | None: 

92 """Return the cached node Ed25519 private key, re-loading when settings change. 

93 

94 Returns None when STIGMEM_NODE_PRIVATE_KEY is not configured (dev/test mode). 

95 Raises ValueError if the configured value cannot be decoded. 

96 """ 

97 from ..settings import settings # lazy — reflects test patches 

98 

99 current_seed = settings.node_private_key 

100 

101 if hmac.compare_digest(current_seed, _node_private_key_cache.seed): # nosec CT001 — cache invalidation; neither operand is attacker-controlled 

102 return _node_private_key_cache.key # cache hit 

103 

104 if not current_seed: 

105 _node_private_key_cache.key = None 

106 _node_private_key_cache.seed = current_seed 

107 return None 

108 

109 raw = base64.urlsafe_b64decode(_pad(current_seed)) 

110 _node_private_key_cache.key = Ed25519PrivateKey.from_private_bytes(raw) 

111 _node_private_key_cache.seed = current_seed 

112 logger.debug("node private key loaded (seed changed)") 

113 return _node_private_key_cache.key 

114 

115 

116# --------------------------------------------------------------------------- 

117# Signing helpers 

118# --------------------------------------------------------------------------- 

119 

120 

121def sign_token(token_body: dict[str, Any]) -> str: 

122 """Sign *token_body* with the node private key. Returns base64url signature. 

123 

124 Raises RuntimeError if STIGMEM_NODE_PRIVATE_KEY is not configured. 

125 """ 

126 key = load_node_private_key() 

127 if key is None: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true

128 raise RuntimeError( 

129 "STIGMEM_NODE_PRIVATE_KEY is not configured; cannot sign capability token" 

130 ) 

131 sig_bytes = key.sign(_token_signing_body(token_body)) 

132 return base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=") 

133 

134 

135def sign_revocation_event(event: dict[str, Any]) -> str: 

136 """Sign a revocation event with the node private key. Returns base64url signature. 

137 

138 Raises RuntimeError if STIGMEM_NODE_PRIVATE_KEY is not configured. 

139 """ 

140 key = load_node_private_key() 

141 if key is None: 

142 raise RuntimeError( 

143 "STIGMEM_NODE_PRIVATE_KEY is not configured; cannot sign revocation event" 

144 ) 

145 sig_bytes = key.sign(_revocation_signing_body(event)) 

146 return base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=") 

147 

148 

149# --------------------------------------------------------------------------- 

150# Verification helpers 

151# --------------------------------------------------------------------------- 

152 

153 

154def _verify_token_signature(token: dict[str, Any], manifest: OrgManifest, sig_b64: str) -> None: 

155 """Verify token signature against the current key or a dual-trust window key. 

156 

157 Tries manifest.public_key first. On failure, walks manifest.rotation_events 

158 in reverse and checks any previous_public_key that is still within the 

159 _DUAL_TRUST_DAYS window (§22.2). Raises CapabilityTokenError if no key 

160 verifies the signature. 

161 """ 

162 signing_body = _token_signing_body(token) 

163 sig_bytes = base64.urlsafe_b64decode(_pad(sig_b64)) 

164 

165 # Try current key 

166 try: 

167 _pubkey_from_b64(manifest.public_key).verify(sig_bytes, signing_body) 

168 return 

169 except InvalidSignature as exc: 

170 current_key_error = exc 

171 except Exception as exc: 

172 raise CapabilityTokenError(f"token signature error: {exc}") from exc 

173 

174 # Dual-trust fallback: try retiring keys whose window has not yet closed 

175 now = datetime.now(UTC) 

176 for evt in reversed(manifest.rotation_events): 

177 if not evt.previous_public_key: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true

178 continue # pre-§22.2 event — no stored retiring pubkey 

179 try: 

180 rotated_at = datetime.fromisoformat(evt.rotated_at.replace("Z", "+00:00")) 

181 except ValueError as exc: 

182 logger.debug( 

183 "skipping rotation event with invalid timestamp %r: %s", 

184 evt.rotated_at, 

185 exc, 

186 ) 

187 continue 

188 if now >= rotated_at + timedelta(days=_DUAL_TRUST_DAYS): 

189 continue # dual-trust window closed 

190 try: 

191 _pubkey_from_b64(evt.previous_public_key).verify(sig_bytes, signing_body) 

192 logger.debug( 

193 "token verified under dual-trust key %s (window open until %s)", 

194 evt.previous_key_id, 

195 (rotated_at + timedelta(days=_DUAL_TRUST_DAYS)).isoformat(), 

196 ) 

197 return 

198 except InvalidSignature as exc: 

199 logger.debug( 

200 "dual-trust key %s did not verify token signature: %s", 

201 evt.previous_key_id, 

202 exc, 

203 ) 

204 continue 

205 except Exception as exc: 

206 raise CapabilityTokenError(f"dual-trust signature error: {exc}") from exc 

207 

208 raise CapabilityTokenError("token signature verification failed") from current_key_error 

209 

210 

211# --------------------------------------------------------------------------- 

212# Verification 

213# --------------------------------------------------------------------------- 

214 

215 

216def verify_token( 

217 token_json: str, 

218 get_manifest: Callable[[str], OrgManifest | None], 

219 *, 

220 trust_mode: str = "relaxed", 

221) -> bool: 

222 """Verify a capability token — spec §19.3.3 steps 1–6. 

223 

224 Args: 

225 token_json: Full token JSON string (including "signature" field). 

226 get_manifest: Callable returning OrgManifest for an entity_uri, or None. 

227 Pass get_peer_manifest from trust_store for production use; 

228 pass a dict lookup lambda for unit tests. 

229 trust_mode: Forwarded to manifest verification. 

230 

231 Returns True on success. 

232 Raises CapabilityTokenError for any failure (including expired, revoked, bad sig). 

233 """ 

234 try: 

235 token = json.loads(token_json) 

236 except json.JSONDecodeError as exc: 

237 raise CapabilityTokenError(f"invalid token JSON: {exc}") from exc 

238 

239 # M-SEC-2: token_version must be present and equal to 1 

240 token_version = token.get("token_version") 

241 if token_version != _TOKEN_VERSION: 

242 raise CapabilityTokenError( 

243 f"unsupported token_version: {token_version!r} (expected {_TOKEN_VERSION})" 

244 ) 

245 

246 issuer: str = token.get("issuer", "") 

247 subject: str = token.get("subject", "") 

248 expiry_str: str = token.get("expiry", "") 

249 token_id: str = token.get("token_id", "") 

250 sig_b64: str = token.get("signature", "") 

251 

252 if not all([issuer, subject, expiry_str, token_id, sig_b64]): 

253 raise CapabilityTokenError("token missing required fields") 

254 

255 # Step 1: resolve issuer manifest (includes expiry check and refresh in get_peer_manifest) 

256 manifest = get_manifest(issuer) 

257 if manifest is None: 

258 raise CapabilityTokenError(f"issuer manifest not found or expired: {issuer!r}") 

259 

260 # Step 2: verify manifest self-signature 

261 try: 

262 verify_manifest(manifest, trust_mode=trust_mode) 

263 except ManifestError as exc: 

264 raise CapabilityTokenError(f"issuer manifest verification failed: {exc}") from exc 

265 

266 # Step 3: verify token signature under manifest public_key or a dual-trust window key. 

267 # §22.2: tokens issued under the prior key MUST be accepted for dual_trust_days after 

268 # rotation. We try the current key first; on InvalidSignature we walk rotation_events 

269 # in reverse and try each previous_public_key that is still within its trust window. 

270 _verify_token_signature(token, manifest, sig_b64) 

271 

272 # Step 4: C1 — subject must be in issuer's entities list 

273 if subject not in manifest.entities: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 raise CapabilityTokenError( 

275 f"subject {subject!r} not in issuer {issuer!r} entities list (C1)" 

276 ) 

277 

278 # Step 5: expiry check 

279 now = datetime.now(UTC) 

280 try: 

281 expiry = datetime.fromisoformat(expiry_str.replace("Z", "+00:00")) 

282 except ValueError as exc: 

283 raise CapabilityTokenError(f"invalid expiry format: {exc}") from exc 

284 

285 if expiry <= now: 

286 raise CapabilityTokenError(f"token expired at {expiry_str}") 

287 

288 # Step 6: revocation check (DB lookup) 

289 from ..db import db 

290 

291 with db() as conn: 

292 row = conn.execute( 

293 "SELECT revoked_at FROM capability_tokens WHERE id = ?", 

294 (token_id,), 

295 ).fetchone() 

296 

297 if row is None: 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true

298 raise CapabilityTokenError(f"token {token_id!r} not found in store") 

299 if row["revoked_at"] is not None: 

300 raise CapabilityTokenError(f"token {token_id!r} has been revoked") 

301 

302 return True