Coverage for node / src / stigmem_node / identity / manifest.py: 87%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Org manifest format, signing, and verification — spec §19.1. 

2 

3Public surface: 

4 OrgManifest — dataclass matching §19.1.2 fields 

5 RotationEvent — single step in a key-rotation chain 

6 ManifestError — raised on any structural/cryptographic violation 

7 

8 sign_manifest(manifest, private_key) -> OrgManifest 

9 verify_manifest(manifest, trust_mode) -> bool (raises ManifestError on failure) 

10 verify_rotation_chain(manifest, previous_key_id, previous_pubkey_b64) -> bool (raises ManifestError on failure) 

11 

12Security requirements enforced here: 

13 - JCS via canonicaljson (RFC 8785), NOT json.dumps(sort_keys=True) 

14 - expires_at <= issued_at + 730 days; in strict mode <= 365 days 

15 - Rotation chain validated from the previously-accepted key, all steps 

16 - No key_id reuse (regression/cycle attack prevention) 

17 - rotation_events capped at 100 entries 

18""" 

19 

20from __future__ import annotations 

21 

22import base64 

23from dataclasses import dataclass, field 

24from datetime import datetime, timedelta 

25from typing import Any 

26 

27import canonicaljson 

28from cryptography.exceptions import InvalidSignature 

29from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey 

30 

# Hard ceiling on manifest lifetime: expires_at may be at most this many days
# after issued_at, regardless of trust mode.
_MAX_VALIDITY_DAYS = 730
# Tighter lifetime ceiling (in days) applied when trust_mode == "strict".
_STRICT_MAX_DAYS = 365
# Upper bound on the number of rotation_events a manifest may carry.
_MAX_ROTATION_EVENTS = 100

34 

35 

class ManifestError(ValueError):
    """Signal that a manifest failed a structural or cryptographic check.

    Derives from ``ValueError``, so handlers written for ``ValueError`` also
    catch it.
    """

38 

39 

@dataclass
class RotationEvent:
    """One hop in an org manifest's key-rotation chain.

    The event is signed by the key being retired (the *previous* key), which
    lets a verifier walk the chain without consulting an external key
    registry.

    ``previous_public_key`` carries the retiring key's public bytes
    (base64url) so tokens issued under that key can still be checked during
    the dual-trust window (§22.2). It is the empty string on events created
    before §22.2 support and populated on Phase-12-or-later rotations.
    """

    # Identifier of the key being retired; that key signs this event.
    previous_key_id: str
    # Identifier of the key taking over.
    new_key_id: str
    # base64url Ed25519 public key belonging to new_key_id.
    new_public_key: str
    # Rotation timestamp, ISO-8601 UTC.
    rotated_at: str
    # base64url Ed25519 signature over the canonical event body (by the previous key).
    signature: str
    # base64url public key of the retiring key (§22.2 dual-trust); "" when absent.
    previous_public_key: str = ""

59 

60 

@dataclass
class OrgManifest:
    """Org-identity manifest as described in spec §19.1.2.

    ``entities`` enumerates every entity URI the org may issue capability
    tokens for, the org itself included. Under the C1 rule a token's
    ``subject`` has to appear in its issuer's ``entities`` list.

    ``signature`` is the org's self-signature over the JCS body, i.e. every
    field other than ``signature``. Until the manifest is signed it holds the
    empty string.
    """

    # URI identifying the org that owns this manifest.
    entity_uri: str
    # Identifier of the current signing key.
    key_id: str
    # base64url Ed25519 public key currently used for signing.
    public_key: str
    # Start of the validity window, ISO-8601 UTC.
    issued_at: str
    # End of the validity window, ISO-8601 UTC.
    expires_at: str
    # Entity URIs this org may issue tokens for; a fresh list per instance.
    entities: list[str] = field(default_factory=list)
    # Key-rotation history; a fresh list per instance.
    rotation_events: list[RotationEvent] = field(default_factory=list)
    # base64url self-signature; "" until the manifest has been signed.
    signature: str = ""

81 

82 

83# --------------------------------------------------------------------------- 

84# Internal helpers 

85# --------------------------------------------------------------------------- 

86 

87 

88def _pad(s: str) -> str: 

89 return s + "=" * (-len(s) % 4) 

90 

91 

def _pubkey_from_b64(b64: str) -> Ed25519PublicKey:
    """Build an Ed25519 public key object from its base64url text form.

    The input may omit ``=`` padding; it is re-padded before decoding.
    """
    padded = _pad(b64)
    key_bytes = base64.urlsafe_b64decode(padded)
    return Ed25519PublicKey.from_public_bytes(key_bytes)

95 

96 

97def _rotation_event_to_dict(evt: RotationEvent) -> dict[str, Any]: 

98 d: dict[str, Any] = { 

99 "new_key_id": evt.new_key_id, 

100 "new_public_key": evt.new_public_key, 

101 "previous_key_id": evt.previous_key_id, 

102 "rotated_at": evt.rotated_at, 

103 "signature": evt.signature, 

104 } 

105 if evt.previous_public_key: 

106 d["previous_public_key"] = evt.previous_public_key 

107 return d 

108 

109 

def _manifest_signing_body(manifest: OrgManifest) -> bytes:
    """Produce the canonical-JSON bytes that the manifest self-signature covers.

    Every manifest field is included except ``signature``; rotation events
    are serialised through ``_rotation_event_to_dict``.
    """
    serialised_events = [_rotation_event_to_dict(evt) for evt in manifest.rotation_events]
    unsigned: dict[str, Any] = {
        "entities": manifest.entities,
        "entity_uri": manifest.entity_uri,
        "expires_at": manifest.expires_at,
        "issued_at": manifest.issued_at,
        "key_id": manifest.key_id,
        "public_key": manifest.public_key,
        "rotation_events": serialised_events,
    }
    return canonicaljson.encode_canonical_json(unsigned)

122 

123 

124def _parse_iso(ts: str) -> datetime: 

125 return datetime.fromisoformat(ts.replace("Z", "+00:00")) 

126 

127 

def _validate_expiry(manifest: OrgManifest, trust_mode: str) -> None:
    """Check that the manifest's validity window is well-formed and short enough.

    The window must be positive, at most ``_MAX_VALIDITY_DAYS`` long, and at
    most ``_STRICT_MAX_DAYS`` long when ``trust_mode == "strict"``. Only the
    length of the window is checked; the timestamps are not compared with the
    current time.

    Args:
        manifest: Manifest whose ``issued_at`` / ``expires_at`` are inspected.
        trust_mode: ``"strict"`` enables the tighter lifetime ceiling; any
            other value applies only the general ceiling.

    Raises:
        ManifestError: If either timestamp cannot be parsed, if one timestamp
            carries a UTC offset and the other does not, or if the window
            violates any of the limits above.
    """
    try:
        issued = _parse_iso(manifest.issued_at)
        expires = _parse_iso(manifest.expires_at)
        # Subtracting here (instead of comparing later) makes a naive/aware
        # mix fail inside this try, so it is reported as a ManifestError
        # rather than leaking a bare TypeError to the caller.
        lifetime = expires - issued
    except (AttributeError, TypeError, ValueError) as exc:
        raise ManifestError(f"invalid issued_at/expires_at timestamp: {exc}") from exc

    if lifetime <= timedelta(0):
        raise ManifestError("expires_at must be after issued_at")

    if lifetime > timedelta(days=_MAX_VALIDITY_DAYS):
        raise ManifestError(
            f"expires_at exceeds maximum of {_MAX_VALIDITY_DAYS} days from issued_at"
        )

    if trust_mode == "strict" and lifetime > timedelta(days=_STRICT_MAX_DAYS):
        raise ManifestError(
            f"expires_at exceeds {_STRICT_MAX_DAYS}-day limit enforced by trust_mode=strict"
        )

144 

145 

146# --------------------------------------------------------------------------- 

147# Public API 

148# --------------------------------------------------------------------------- 

149 

150 

def sign_manifest(manifest: OrgManifest, private_key: Ed25519PrivateKey) -> OrgManifest:
    """Sign *manifest* in place and return the same object.

    ``manifest.signature`` is first reset to the empty string, then replaced
    with the unpadded base64url Ed25519 signature over the canonical signing
    body.
    """
    manifest.signature = ""
    raw_signature = private_key.sign(_manifest_signing_body(manifest))
    encoded = base64.urlsafe_b64encode(raw_signature).decode()
    manifest.signature = encoded.rstrip("=")
    return manifest

158 

159 

def verify_manifest(manifest: OrgManifest, trust_mode: str = "relaxed") -> bool:
    """Run the stand-alone manifest checks: validity window, event cap, self-signature.

    The rotation chain from a previously-accepted key is NOT examined here;
    use verify_rotation_chain() for that.

    Returns True when every check passes. Raises ManifestError otherwise.
    """
    _validate_expiry(manifest, trust_mode)

    event_count = len(manifest.rotation_events)
    if event_count > _MAX_ROTATION_EVENTS:
        raise ManifestError(
            f"rotation_events has {event_count} entries (max {_MAX_ROTATION_EVENTS})"
        )

    try:
        signer = _pubkey_from_b64(manifest.public_key)
        signed_bytes = _manifest_signing_body(manifest)
        signature = base64.urlsafe_b64decode(_pad(manifest.signature))
        signer.verify(signature, signed_bytes)
    except InvalidSignature as exc:
        raise ManifestError("self-signature verification failed") from exc
    except Exception as exc:
        # Anything else (bad base64, wrong key length, ...) is still a
        # verification failure from the caller's point of view.
        raise ManifestError(f"self-signature error: {exc}") from exc
    else:
        return True

187 

188 

def _same_b64url(left: str, right: str) -> bool:
    """Compare two base64url strings, ignoring optional trailing ``=`` padding."""
    return left.rstrip("=") == right.rstrip("=")


def verify_rotation_chain(
    manifest: OrgManifest,
    previous_key_id: str,
    previous_pubkey_b64: str,
) -> bool:
    """Validate ALL rotation steps from the previously-accepted key to current.

    §19.1.4 invariants enforced:
      1. Chain is contiguous from previous_key_id
      2. Each event signature is valid (signed by the preceding key)
      3. manifest.key_id matches the terminal new_key_id
      4. manifest.public_key matches the terminal new_public_key
      5. No key_id reuse (regression / cross-entity replay prevention)

    When the key is unchanged (no event starts at ``previous_key_id`` and
    ``manifest.key_id == previous_key_id``), the terminal key *is* the
    previously-accepted key, so ``manifest.public_key`` must equal
    ``previous_pubkey_b64``. Without that check a manifest could keep the
    accepted key_id while substituting a different, self-signed public key.

    Args:
        manifest: Manifest whose ``rotation_events`` are walked.
        previous_key_id: Key id the verifier accepted previously.
        previous_pubkey_b64: base64url public key for ``previous_key_id``.

    Returns True on success. Raises ManifestError on any violation.
    """
    events = manifest.rotation_events

    # No rotation: current key must equal previous key, id and public bytes.
    if not events:
        if manifest.key_id != previous_key_id:
            raise ManifestError(
                f"no rotation events but manifest.key_id {manifest.key_id!r} "
                f"differs from previous_key_id {previous_key_id!r}"
            )
        if not _same_b64url(manifest.public_key, previous_pubkey_b64):
            raise ManifestError(
                f"manifest.key_id {manifest.key_id!r} is unchanged but "
                "manifest.public_key differs from the previously-accepted public key"
            )
        return True

    # Find the starting index — the first event originating from previous_key_id
    start_idx: int | None = next(
        (i for i, evt in enumerate(events) if evt.previous_key_id == previous_key_id),
        None,
    )

    if start_idx is None:
        # No event starts from previous_key_id; chain is disconnected
        if manifest.key_id != previous_key_id:
            raise ManifestError(
                f"rotation chain does not connect previous_key_id {previous_key_id!r} "
                f"to manifest.key_id {manifest.key_id!r}"
            )
        # Key unchanged, no rotation needed — but the public key must not
        # have been swapped underneath the same key_id.
        if not _same_b64url(manifest.public_key, previous_pubkey_b64):
            raise ManifestError(
                f"manifest.key_id {manifest.key_id!r} is unchanged but "
                "manifest.public_key differs from the previously-accepted public key"
            )
        return True

    seen_key_ids: set[str] = {previous_key_id}
    current_key_id = previous_key_id
    current_pubkey_b64 = previous_pubkey_b64

    for i, evt in enumerate(events[start_idx:], start_idx):
        # Contiguity
        if evt.previous_key_id != current_key_id:
            raise ManifestError(
                f"rotation event {i}: expected previous_key_id={current_key_id!r}, "
                f"got {evt.previous_key_id!r} (chain gap)"
            )

        # No regression / cycle
        if evt.new_key_id in seen_key_ids:
            raise ManifestError(
                f"rotation event {i}: key_id {evt.new_key_id!r} already appears in "
                f"the chain (regression or cycle attack)"
            )

        # Verify rotation-event signature with the current (previous) key.
        # The signed body deliberately excludes `signature` itself.
        rotation_body = canonicaljson.encode_canonical_json(
            {
                "new_key_id": evt.new_key_id,
                "new_public_key": evt.new_public_key,
                "previous_key_id": evt.previous_key_id,
                "rotated_at": evt.rotated_at,
            }
        )
        try:
            pub = _pubkey_from_b64(current_pubkey_b64)
            sig_bytes = base64.urlsafe_b64decode(_pad(evt.signature))
            pub.verify(sig_bytes, rotation_body)
        except InvalidSignature as exc:
            raise ManifestError(
                f"rotation event {i}: signature invalid (signed with key {current_key_id!r})"
            ) from exc
        except Exception as exc:
            raise ManifestError(f"rotation event {i}: verification error: {exc}") from exc

        # Advance: the key introduced by this event signs the next one.
        seen_key_ids.add(evt.new_key_id)
        current_key_id = evt.new_key_id
        current_pubkey_b64 = evt.new_public_key

    # Terminal invariants
    if current_key_id != manifest.key_id:
        raise ManifestError(
            f"rotation chain terminates at {current_key_id!r} "
            f"but manifest.key_id is {manifest.key_id!r}"
        )
    if current_pubkey_b64 != manifest.public_key:
        raise ManifestError("rotation chain terminal public_key does not match manifest.public_key")

    return True

285 

286 

def sign_rotation_event(
    previous_key_id: str,
    new_key_id: str,
    new_public_key_b64: str,
    rotated_at: str,
    private_key: Ed25519PrivateKey,
) -> str:
    """Produce the signature for a rotation event.

    The canonical-JSON encoding of the four event fields is signed with
    *private_key*, which is meant to be the *previous* (retiring) key. The
    result is the signature as unpadded base64url text.
    """
    event_fields = {
        "new_key_id": new_key_id,
        "new_public_key": new_public_key_b64,
        "previous_key_id": previous_key_id,
        "rotated_at": rotated_at,
    }
    canonical = canonicaljson.encode_canonical_json(event_fields)
    raw_signature = private_key.sign(canonical)
    encoded = base64.urlsafe_b64encode(raw_signature).decode()
    return encoded.rstrip("=")

305 

306 

307# --------------------------------------------------------------------------- 

308# Serialisation helpers 

309# --------------------------------------------------------------------------- 

310 

311 

def manifest_to_dict(manifest: OrgManifest) -> dict[str, Any]:
    """Serialise *manifest* into a plain dict, ``signature`` included.

    Rotation events are converted with ``_rotation_event_to_dict``; the
    ``entities`` list is referenced as-is, not copied.
    """
    return dict(
        entities=manifest.entities,
        entity_uri=manifest.entity_uri,
        expires_at=manifest.expires_at,
        issued_at=manifest.issued_at,
        key_id=manifest.key_id,
        public_key=manifest.public_key,
        rotation_events=[_rotation_event_to_dict(evt) for evt in manifest.rotation_events],
        signature=manifest.signature,
    )

323 

324 

def manifest_from_dict(data: dict[str, Any]) -> OrgManifest:
    """Rebuild an OrgManifest from the dict form produced by manifest_to_dict.

    ``entity_uri``, ``key_id``, ``public_key``, ``issued_at`` and
    ``expires_at`` are required (a missing one raises ``KeyError``).
    ``entities`` and ``rotation_events`` default to empty lists and
    ``signature`` to the empty string. Within each rotation event only
    ``previous_public_key`` is optional.
    """
    # Required fields are read first, in declaration order, so a missing
    # manifest field is reported before any problem inside an event.
    entity_uri = data["entity_uri"]
    key_id = data["key_id"]
    public_key = data["public_key"]
    issued_at = data["issued_at"]
    expires_at = data["expires_at"]
    entities = data.get("entities", [])

    rotation_events = []
    for raw in data.get("rotation_events", []):
        rotation_events.append(
            RotationEvent(
                previous_key_id=raw["previous_key_id"],
                new_key_id=raw["new_key_id"],
                new_public_key=raw["new_public_key"],
                rotated_at=raw["rotated_at"],
                signature=raw["signature"],
                previous_public_key=raw.get("previous_public_key", ""),
            )
        )

    return OrgManifest(
        entity_uri=entity_uri,
        key_id=key_id,
        public_key=public_key,
        issued_at=issued_at,
        expires_at=expires_at,
        entities=entities,
        rotation_events=rotation_events,
        signature=data.get("signature", ""),
    )