Coverage for node / src / stigmem_node / identity / manifest.py: 87%
122 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Org manifest format, signing, and verification — spec §19.1.
3Public surface:
4 OrgManifest — dataclass matching §19.1.2 fields
5 RotationEvent — single step in a key-rotation chain
6 ManifestError — raised on any structural/cryptographic violation
8 sign_manifest(manifest, private_key) -> OrgManifest
9 verify_manifest(manifest, trust_mode) -> bool (raises ManifestError on failure)
10 verify_rotation_chain(manifest, previous_key_id, previous_pubkey) -> bool
12Security requirements enforced here:
13 - JCS via canonicaljson (RFC 8785), NOT json.dumps(sort_keys=True)
14 - expires_at <= issued_at + 730 days; in strict mode <= 365 days
15 - Rotation chain validated from the previously-accepted key, all steps
16 - No key_id reuse (regression/cycle attack prevention)
17 - rotation_events capped at 100 entries
18"""
20from __future__ import annotations
22import base64
23from dataclasses import dataclass, field
24from datetime import datetime, timedelta
25from typing import Any
27import canonicaljson
28from cryptography.exceptions import InvalidSignature
29from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
31_MAX_VALIDITY_DAYS = 730
32_STRICT_MAX_DAYS = 365
33_MAX_ROTATION_EVENTS = 100
36class ManifestError(ValueError):
37 """Raised when a manifest fails structural or cryptographic validation."""
40@dataclass
41class RotationEvent:
42 """A single key-rotation step in the org manifest chain.
44 Each event is signed by the *previous* key, enabling chain verification
45 without external key registry lookups.
47 `previous_public_key` stores the retiring key's public bytes (base64url)
48 so verifiers can check tokens issued under that key during the dual-trust
49 window (§22.2) without an external key registry. Empty string on events
50 created before §22.2 support; present on all Phase-12-or-later rotations.
51 """
53 previous_key_id: str
54 new_key_id: str
55 new_public_key: str # base64url Ed25519 public key for new_key_id
56 rotated_at: str # ISO-8601 UTC
57 signature: str # base64url Ed25519 sig over canonical body (by previous key)
58 previous_public_key: str = "" # base64url retiring key pubkey (§22.2 dual-trust)
61@dataclass
62class OrgManifest:
63 """Org-identity manifest — spec §19.1.2.
65 `entities` lists every entity URI this org is authorised to issue
66 capability tokens for (including itself). The C1 rule enforces that
67 a token's `subject` must appear in the issuer's `entities` list.
69 `signature` is the self-signature over the JCS body (all fields except
70 `signature` itself). It is empty-string before signing.
71 """
73 entity_uri: str
74 key_id: str
75 public_key: str # base64url Ed25519 current signing key
76 issued_at: str # ISO-8601 UTC
77 expires_at: str # ISO-8601 UTC
78 entities: list[str] = field(default_factory=list)
79 rotation_events: list[RotationEvent] = field(default_factory=list)
80 signature: str = "" # base64url self-signature; empty before signing
83# ---------------------------------------------------------------------------
84# Internal helpers
85# ---------------------------------------------------------------------------
88def _pad(s: str) -> str:
89 return s + "=" * (-len(s) % 4)
92def _pubkey_from_b64(b64: str) -> Ed25519PublicKey:
93 raw = base64.urlsafe_b64decode(_pad(b64))
94 return Ed25519PublicKey.from_public_bytes(raw)
97def _rotation_event_to_dict(evt: RotationEvent) -> dict[str, Any]:
98 d: dict[str, Any] = {
99 "new_key_id": evt.new_key_id,
100 "new_public_key": evt.new_public_key,
101 "previous_key_id": evt.previous_key_id,
102 "rotated_at": evt.rotated_at,
103 "signature": evt.signature,
104 }
105 if evt.previous_public_key:
106 d["previous_public_key"] = evt.previous_public_key
107 return d
110def _manifest_signing_body(manifest: OrgManifest) -> bytes:
111 """Return JCS-canonical bytes covering all fields except `signature`."""
112 doc: dict[str, Any] = {
113 "entities": manifest.entities,
114 "entity_uri": manifest.entity_uri,
115 "expires_at": manifest.expires_at,
116 "issued_at": manifest.issued_at,
117 "key_id": manifest.key_id,
118 "public_key": manifest.public_key,
119 "rotation_events": [_rotation_event_to_dict(e) for e in manifest.rotation_events],
120 }
121 return canonicaljson.encode_canonical_json(doc)
124def _parse_iso(ts: str) -> datetime:
125 return datetime.fromisoformat(ts.replace("Z", "+00:00"))
128def _validate_expiry(manifest: OrgManifest, trust_mode: str) -> None:
129 issued = _parse_iso(manifest.issued_at)
130 expires = _parse_iso(manifest.expires_at)
132 if expires <= issued: 132 ↛ 133line 132 didn't jump to line 133 because the condition on line 132 was never true
133 raise ManifestError("expires_at must be after issued_at")
135 if expires > issued + timedelta(days=_MAX_VALIDITY_DAYS):
136 raise ManifestError(
137 f"expires_at exceeds maximum of {_MAX_VALIDITY_DAYS} days from issued_at"
138 )
140 if trust_mode == "strict" and expires > issued + timedelta(days=_STRICT_MAX_DAYS):
141 raise ManifestError(
142 f"expires_at exceeds {_STRICT_MAX_DAYS}-day limit enforced by trust_mode=strict"
143 )
146# ---------------------------------------------------------------------------
147# Public API
148# ---------------------------------------------------------------------------
151def sign_manifest(manifest: OrgManifest, private_key: Ed25519PrivateKey) -> OrgManifest:
152 """Sign *manifest* in place; sets and returns manifest.signature."""
153 manifest.signature = ""
154 body = _manifest_signing_body(manifest)
155 sig_bytes = private_key.sign(body)
156 manifest.signature = base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=")
157 return manifest
160def verify_manifest(manifest: OrgManifest, trust_mode: str = "relaxed") -> bool:
161 """Self-signature check + expiry + rotation-event limit.
163 Does NOT validate the rotation chain from a previously-accepted key —
164 call verify_rotation_chain() separately for that.
166 Returns True on success. Raises ManifestError on any failure.
167 """
168 _validate_expiry(manifest, trust_mode)
170 if len(manifest.rotation_events) > _MAX_ROTATION_EVENTS:
171 raise ManifestError(
172 f"rotation_events has {len(manifest.rotation_events)} entries "
173 f"(max {_MAX_ROTATION_EVENTS})"
174 )
176 try:
177 pub = _pubkey_from_b64(manifest.public_key)
178 body = _manifest_signing_body(manifest)
179 sig_bytes = base64.urlsafe_b64decode(_pad(manifest.signature))
180 pub.verify(sig_bytes, body)
181 except InvalidSignature as exc:
182 raise ManifestError("self-signature verification failed") from exc
183 except Exception as exc:
184 raise ManifestError(f"self-signature error: {exc}") from exc
186 return True
189def verify_rotation_chain(
190 manifest: OrgManifest,
191 previous_key_id: str,
192 previous_pubkey_b64: str,
193) -> bool:
194 """Validate ALL rotation steps from the previously-accepted key to current.
196 §19.1.4 invariants enforced:
197 1. Chain is contiguous from previous_key_id
198 2. Each event signature is valid (signed by the preceding key)
199 3. manifest.key_id matches the terminal new_key_id
200 4. manifest.public_key matches the terminal new_public_key
201 5. No key_id reuse (regression / cross-entity replay prevention)
203 Returns True on success. Raises ManifestError on any violation.
204 """
205 events = manifest.rotation_events
207 # No rotation: current key must equal previous key
208 if not events:
209 if manifest.key_id != previous_key_id:
210 raise ManifestError(
211 f"no rotation events but manifest.key_id {manifest.key_id!r} "
212 f"differs from previous_key_id {previous_key_id!r}"
213 )
214 return True
216 # Find the starting index — the first event originating from previous_key_id
217 start_idx: int | None = None
218 for i, evt in enumerate(events): 218 ↛ 223line 218 didn't jump to line 223 because the loop on line 218 didn't complete
219 if evt.previous_key_id == previous_key_id: 219 ↛ 218line 219 didn't jump to line 218 because the condition on line 219 was always true
220 start_idx = i
221 break
223 if start_idx is None: 223 ↛ 225line 223 didn't jump to line 225 because the condition on line 223 was never true
224 # No event starts from previous_key_id; chain is disconnected
225 if manifest.key_id == previous_key_id:
226 return True # key unchanged, no rotation needed
227 raise ManifestError(
228 f"rotation chain does not connect previous_key_id {previous_key_id!r} "
229 f"to manifest.key_id {manifest.key_id!r}"
230 )
232 seen_key_ids: set[str] = {previous_key_id}
233 current_key_id = previous_key_id
234 current_pubkey_b64 = previous_pubkey_b64
236 for i, evt in enumerate(events[start_idx:], start_idx):
237 # Contiguity
238 if evt.previous_key_id != current_key_id: 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true
239 raise ManifestError(
240 f"rotation event {i}: expected previous_key_id={current_key_id!r}, "
241 f"got {evt.previous_key_id!r} (chain gap)"
242 )
244 # No regression / cycle
245 if evt.new_key_id in seen_key_ids:
246 raise ManifestError(
247 f"rotation event {i}: key_id {evt.new_key_id!r} already appears in "
248 f"the chain (regression or cycle attack)"
249 )
251 # Verify rotation-event signature with the current (previous) key
252 rotation_body = canonicaljson.encode_canonical_json(
253 {
254 "new_key_id": evt.new_key_id,
255 "new_public_key": evt.new_public_key,
256 "previous_key_id": evt.previous_key_id,
257 "rotated_at": evt.rotated_at,
258 }
259 )
260 try:
261 pub = _pubkey_from_b64(current_pubkey_b64)
262 sig_bytes = base64.urlsafe_b64decode(_pad(evt.signature))
263 pub.verify(sig_bytes, rotation_body)
264 except InvalidSignature as exc:
265 raise ManifestError(
266 f"rotation event {i}: signature invalid (signed with key {current_key_id!r})"
267 ) from exc
268 except Exception as exc:
269 raise ManifestError(f"rotation event {i}: verification error: {exc}") from exc
271 seen_key_ids.add(evt.new_key_id)
272 current_key_id = evt.new_key_id
273 current_pubkey_b64 = evt.new_public_key
275 # Terminal invariants
276 if current_key_id != manifest.key_id: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true
277 raise ManifestError(
278 f"rotation chain terminates at {current_key_id!r} "
279 f"but manifest.key_id is {manifest.key_id!r}"
280 )
281 if current_pubkey_b64 != manifest.public_key: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true
282 raise ManifestError("rotation chain terminal public_key does not match manifest.public_key")
284 return True
287def sign_rotation_event(
288 previous_key_id: str,
289 new_key_id: str,
290 new_public_key_b64: str,
291 rotated_at: str,
292 private_key: Ed25519PrivateKey,
293) -> str:
294 """Sign a rotation event with the *previous* private key. Returns base64url signature."""
295 body = canonicaljson.encode_canonical_json(
296 {
297 "new_key_id": new_key_id,
298 "new_public_key": new_public_key_b64,
299 "previous_key_id": previous_key_id,
300 "rotated_at": rotated_at,
301 }
302 )
303 sig_bytes = private_key.sign(body)
304 return base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=")
307# ---------------------------------------------------------------------------
308# Serialisation helpers
309# ---------------------------------------------------------------------------
312def manifest_to_dict(manifest: OrgManifest) -> dict[str, Any]:
313 return {
314 "entities": manifest.entities,
315 "entity_uri": manifest.entity_uri,
316 "expires_at": manifest.expires_at,
317 "issued_at": manifest.issued_at,
318 "key_id": manifest.key_id,
319 "public_key": manifest.public_key,
320 "rotation_events": [_rotation_event_to_dict(e) for e in manifest.rotation_events],
321 "signature": manifest.signature,
322 }
325def manifest_from_dict(data: dict[str, Any]) -> OrgManifest:
326 return OrgManifest(
327 entity_uri=data["entity_uri"],
328 key_id=data["key_id"],
329 public_key=data["public_key"],
330 issued_at=data["issued_at"],
331 expires_at=data["expires_at"],
332 entities=data.get("entities", []),
333 rotation_events=[
334 RotationEvent(
335 previous_key_id=e["previous_key_id"],
336 new_key_id=e["new_key_id"],
337 new_public_key=e["new_public_key"],
338 rotated_at=e["rotated_at"],
339 signature=e["signature"],
340 previous_public_key=e.get("previous_public_key", ""),
341 )
342 for e in data.get("rotation_events", [])
343 ],
344 signature=data.get("signature", ""),
345 )