Coverage for node / src / stigmem_node / auth.py: 92%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""API-key authentication for the Stigmem reference node. 

2 

3Auth model (spec §3.5): 

4 - Callers present `Authorization: Bearer <raw-key>` on every request. 

5 - Raw keys are never stored; only an Argon2id hash is persisted. 

6 - Legacy SHA-256 hex digests are accepted during the v0.9.x migration 

7 window and are opportunistically rehashed to Argon2id on successful use. 

8 - Each key maps to an entity_uri and a JSON-array of permissions: 

9 ["read"], ["read","write"], or ["read","write","federate"]. 

10 - `STIGMEM_AUTH_REQUIRED` defaults to **True** — every request must 

11 present a valid Bearer token. Set `STIGMEM_AUTH_REQUIRED=false` to 

12 opt into anonymous-mode for single-operator development; this is 

13 NOT appropriate for any deployment that accepts requests from 

14 agents you don't fully control. See LIMITATIONS.md §"LLM agents 

15 holding admin-scope API keys" for context. 

16 

17Bootstrapping the first key (single-operator install): 

18 $ KEY=$(openssl rand -hex 32) 

19 $ stigmem auth bootstrap-key --key "$KEY" 

20 # then use $KEY as `Authorization: Bearer $KEY` for subsequent requests. 

21 

22 The system NEVER generates the key — the caller provides the value and 

23 retains full custody. We hash and store it. This is by design: 

24 removing the system as the credential-generation surface eliminates 

25 the "reveal channel" risk entirely. The command refuses to run when 

26 api_keys is non-empty (bootstrap is one-shot); after bootstrap, 

27 additional keys go through `POST /v1/auth/keys`. 

