Coverage for node / src / stigmem_node / identity / capability.py: 82%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Capability token signing and verification — spec §19.3.2–§19.3.3 (C-SEC-1 / M-SEC-2).
3Public surface:
4 CapabilityTokenError — raised on any token structural or cryptographic violation
5 load_node_private_key() -> Ed25519PrivateKey | None
6 sign_token(token_body) -> str (base64url signature)
7 sign_revocation_event(event) -> str (base64url signature)
8 verify_token(token_json, get_manifest, *, trust_mode) -> bool
10Security requirements:
11 - JCS via canonicaljson (RFC 8785) for both signing and verification
12 - token_version must equal 1; tokens without it are rejected (M-SEC-2)
13 - Signature covers all token fields except "signature" itself
14 - Revocation check via DB (step 6 of §19.3.3)
15 - Private key singleton is seed-keyed to correctly reflect settings changes in tests
16"""
18from __future__ import annotations
20import base64
21import hmac
22import json
23import logging
24from collections.abc import Callable
25from dataclasses import dataclass
26from datetime import UTC, datetime, timedelta
27from typing import Any
29import canonicaljson
30from cryptography.exceptions import InvalidSignature
31from cryptography.hazmat.primitives.asymmetric.ed25519 import (
32 Ed25519PrivateKey,
33 Ed25519PublicKey,
34)
36from .manifest import ManifestError, OrgManifest, verify_manifest
38logger = logging.getLogger("stigmem.identity.capability")
40_TOKEN_VERSION = 1
41# Dual-trust window: old key stays in accept_set for this many days after rotation.
42# Must be ≥ max token TTL (90 days per §19.3.2 / §22.2.2).
43_DUAL_TRUST_DAYS = 90
45@dataclass
46class _NodePrivateKeyCache:
47 """Seed-keyed singleton invalidated automatically when settings change."""
49 key: Ed25519PrivateKey | None = None
50 seed: str = ""
53_node_private_key_cache = _NodePrivateKeyCache()
56class CapabilityTokenError(ValueError):
57 """Raised when a capability token fails any verification step."""
60# ---------------------------------------------------------------------------
61# Internal helpers
62# ---------------------------------------------------------------------------
65def _pad(s: str) -> str:
66 return s + "=" * (-len(s) % 4)
69def _pubkey_from_b64(b64: str) -> Ed25519PublicKey:
70 raw = base64.urlsafe_b64decode(_pad(b64))
71 return Ed25519PublicKey.from_public_bytes(raw)
74def _token_signing_body(token_body: dict[str, Any]) -> bytes:
75 """JCS-canonical bytes over all token_body fields except 'signature'."""
76 body = {k: v for k, v in token_body.items() if k != "signature"}
77 return canonicaljson.encode_canonical_json(body)
80def _revocation_signing_body(event: dict[str, Any]) -> bytes:
81 """JCS-canonical bytes over all event fields except 'signature'."""
82 body = {k: v for k, v in event.items() if k != "signature"}
83 return canonicaljson.encode_canonical_json(body)
86# ---------------------------------------------------------------------------
87# Key management
88# ---------------------------------------------------------------------------
91def load_node_private_key() -> Ed25519PrivateKey | None:
92 """Return the cached node Ed25519 private key, re-loading when settings change.
94 Returns None when STIGMEM_NODE_PRIVATE_KEY is not configured (dev/test mode).
95 Raises ValueError if the configured value cannot be decoded.
96 """
97 from ..settings import settings # lazy — reflects test patches
99 current_seed = settings.node_private_key
101 if hmac.compare_digest(current_seed, _node_private_key_cache.seed): # nosec CT001 — cache invalidation; neither operand is attacker-controlled
102 return _node_private_key_cache.key # cache hit
104 if not current_seed:
105 _node_private_key_cache.key = None
106 _node_private_key_cache.seed = current_seed
107 return None
109 raw = base64.urlsafe_b64decode(_pad(current_seed))
110 _node_private_key_cache.key = Ed25519PrivateKey.from_private_bytes(raw)
111 _node_private_key_cache.seed = current_seed
112 logger.debug("node private key loaded (seed changed)")
113 return _node_private_key_cache.key
116# ---------------------------------------------------------------------------
117# Signing helpers
118# ---------------------------------------------------------------------------
121def sign_token(token_body: dict[str, Any]) -> str:
122 """Sign *token_body* with the node private key. Returns base64url signature.
124 Raises RuntimeError if STIGMEM_NODE_PRIVATE_KEY is not configured.
125 """
126 key = load_node_private_key()
127 if key is None: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true
128 raise RuntimeError(
129 "STIGMEM_NODE_PRIVATE_KEY is not configured; cannot sign capability token"
130 )
131 sig_bytes = key.sign(_token_signing_body(token_body))
132 return base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=")
135def sign_revocation_event(event: dict[str, Any]) -> str:
136 """Sign a revocation event with the node private key. Returns base64url signature.
138 Raises RuntimeError if STIGMEM_NODE_PRIVATE_KEY is not configured.
139 """
140 key = load_node_private_key()
141 if key is None:
142 raise RuntimeError(
143 "STIGMEM_NODE_PRIVATE_KEY is not configured; cannot sign revocation event"
144 )
145 sig_bytes = key.sign(_revocation_signing_body(event))
146 return base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=")
149# ---------------------------------------------------------------------------
150# Verification helpers
151# ---------------------------------------------------------------------------
154def _verify_token_signature(token: dict[str, Any], manifest: OrgManifest, sig_b64: str) -> None:
155 """Verify token signature against the current key or a dual-trust window key.
157 Tries manifest.public_key first. On failure, walks manifest.rotation_events
158 in reverse and checks any previous_public_key that is still within the
159 _DUAL_TRUST_DAYS window (§22.2). Raises CapabilityTokenError if no key
160 verifies the signature.
161 """
162 signing_body = _token_signing_body(token)
163 sig_bytes = base64.urlsafe_b64decode(_pad(sig_b64))
165 # Try current key
166 try:
167 _pubkey_from_b64(manifest.public_key).verify(sig_bytes, signing_body)
168 return
169 except InvalidSignature as exc:
170 current_key_error = exc
171 except Exception as exc:
172 raise CapabilityTokenError(f"token signature error: {exc}") from exc
174 # Dual-trust fallback: try retiring keys whose window has not yet closed
175 now = datetime.now(UTC)
176 for evt in reversed(manifest.rotation_events):
177 if not evt.previous_public_key: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 continue # pre-§22.2 event — no stored retiring pubkey
179 try:
180 rotated_at = datetime.fromisoformat(evt.rotated_at.replace("Z", "+00:00"))
181 except ValueError as exc:
182 logger.debug(
183 "skipping rotation event with invalid timestamp %r: %s",
184 evt.rotated_at,
185 exc,
186 )
187 continue
188 if now >= rotated_at + timedelta(days=_DUAL_TRUST_DAYS):
189 continue # dual-trust window closed
190 try:
191 _pubkey_from_b64(evt.previous_public_key).verify(sig_bytes, signing_body)
192 logger.debug(
193 "token verified under dual-trust key %s (window open until %s)",
194 evt.previous_key_id,
195 (rotated_at + timedelta(days=_DUAL_TRUST_DAYS)).isoformat(),
196 )
197 return
198 except InvalidSignature as exc:
199 logger.debug(
200 "dual-trust key %s did not verify token signature: %s",
201 evt.previous_key_id,
202 exc,
203 )
204 continue
205 except Exception as exc:
206 raise CapabilityTokenError(f"dual-trust signature error: {exc}") from exc
208 raise CapabilityTokenError("token signature verification failed") from current_key_error
211# ---------------------------------------------------------------------------
212# Verification
213# ---------------------------------------------------------------------------
216def verify_token(
217 token_json: str,
218 get_manifest: Callable[[str], OrgManifest | None],
219 *,
220 trust_mode: str = "relaxed",
221) -> bool:
222 """Verify a capability token — spec §19.3.3 steps 1–6.
224 Args:
225 token_json: Full token JSON string (including "signature" field).
226 get_manifest: Callable returning OrgManifest for an entity_uri, or None.
227 Pass get_peer_manifest from trust_store for production use;
228 pass a dict lookup lambda for unit tests.
229 trust_mode: Forwarded to manifest verification.
231 Returns True on success.
232 Raises CapabilityTokenError for any failure (including expired, revoked, bad sig).
233 """
234 try:
235 token = json.loads(token_json)
236 except json.JSONDecodeError as exc:
237 raise CapabilityTokenError(f"invalid token JSON: {exc}") from exc
239 # M-SEC-2: token_version must be present and equal to 1
240 token_version = token.get("token_version")
241 if token_version != _TOKEN_VERSION:
242 raise CapabilityTokenError(
243 f"unsupported token_version: {token_version!r} (expected {_TOKEN_VERSION})"
244 )
246 issuer: str = token.get("issuer", "")
247 subject: str = token.get("subject", "")
248 expiry_str: str = token.get("expiry", "")
249 token_id: str = token.get("token_id", "")
250 sig_b64: str = token.get("signature", "")
252 if not all([issuer, subject, expiry_str, token_id, sig_b64]):
253 raise CapabilityTokenError("token missing required fields")
255 # Step 1: resolve issuer manifest (includes expiry check and refresh in get_peer_manifest)
256 manifest = get_manifest(issuer)
257 if manifest is None:
258 raise CapabilityTokenError(f"issuer manifest not found or expired: {issuer!r}")
260 # Step 2: verify manifest self-signature
261 try:
262 verify_manifest(manifest, trust_mode=trust_mode)
263 except ManifestError as exc:
264 raise CapabilityTokenError(f"issuer manifest verification failed: {exc}") from exc
266 # Step 3: verify token signature under manifest public_key or a dual-trust window key.
267 # §22.2: tokens issued under the prior key MUST be accepted for dual_trust_days after
268 # rotation. We try the current key first; on InvalidSignature we walk rotation_events
269 # in reverse and try each previous_public_key that is still within its trust window.
270 _verify_token_signature(token, manifest, sig_b64)
272 # Step 4: C1 — subject must be in issuer's entities list
273 if subject not in manifest.entities: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true
274 raise CapabilityTokenError(
275 f"subject {subject!r} not in issuer {issuer!r} entities list (C1)"
276 )
278 # Step 5: expiry check
279 now = datetime.now(UTC)
280 try:
281 expiry = datetime.fromisoformat(expiry_str.replace("Z", "+00:00"))
282 except ValueError as exc:
283 raise CapabilityTokenError(f"invalid expiry format: {exc}") from exc
285 if expiry <= now:
286 raise CapabilityTokenError(f"token expired at {expiry_str}")
288 # Step 6: revocation check (DB lookup)
289 from ..db import db
291 with db() as conn:
292 row = conn.execute(
293 "SELECT revoked_at FROM capability_tokens WHERE id = ?",
294 (token_id,),
295 ).fetchone()
297 if row is None: 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true
298 raise CapabilityTokenError(f"token {token_id!r} not found in store")
299 if row["revoked_at"] is not None:
300 raise CapabilityTokenError(f"token {token_id!r} has been revoked")
302 return True