Coverage for node / src / stigmem_node / identity / trust_store.py: 81%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Peer trust store — reads/writes federation_manifests table (spec §19.8). 

2 

3Public surface: 

4 store_peer_manifest(entity_uri, manifest, log_entry) -> None 

5 get_peer_manifest(entity_uri) -> OrgManifest | None 

6 refresh_peer_manifests() -> None (periodic task) 

7 cleanup_expired_tokens() -> int (background cleanup, run opportunistically) 

8 

9Security requirements: 

10 H1 mitigation: when resolving a peer manifest for token verification, 

11 check manifest.expires_at > now; attempt refresh; reject if still expired. 

12 Rotation chain invariant 4: reject any update that would regress key_id. 

13""" 

14 

15from __future__ import annotations 

16 

17import json 

18import logging 

19import uuid 

20from datetime import UTC, datetime 

21from typing import TYPE_CHECKING 

22 

23import httpx 

24 

25from ..net_util import assert_safe_url 

26from .manifest import ( 

27 ManifestError, 

28 OrgManifest, 

29 manifest_from_dict, 

30 manifest_to_dict, 

31 verify_manifest, 

32 verify_rotation_chain, 

33) 

34from .transparency_log import LogEntry 

35 

36if TYPE_CHECKING: 

37 pass 

38 

39logger = logging.getLogger("stigmem.identity.trust_store") 

40 

41 

def store_peer_manifest(
    entity_uri: str,
    manifest: OrgManifest,
    log_entry: LogEntry | None = None,
    *,
    trust_mode: str = "relaxed",
) -> None:
    """Insert or update the stored manifest for a peer.

    The manifest is verified first. If a row for *entity_uri* already exists
    and the incoming manifest carries a different ``key_id``, the rotation
    chain is checked against the previously stored key before the row is
    overwritten (§19.1.4 invariant 4).

    Args:
        entity_uri: Peer identifier; lookup key in ``federation_manifests``.
        manifest: Manifest to persist.
        log_entry: Optional transparency-log entry. Its fields are stored as
            JSON alongside the manifest; NULL is stored when it is omitted.
        trust_mode: Forwarded to ``verify_manifest``.

    Raises:
        ManifestError: If the rotation-chain check rejects the update (the
            original error is chained and prefixed with "manifest update
            rejected"). Whatever ``verify_manifest`` raises on a failed
            self-verification propagates unchanged.
    """
    from ..db import db

    # Never touch the database for a manifest that does not verify.
    verify_manifest(manifest, trust_mode=trust_mode)

    timestamp = datetime.now(UTC).isoformat()

    if log_entry is None:
        serialized_log_entry = None
    else:
        log_fields = {
            "log_id": log_entry.log_id,
            "leaf_hash": log_entry.leaf_hash,
            "log_index": log_entry.log_index,
            "integrated_time": log_entry.integrated_time,
            "inclusion_proof": log_entry.inclusion_proof,
        }
        serialized_log_entry = json.dumps(log_fields)

    # Compact separators keep the stored JSON free of insignificant whitespace.
    serialized_manifest = json.dumps(manifest_to_dict(manifest), separators=(",", ":"))

    with db() as conn:
        lookup = conn.execute(
            "SELECT id, manifest_json, key_id FROM federation_manifests WHERE entity_uri = ?",
            (entity_uri,),
        )
        stored_row = lookup.fetchone()

        if stored_row is None:
            # First time we see this peer: plain insert with a fresh row id.
            insert_params = (
                str(uuid.uuid4()),
                entity_uri,
                serialized_manifest,
                manifest.signature,
                manifest.key_id,
                manifest.issued_at,
                manifest.expires_at,
                serialized_log_entry,
                timestamp,
                timestamp,
            )
            conn.execute(
                """INSERT INTO federation_manifests
                   (id, entity_uri, manifest_json, signature, key_id,
                    issued_at, expires_at, log_entry_json, created_at, updated_at)
                   VALUES (?,?,?,?,?,?,?,?,?,?)""",
                insert_params,
            )
        else:
            # Known peer: a changed key must chain back to the key we
            # previously accepted, otherwise the update is refused.
            stored_manifest = manifest_from_dict(json.loads(stored_row["manifest_json"]))
            key_changed = manifest.key_id != stored_manifest.key_id
            if key_changed:
                try:
                    verify_rotation_chain(
                        manifest,
                        previous_key_id=stored_manifest.key_id,
                        previous_pubkey_b64=stored_manifest.public_key,
                    )
                except ManifestError as exc:
                    raise ManifestError(f"manifest update rejected: {exc}") from exc

            update_params = (
                serialized_manifest,
                manifest.signature,
                manifest.key_id,
                manifest.issued_at,
                manifest.expires_at,
                serialized_log_entry,
                timestamp,
                entity_uri,
            )
            conn.execute(
                """UPDATE federation_manifests
                   SET manifest_json = ?,
                       signature = ?,
                       key_id = ?,
                       issued_at = ?,
                       expires_at = ?,
                       log_entry_json = ?,
                       updated_at = ?
                   WHERE entity_uri = ?""",
                update_params,
            )

    logger.info("stored manifest for %s (key_id=%s)", entity_uri, manifest.key_id)

138 

139 

def _expiry_has_passed(expires_at: str | datetime | None, now: datetime) -> bool:
    """Return True if *expires_at* is at or before *now*, failing closed.

    Accepts an ISO-8601 string (a trailing ``Z`` is allowed) or a datetime.
    A missing or unparseable value counts as expired so that a damaged
    record can never be treated as a valid trust anchor.
    """
    if isinstance(expires_at, datetime):
        parsed = expires_at
    else:
        try:
            parsed = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            return True
    if parsed.tzinfo is None:
        # Comparing a naive datetime with the aware *now* raises TypeError.
        # NOTE(review): offset-less timestamps are assumed to be UTC — confirm
        # against the manifest spec.
        parsed = parsed.replace(tzinfo=UTC)
    return parsed <= now


def get_peer_manifest(
    entity_uri: str,
    *,
    refresh_if_expired: bool = True,
    trust_mode: str = "relaxed",
) -> OrgManifest | None:
    """Return the stored manifest for *entity_uri*, or None if unknown or rejected.

    H1 mitigation: if the stored manifest is expired and ``refresh_if_expired``
    is True, a fetch from ``/.well-known/stigmem-manifest.json`` is attempted.
    The refreshed manifest is accepted only if it is itself unexpired and
    ``store_peer_manifest`` accepts it.

    Args:
        entity_uri: Peer identifier; lookup key in ``federation_manifests``.
        refresh_if_expired: When False, an expired manifest is rejected
            without any network access.
        trust_mode: Forwarded to ``store_peer_manifest`` when a refreshed
            manifest is persisted.

    Returns:
        The stored (or freshly refreshed) manifest, or None when the peer is
        unknown, the manifest is expired and could not be refreshed, or the
        refreshed manifest was rejected. Callers must treat None as rejection.
    """
    from ..db import db

    with db() as conn:
        row = conn.execute(
            "SELECT manifest_json, log_entry_json, expires_at FROM federation_manifests "
            "WHERE entity_uri = ?",
            (entity_uri,),
        ).fetchone()

    if row is None:
        return None

    manifest = manifest_from_dict(json.loads(row["manifest_json"]))

    # H1: expiry check. A NULL, malformed, or offset-less expires_at no longer
    # raises out of this function; see _expiry_has_passed.
    if not _expiry_has_passed(row["expires_at"], datetime.now(UTC)):
        return manifest

    if not refresh_if_expired:
        logger.warning("manifest for %s is expired; rejecting", entity_uri)
        return None  # caller must treat as rejection

    refreshed = _try_fetch_manifest(entity_uri)
    if refreshed is None:
        logger.warning("manifest for %s expired; refresh failed; rejecting", entity_uri)
        return None

    # "Reject if still expired": a peer that keeps serving a stale manifest
    # must not get it accepted through the refresh path. The clock is re-read
    # because the fetch may have taken several seconds. verify_manifest may
    # already enforce this; checking here keeps H1 independent of it.
    if _expiry_has_passed(refreshed.expires_at, datetime.now(UTC)):
        logger.warning("refreshed manifest for %s is still expired; rejecting", entity_uri)
        return None

    try:
        store_peer_manifest(entity_uri, refreshed, trust_mode=trust_mode)
    except ManifestError as exc:
        logger.warning("refreshed manifest for %s failed validation: %s", entity_uri, exc)
        return None
    return refreshed

187 

188 

def refresh_peer_manifests() -> None:
    """Periodic task: refresh every stored peer manifest from its well-known endpoint.

    For each row in ``federation_manifests`` the peer's manifest is re-fetched
    and, if the fetch succeeds, stored via ``store_peer_manifest``. A change
    of ``key_id`` is logged as a warning before the store is attempted.
    Peers whose fetch fails, or whose refreshed manifest is rejected, are
    skipped and keep their current row.

    A stored row that can no longer be parsed is logged and skipped, so one
    damaged record does not abort the refresh of the remaining peers or the
    token cleanup that runs at the end.
    """
    from ..db import db

    with db() as conn:
        rows = conn.execute("SELECT entity_uri, manifest_json FROM federation_manifests").fetchall()

    for row in rows:
        entity_uri: str = row["entity_uri"]
        try:
            prev_manifest = manifest_from_dict(json.loads(row["manifest_json"]))
        except (ManifestError, KeyError, TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError. store_peer_manifest would
            # fail on the same stored row, so fetching is pointless here.
            logger.warning(
                "skipping refresh for %s: stored manifest is unreadable: %s", entity_uri, exc
            )
            continue

        refreshed = _try_fetch_manifest(entity_uri)
        if refreshed is None:
            continue

        if refreshed.key_id != prev_manifest.key_id:
            logger.warning(
                "key rotation detected for %s: %s -> %s",
                entity_uri,
                prev_manifest.key_id,
                refreshed.key_id,
            )
        try:
            store_peer_manifest(entity_uri, refreshed)
        except ManifestError as exc:
            logger.warning("skipping refresh for %s: %s", entity_uri, exc)

    # Opportunistic housekeeping piggy-backed on the periodic task.
    cleanup_expired_tokens()

219 

220 

def _try_fetch_manifest(entity_uri: str) -> OrgManifest | None:
    """Best-effort download of a peer's manifest from its well-known URL.

    The origin (scheme and network location) is taken from *entity_uri* and
    ``/.well-known/stigmem-manifest.json`` is requested from it with a
    10-second timeout and without following redirects. The response is
    parsed and passed through ``verify_manifest`` before being returned.

    Returns:
        The verified manifest, or None when *entity_uri* is not an
        ``http(s)://`` URI, the response status is not 200, or anything in
        the fetch/parse/verify sequence raises (the failure is logged at
        debug level).
    """
    # Only HTTP(S) URIs carry an origin we can fetch from.
    if not entity_uri.startswith(("https://", "http://")):
        return None  # can't derive URL from non-HTTP URI

    from urllib.parse import urlparse

    parts = urlparse(entity_uri)
    origin = f"{parts.scheme}://{parts.netloc}"
    manifest_url = f"{origin}/.well-known/stigmem-manifest.json"

    try:
        # Presumably a guard against unsafe request targets; it raises on
        # rejection, which the handler below turns into None.
        assert_safe_url(origin, allow_schemes=frozenset({"https", "http"}))
        response = httpx.get(manifest_url, timeout=10.0, follow_redirects=False)
        if response.status_code == 200:
            fetched = manifest_from_dict(response.json())
            verify_manifest(fetched)
            return fetched
        return None
    except Exception as exc:
        logger.debug("failed to fetch manifest for %s: %s", entity_uri, exc)
        return None

249 

250 

251def cleanup_expired_tokens() -> int: 

252 """Delete capability tokens expired more than 24 hours ago. Returns count deleted.""" 

253 from datetime import timedelta 

254 

255 from ..db import db 

256 

257 cutoff = (datetime.now(UTC) - timedelta(hours=24)).isoformat() 

258 with db() as conn: 

259 cur = conn.execute( 

260 "DELETE FROM capability_tokens WHERE expiry < ?", 

261 (cutoff,), 

262 ) 

263 deleted: int = cur.rowcount or 0 

264 if deleted: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true

265 logger.info("cleaned up %d expired capability tokens", deleted) 

266 return deleted