28""" 

29 

30from __future__ import annotations 

31 

32import hashlib 

33import hmac 

34import json 

35import re 

36import uuid 

37from datetime import UTC, datetime, timedelta 

38from typing import Annotated, Any 

39 

40from argon2 import PasswordHasher 

41from argon2.exceptions import InvalidHash, VerificationError, VerifyMismatchError 

42from argon2.low_level import Type 

43from fastapi import Depends, HTTPException, status 

44from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer 

45 

46from .db import db 

47from .plugins import Deny, TenantContext, get_registry 

48from .settings import settings as settings 

49from .tenant import DEFAULT_TENANT_ID, TenantIdError, validate_tenant_id 

50 

51_ARGON2_HASHER = PasswordHasher( 

52 time_cost=2, 

53 memory_cost=19_456, 

54 parallelism=1, 

55 hash_len=32, 

56 salt_len=16, 

57 type=Type.ID, 

58) 

59_LEGACY_SHA256_HEX = re.compile(r"^[0-9a-f]{64}$") 

60 

61 

62def _hash_key(raw: str) -> str: 

63 """Return the persisted hash format for newly issued API keys.""" 

64 return _ARGON2_HASHER.hash(raw) 

65 

66 

67def _legacy_sha256(raw: str) -> str: 

68 return hashlib.sha256(raw.encode()).hexdigest() 

69 

70 

71def _is_legacy_sha256_hash(stored_hash: str) -> bool: 

72 return bool(_LEGACY_SHA256_HEX.fullmatch(stored_hash)) 

73 

74 

75def _legacy_sha256_allowed() -> bool: 

76 deadline = settings.legacy_sha256_accept_until 

77 if deadline is None: 

78 return True 

79 if deadline.tzinfo is None: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true

80 deadline = deadline.replace(tzinfo=UTC) 

81 return datetime.now(UTC) <= deadline.astimezone(UTC) 

82 

83 

84def _raise_legacy_sha256_disabled() -> None: 

85 raise HTTPException( 

86 status_code=status.HTTP_401_UNAUTHORIZED, 

87 detail="Legacy API key hashes are no longer accepted; rotate the key.", 

88 ) 

89 

90 

91def _verify_key_hash(raw_key: str, stored_hash: str) -> bool: 

92 """Verify *raw_key* against either current Argon2id or legacy SHA-256.""" 

93 if stored_hash.startswith("$argon2id$"): 

94 try: 

95 return _ARGON2_HASHER.verify(stored_hash, raw_key) 

96 except VerifyMismatchError: 

97 return False 

98 except VerificationError: 

99 return False 

100 except InvalidHash: 

101 return False 

102 if _is_legacy_sha256_hash(stored_hash): 102 ↛ 106line 102 didn't jump to line 106 because the condition on line 102 was always true

103 if not _legacy_sha256_allowed(): 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 _raise_legacy_sha256_disabled() 

105 return hmac.compare_digest(stored_hash, _legacy_sha256(raw_key)) 

106 return False 

107 

108 

109def _row_expired(row: Any, now: str) -> bool: 

110 expires_at = row["expires_at"] 

111 return bool(expires_at and expires_at < now) 

112 

113 

114def _parse_api_key_expiry(expires_at: str) -> datetime: 

115 normalized = expires_at.replace("Z", "+00:00") 

116 parsed = datetime.fromisoformat(normalized) 

117 if parsed.tzinfo is None: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 parsed = parsed.replace(tzinfo=UTC) 

119 return parsed.astimezone(UTC) 

120 

121 

122def _normalize_api_key_expiry( 

123 expires_at: str | None, 

124 *, 

125 created_at: datetime, 

126) -> str | None: 

127 """Apply static API-key max-age policy and return the persisted expiry.""" 

128 max_age_days = settings.api_key_max_age_days 

129 if max_age_days <= 0: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true

130 return expires_at 

131 

132 max_expires_at = created_at + timedelta(days=max_age_days) 

133 if expires_at is None: 

134 return max_expires_at.isoformat() 

135 

136 requested_expires_at = _parse_api_key_expiry(expires_at) 

137 if requested_expires_at > max_expires_at: 

138 raise ValueError( 

139 "expires_at exceeds configured static API key max age " 

140 f"({max_age_days} days)" 

141 ) 

142 return requested_expires_at.isoformat() 

143 

144 

145def _rehash_legacy_key(conn: Any, row: Any, raw_key: str) -> None: 

146 """Rewrite one verified legacy SHA-256 key row to Argon2id and audit it.""" 

147 new_hash = _hash_key(raw_key) 

148 conn.execute( 

149 "UPDATE api_keys SET key_hash = ? WHERE id = ? AND key_hash = ?", 

150 (new_hash, row["id"], row["key_hash"]), 

151 ) 

152 

153 from .observability.audit_event import emit 

154 

155 emit( 

156 "api_key_rehashed", 

157 entity_uri=row["entity_uri"], 

158 tenant_id=row["tenant_id"] or "default", 

159 oidc_sub=row["oidc_sub"], 

160 source="system:auth", 

161 detail={ 

162 "key_id": row["id"], 

163 "from": "sha256", 

164 "to": "argon2id", 

165 }, 

166 conn=conn, 

167 ) 

168 

169 

170def _select_key_rows(conn: Any) -> list[Any]: 

171 return list( 

172 conn.execute( 

173 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub, tenant_id" 

174 " FROM api_keys" 

175 ).fetchall() 

176 ) 

177 

178 

179def _find_key_row( 

180 raw_key: str, 

181 *, 

182 include_expired: bool = False, 

183 rehash_legacy: bool = False, 

184) -> Any | None: 

185 now = datetime.now(UTC).isoformat() 

186 with db() as conn: 

187 legacy_row = conn.execute( 

188 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub, tenant_id" 

189 " FROM api_keys WHERE key_hash = ?", 

190 (_legacy_sha256(raw_key),), 

191 ).fetchone() 

192 if legacy_row is not None and (include_expired or not _row_expired(legacy_row, now)): 

193 if not _legacy_sha256_allowed(): 

194 _raise_legacy_sha256_disabled() 

195 if rehash_legacy: 

196 _rehash_legacy_key(conn, legacy_row, raw_key) 

197 legacy_row = conn.execute( 

198 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub, tenant_id" 

199 " FROM api_keys WHERE id = ?", 

200 (legacy_row["id"],), 

201 ).fetchone() 

202 return legacy_row 

203 

204 for row in _select_key_rows(conn): 

205 if not include_expired and _row_expired(row, now): 

206 continue 

207 if _verify_key_hash(raw_key, row["key_hash"]): 

208 if rehash_legacy and _is_legacy_sha256_hash(row["key_hash"]): 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 _rehash_legacy_key(conn, row, raw_key) 

210 row = conn.execute( 

211 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub," 

212 " tenant_id FROM api_keys WHERE id = ?", 

213 (row["id"],), 

214 ).fetchone() 

215 return row 

216 return None 

217 

218 

219def find_api_key_id_by_raw_key(raw_key: str) -> str | None: 

220 """Return the stored key id for *raw_key*, including expired rows.""" 

221 row = _find_key_row(raw_key, include_expired=True, rehash_legacy=False) 

222 return row["id"] if row is not None else None 

223 

224 

225def lookup_principal(raw_key: str) -> tuple[str, str, str | None] | None: 

226 """Return (entity_uri, tenant_id, oidc_sub) for a non-expired raw key.""" 

227 row = _find_key_row(raw_key, include_expired=False, rehash_legacy=False) 

228 if row is None: 

229 return None 

230 return (row["entity_uri"], row["tenant_id"] or "default", row["oidc_sub"]) 

231 

232 

233BEARER = HTTPBearer(auto_error=False) 

234 

235 

236class Identity: 

237 """Resolved caller identity (spec §3.5).""" 

238 

239 def __init__( 

240 self, 

241 entity_uri: str, 

242 permissions: list[str], 

243 oidc_sub: str | None = None, 

244 tenant_id: str = "default", 

245 ) -> None: 

246 self.entity_uri = entity_uri 

247 self.permissions = set(permissions) 

248 self.oidc_sub = oidc_sub 

249 self.tenant_id = tenant_id 

250 

251 def can_read(self) -> bool: 

252 return self._has_capability("read") 

253 

254 def can_write(self) -> bool: 

255 return self._has_capability("write") 

256 

257 def can_write_instruction(self) -> bool: 

258 return self._has_capability("instruction:write") 

259 

260 def can_federate(self) -> bool: 

261 return self._has_capability("federate") 

262 

263 def can_admin_federation(self) -> bool: 

264 return self._has_capability("admin:federation") 

265 

266 def can_audit(self) -> bool: 

267 """True when the principal holds the audit.read capability (spec §22.3).""" 

268 return self._has_capability("audit.read") 

269 

270 def is_admin(self) -> bool: 

271 """True when the principal holds the admin capability (spec §24.3.2).""" 

272 return self._has_capability("admin") 

273 

274 def _has_capability(self, capability: str) -> bool: 

275 if capability not in self.permissions: 

276 return False 

277 decision = get_registry().fire_voting( 

278 "capability_check", 

279 identity=self, 

280 capability=capability, 

281 tenant=TenantContext( 

282 tenant_id=self.tenant_id, 

283 metadata={"tenant_context_source": "hook"}, 

284 ), 

285 ) 

286 return not isinstance(decision, Deny) 

287 

288 

289_ANON = Identity("anon:trusted", ["read", "write", "federate"], tenant_id="default") 

290 

291 

292def resolve_identity( 

293 creds: Annotated[HTTPAuthorizationCredentials | None, Depends(BEARER)], 

294) -> Identity: 

295 """Dependency: resolve caller identity from Bearer token. 

