Coverage for node / src / stigmem_node / auth.py: 92%
177 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""API-key authentication for the Stigmem reference node.
3Auth model (spec §3.5):
4 - Callers present `Authorization: Bearer <raw-key>` on every request.
5 - Raw keys are never stored; only an Argon2id hash is persisted.
6 - Legacy SHA-256 hex digests are accepted during the v0.9.x migration
7 window and are opportunistically rehashed to Argon2id on successful use.
8 - Each key maps to an entity_uri and a JSON-array of permissions:
9 ["read"], ["read","write"], or ["read","write","federate"].
10 - `STIGMEM_AUTH_REQUIRED` defaults to **True** — every request must
11 present a valid Bearer token. Set `STIGMEM_AUTH_REQUIRED=false` to
12 opt into anonymous-mode for single-operator development; this is
13 NOT appropriate for any deployment that accepts requests from
14 agents you don't fully control. See LIMITATIONS.md §"LLM agents
15 holding admin-scope API keys" for context.
17Bootstrapping the first key (single-operator install):
18 $ KEY=$(openssl rand -hex 32)
19 $ stigmem auth bootstrap-key --key "$KEY"
20 # then use $KEY as `Authorization: Bearer $KEY` for subsequent requests.
22 The system NEVER generates the key — the caller provides the value and
23 retains full custody. We hash and store it. This is by design:
24 removing the system as the credential-generation surface eliminates
25 the "reveal channel" risk entirely. The command refuses to run when
26 api_keys is non-empty (bootstrap is one-shot); after bootstrap,
27 additional keys go through `POST /v1/auth/keys`.
28"""
30from __future__ import annotations
32import hashlib
33import hmac
34import json
35import re
36import uuid
37from datetime import UTC, datetime, timedelta
38from typing import Annotated, Any
40from argon2 import PasswordHasher
41from argon2.exceptions import InvalidHash, VerificationError, VerifyMismatchError
42from argon2.low_level import Type
43from fastapi import Depends, HTTPException, status
44from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
46from .db import db
47from .plugins import Deny, TenantContext, get_registry
48from .settings import settings as settings
49from .tenant import DEFAULT_TENANT_ID, TenantIdError, validate_tenant_id
51_ARGON2_HASHER = PasswordHasher(
52 time_cost=2,
53 memory_cost=19_456,
54 parallelism=1,
55 hash_len=32,
56 salt_len=16,
57 type=Type.ID,
58)
59_LEGACY_SHA256_HEX = re.compile(r"^[0-9a-f]{64}$")
62def _hash_key(raw: str) -> str:
63 """Return the persisted hash format for newly issued API keys."""
64 return _ARGON2_HASHER.hash(raw)
67def _legacy_sha256(raw: str) -> str:
68 return hashlib.sha256(raw.encode()).hexdigest()
71def _is_legacy_sha256_hash(stored_hash: str) -> bool:
72 return bool(_LEGACY_SHA256_HEX.fullmatch(stored_hash))
75def _legacy_sha256_allowed() -> bool:
76 deadline = settings.legacy_sha256_accept_until
77 if deadline is None:
78 return True
79 if deadline.tzinfo is None: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 deadline = deadline.replace(tzinfo=UTC)
81 return datetime.now(UTC) <= deadline.astimezone(UTC)
84def _raise_legacy_sha256_disabled() -> None:
85 raise HTTPException(
86 status_code=status.HTTP_401_UNAUTHORIZED,
87 detail="Legacy API key hashes are no longer accepted; rotate the key.",
88 )
91def _verify_key_hash(raw_key: str, stored_hash: str) -> bool:
92 """Verify *raw_key* against either current Argon2id or legacy SHA-256."""
93 if stored_hash.startswith("$argon2id$"):
94 try:
95 return _ARGON2_HASHER.verify(stored_hash, raw_key)
96 except VerifyMismatchError:
97 return False
98 except VerificationError:
99 return False
100 except InvalidHash:
101 return False
102 if _is_legacy_sha256_hash(stored_hash): 102 ↛ 106line 102 didn't jump to line 106 because the condition on line 102 was always true
103 if not _legacy_sha256_allowed(): 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true
104 _raise_legacy_sha256_disabled()
105 return hmac.compare_digest(stored_hash, _legacy_sha256(raw_key))
106 return False
109def _row_expired(row: Any, now: str) -> bool:
110 expires_at = row["expires_at"]
111 return bool(expires_at and expires_at < now)
114def _parse_api_key_expiry(expires_at: str) -> datetime:
115 normalized = expires_at.replace("Z", "+00:00")
116 parsed = datetime.fromisoformat(normalized)
117 if parsed.tzinfo is None: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true
118 parsed = parsed.replace(tzinfo=UTC)
119 return parsed.astimezone(UTC)
122def _normalize_api_key_expiry(
123 expires_at: str | None,
124 *,
125 created_at: datetime,
126) -> str | None:
127 """Apply static API-key max-age policy and return the persisted expiry."""
128 max_age_days = settings.api_key_max_age_days
129 if max_age_days <= 0: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true
130 return expires_at
132 max_expires_at = created_at + timedelta(days=max_age_days)
133 if expires_at is None:
134 return max_expires_at.isoformat()
136 requested_expires_at = _parse_api_key_expiry(expires_at)
137 if requested_expires_at > max_expires_at:
138 raise ValueError(
139 "expires_at exceeds configured static API key max age "
140 f"({max_age_days} days)"
141 )
142 return requested_expires_at.isoformat()
145def _rehash_legacy_key(conn: Any, row: Any, raw_key: str) -> None:
146 """Rewrite one verified legacy SHA-256 key row to Argon2id and audit it."""
147 new_hash = _hash_key(raw_key)
148 conn.execute(
149 "UPDATE api_keys SET key_hash = ? WHERE id = ? AND key_hash = ?",
150 (new_hash, row["id"], row["key_hash"]),
151 )
153 from .observability.audit_event import emit
155 emit(
156 "api_key_rehashed",
157 entity_uri=row["entity_uri"],
158 tenant_id=row["tenant_id"] or "default",
159 oidc_sub=row["oidc_sub"],
160 source="system:auth",
161 detail={
162 "key_id": row["id"],
163 "from": "sha256",
164 "to": "argon2id",
165 },
166 conn=conn,
167 )
170def _select_key_rows(conn: Any) -> list[Any]:
171 return list(
172 conn.execute(
173 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub, tenant_id"
174 " FROM api_keys"
175 ).fetchall()
176 )
179def _find_key_row(
180 raw_key: str,
181 *,
182 include_expired: bool = False,
183 rehash_legacy: bool = False,
184) -> Any | None:
185 now = datetime.now(UTC).isoformat()
186 with db() as conn:
187 legacy_row = conn.execute(
188 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub, tenant_id"
189 " FROM api_keys WHERE key_hash = ?",
190 (_legacy_sha256(raw_key),),
191 ).fetchone()
192 if legacy_row is not None and (include_expired or not _row_expired(legacy_row, now)):
193 if not _legacy_sha256_allowed():
194 _raise_legacy_sha256_disabled()
195 if rehash_legacy:
196 _rehash_legacy_key(conn, legacy_row, raw_key)
197 legacy_row = conn.execute(
198 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub, tenant_id"
199 " FROM api_keys WHERE id = ?",
200 (legacy_row["id"],),
201 ).fetchone()
202 return legacy_row
204 for row in _select_key_rows(conn):
205 if not include_expired and _row_expired(row, now):
206 continue
207 if _verify_key_hash(raw_key, row["key_hash"]):
208 if rehash_legacy and _is_legacy_sha256_hash(row["key_hash"]): 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 _rehash_legacy_key(conn, row, raw_key)
210 row = conn.execute(
211 "SELECT id, key_hash, entity_uri, permissions, expires_at, oidc_sub,"
212 " tenant_id FROM api_keys WHERE id = ?",
213 (row["id"],),
214 ).fetchone()
215 return row
216 return None
219def find_api_key_id_by_raw_key(raw_key: str) -> str | None:
220 """Return the stored key id for *raw_key*, including expired rows."""
221 row = _find_key_row(raw_key, include_expired=True, rehash_legacy=False)
222 return row["id"] if row is not None else None
225def lookup_principal(raw_key: str) -> tuple[str, str, str | None] | None:
226 """Return (entity_uri, tenant_id, oidc_sub) for a non-expired raw key."""
227 row = _find_key_row(raw_key, include_expired=False, rehash_legacy=False)
228 if row is None:
229 return None
230 return (row["entity_uri"], row["tenant_id"] or "default", row["oidc_sub"])
233BEARER = HTTPBearer(auto_error=False)
236class Identity:
237 """Resolved caller identity (spec §3.5)."""
239 def __init__(
240 self,
241 entity_uri: str,
242 permissions: list[str],
243 oidc_sub: str | None = None,
244 tenant_id: str = "default",
245 ) -> None:
246 self.entity_uri = entity_uri
247 self.permissions = set(permissions)
248 self.oidc_sub = oidc_sub
249 self.tenant_id = tenant_id
251 def can_read(self) -> bool:
252 return self._has_capability("read")
254 def can_write(self) -> bool:
255 return self._has_capability("write")
257 def can_write_instruction(self) -> bool:
258 return self._has_capability("instruction:write")
260 def can_federate(self) -> bool:
261 return self._has_capability("federate")
263 def can_admin_federation(self) -> bool:
264 return self._has_capability("admin:federation")
266 def can_audit(self) -> bool:
267 """True when the principal holds the audit.read capability (spec §22.3)."""
268 return self._has_capability("audit.read")
270 def is_admin(self) -> bool:
271 """True when the principal holds the admin capability (spec §24.3.2)."""
272 return self._has_capability("admin")
274 def _has_capability(self, capability: str) -> bool:
275 if capability not in self.permissions:
276 return False
277 decision = get_registry().fire_voting(
278 "capability_check",
279 identity=self,
280 capability=capability,
281 tenant=TenantContext(
282 tenant_id=self.tenant_id,
283 metadata={"tenant_context_source": "hook"},
284 ),
285 )
286 return not isinstance(decision, Deny)
289_ANON = Identity("anon:trusted", ["read", "write", "federate"], tenant_id="default")
292def resolve_identity(
293 creds: Annotated[HTTPAuthorizationCredentials | None, Depends(BEARER)],
294) -> Identity:
295 """Dependency: resolve caller identity from Bearer token.
297 If auth is disabled, returns a fully-trusted anonymous identity.
298 If auth is enabled, the token must match a non-expired api_keys row.
299 """
300 if not settings.auth_required:
301 if creds is not None:
302 # Still try to resolve a real identity even in non-required mode
303 identity = _lookup(creds.credentials)
304 if identity:
305 return _apply_identity_hooks(identity, raw_credentials=creds.credentials)
306 return _apply_identity_hooks(_ANON, raw_credentials=None)
308 if creds is None:
309 raise HTTPException(
310 status_code=status.HTTP_401_UNAUTHORIZED,
311 detail="Authorization header required",
312 headers={"WWW-Authenticate": "Bearer"},
313 )
314 identity = _lookup(creds.credentials)
315 if identity is None:
316 raise HTTPException(
317 status_code=status.HTTP_401_UNAUTHORIZED,
318 detail="Invalid or expired API key",
319 headers={"WWW-Authenticate": "Bearer"},
320 )
321 return _apply_identity_hooks(identity, raw_credentials=creds.credentials)
324def _apply_identity_hooks(identity: Identity, raw_credentials: str | None) -> Identity:
325 registry = get_registry()
326 resolved = registry.fire_filter_chain(
327 "identity_resolve",
328 identity,
329 raw_credentials=raw_credentials,
330 )
331 tenant = registry.fire_filter_chain(
332 "tenant_resolve",
333 TenantContext(
334 tenant_id=DEFAULT_TENANT_ID,
335 metadata={
336 "source_tenant_id": resolved.tenant_id,
337 "tenant_context_source": "hook",
338 },
339 ),
340 identity=resolved,
341 )
342 try:
343 resolved_tenant_id = validate_tenant_id(tenant.tenant_id)
344 except TenantIdError:
345 resolved_tenant_id = DEFAULT_TENANT_ID
346 if resolved_tenant_id != resolved.tenant_id:
347 return Identity(
348 entity_uri=resolved.entity_uri,
349 permissions=sorted(resolved.permissions),
350 oidc_sub=resolved.oidc_sub,
351 tenant_id=resolved_tenant_id,
352 )
353 return resolved
356def _lookup(raw_key: str) -> Identity | None:
357 row = _find_key_row(raw_key, include_expired=False, rehash_legacy=True)
358 if row is None:
359 return None
360 perms: list[str] = json.loads(row["permissions"])
361 return Identity(
362 entity_uri=row["entity_uri"],
363 permissions=perms,
364 oidc_sub=row["oidc_sub"],
365 tenant_id=row["tenant_id"] or "default",
366 )
369def register_api_key(
370 raw_key: str,
371 entity_uri: str,
372 permissions: list[str] | None = None,
373 description: str | None = None,
374 expires_at: str | None = None,
375 oidc_sub: str | None = None,
376 tenant_id: str = "default",
377) -> str:
378 """Persist the hash of a caller-provided raw API key. Returns the key_id.
380 The caller is responsible for generating `raw_key` (e.g., from
381 `secrets.token_hex(32)` or `openssl rand -hex 32`) and for storing it
382 securely. This function never touches the key after hashing.
383 """
384 if permissions is None: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true
385 permissions = ["read", "write"]
386 if find_api_key_id_by_raw_key(raw_key) is not None:
387 raise ValueError("raw API key already exists")
388 normalized_tenant_id = validate_tenant_id(tenant_id)
389 key_id = str(uuid.uuid4())
390 created_at = datetime.now(UTC).replace(microsecond=0)
391 normalized_expires_at = _normalize_api_key_expiry(
392 expires_at,
393 created_at=created_at,
394 )
395 with db() as conn:
396 conn.execute(
397 """INSERT INTO api_keys
398 (id, key_hash, entity_uri, permissions, description,
399 created_at, expires_at, oidc_sub, tenant_id)
400 VALUES (?,?,?,?,?,?,?,?,?)""",
401 (
402 key_id,
403 _hash_key(raw_key),
404 entity_uri,
405 json.dumps(permissions),
406 description,
407 created_at.isoformat(),
408 normalized_expires_at,
409 oidc_sub,
410 normalized_tenant_id,
411 ),
412 )
413 return key_id
416def create_api_key(
417 entity_uri: str,
418 permissions: list[str] | None = None,
419 description: str | None = None,
420 expires_at: str | None = None,
421 oidc_sub: str | None = None,
422 tenant_id: str = "default",
423) -> str:
424 """Mint a new raw API key, persist its hash, and return the raw key.
426 Internal/test-only convenience over `register_api_key`. Adopter-facing
427 flows should require the caller to provide the key material (see
428 `stigmem auth bootstrap-key`) so the system is never the credential
429 generation surface.
430 """
431 raw = str(uuid.uuid4()).replace("-", "") # 32-char hex
432 register_api_key(
433 raw_key=raw,
434 entity_uri=entity_uri,
435 permissions=permissions,
436 description=description,
437 expires_at=expires_at,
438 oidc_sub=oidc_sub,
439 tenant_id=tenant_id,
440 )
441 return raw