296 

297 If auth is disabled, returns a fully-trusted anonymous identity. 

298 If auth is enabled, the token must match a non-expired api_keys row. 

299 """ 

300 if not settings.auth_required: 

301 if creds is not None: 

302 # Still try to resolve a real identity even in non-required mode 

303 identity = _lookup(creds.credentials) 

304 if identity: 

305 return _apply_identity_hooks(identity, raw_credentials=creds.credentials) 

306 return _apply_identity_hooks(_ANON, raw_credentials=None) 

307 

308 if creds is None: 

309 raise HTTPException( 

310 status_code=status.HTTP_401_UNAUTHORIZED, 

311 detail="Authorization header required", 

312 headers={"WWW-Authenticate": "Bearer"}, 

313 ) 

314 identity = _lookup(creds.credentials) 

315 if identity is None: 

316 raise HTTPException( 

317 status_code=status.HTTP_401_UNAUTHORIZED, 

318 detail="Invalid or expired API key", 

319 headers={"WWW-Authenticate": "Bearer"}, 

320 ) 

321 return _apply_identity_hooks(identity, raw_credentials=creds.credentials) 

322 

323 

324def _apply_identity_hooks(identity: Identity, raw_credentials: str | None) -> Identity: 

325 registry = get_registry() 

326 resolved = registry.fire_filter_chain( 

327 "identity_resolve", 

328 identity, 

329 raw_credentials=raw_credentials, 

330 ) 

331 tenant = registry.fire_filter_chain( 

332 "tenant_resolve", 

333 TenantContext( 

334 tenant_id=DEFAULT_TENANT_ID, 

335 metadata={ 

336 "source_tenant_id": resolved.tenant_id, 

337 "tenant_context_source": "hook", 

338 }, 

339 ), 

340 identity=resolved, 

341 ) 

342 try: 

343 resolved_tenant_id = validate_tenant_id(tenant.tenant_id) 

344 except TenantIdError: 

345 resolved_tenant_id = DEFAULT_TENANT_ID 

346 if resolved_tenant_id != resolved.tenant_id: 

347 return Identity( 

348 entity_uri=resolved.entity_uri, 

349 permissions=sorted(resolved.permissions), 

350 oidc_sub=resolved.oidc_sub, 

351 tenant_id=resolved_tenant_id, 

352 ) 

353 return resolved 

354 

355 

356def _lookup(raw_key: str) -> Identity | None: 

357 row = _find_key_row(raw_key, include_expired=False, rehash_legacy=True) 

358 if row is None: 

359 return None 

360 perms: list[str] = json.loads(row["permissions"]) 

361 return Identity( 

362 entity_uri=row["entity_uri"], 

363 permissions=perms, 

364 oidc_sub=row["oidc_sub"], 

365 tenant_id=row["tenant_id"] or "default", 

366 ) 

367 

368 

369def register_api_key( 

370 raw_key: str, 

371 entity_uri: str, 

372 permissions: list[str] | None = None, 

373 description: str | None = None, 

374 expires_at: str | None = None, 

375 oidc_sub: str | None = None, 

376 tenant_id: str = "default", 

377) -> str: 

378 """Persist the hash of a caller-provided raw API key. Returns the key_id. 

379 

380 The caller is responsible for generating `raw_key` (e.g., from 

381 `secrets.token_hex(32)` or `openssl rand -hex 32`) and for storing it 

382 securely. This function never touches the key after hashing. 

383 """ 

384 if permissions is None: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true

385 permissions = ["read", "write"] 

386 if find_api_key_id_by_raw_key(raw_key) is not None: 

387 raise ValueError("raw API key already exists") 

388 normalized_tenant_id = validate_tenant_id(tenant_id) 

389 key_id = str(uuid.uuid4()) 

390 created_at = datetime.now(UTC).replace(microsecond=0) 

391 normalized_expires_at = _normalize_api_key_expiry( 

392 expires_at, 

393 created_at=created_at, 

394 ) 

395 with db() as conn: 

396 conn.execute( 

397 """INSERT INTO api_keys 

398 (id, key_hash, entity_uri, permissions, description, 

399 created_at, expires_at, oidc_sub, tenant_id) 

400 VALUES (?,?,?,?,?,?,?,?,?)""", 

401 ( 

402 key_id, 

403 _hash_key(raw_key), 

404 entity_uri, 

405 json.dumps(permissions), 

406 description, 

407 created_at.isoformat(), 

408 normalized_expires_at, 

409 oidc_sub, 

410 normalized_tenant_id, 

411 ), 

412 ) 

413 return key_id 

414 

415 

416def create_api_key( 

417 entity_uri: str, 

418 permissions: list[str] | None = None, 

419 description: str | None = None, 

420 expires_at: str | None = None, 

421 oidc_sub: str | None = None, 

422 tenant_id: str = "default", 

423) -> str: 

424 """Mint a new raw API key, persist its hash, and return the raw key. 

425 

426 Internal/test-only convenience over `register_api_key`. Adopter-facing 

427 flows should require the caller to provide the key material (see 

428 `stigmem auth bootstrap-key`) so the system is never the credential 

429 generation surface. 

430 """ 

431 raw = str(uuid.uuid4()).replace("-", "") # 32-char hex 

432 register_api_key( 

433 raw_key=raw, 

434 entity_uri=entity_uri, 

435 permissions=permissions, 

436 description=description, 

437 expires_at=expires_at, 

438 oidc_sub=oidc_sub, 

439 tenant_id=tenant_id, 

440 ) 

441 return raw