Coverage for node / src / stigmem_node / storage / encryption.py: 97%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Key material loading and Argon2id derivation for encryption-at-rest.
3Key material is cached per unique (passphrase_env, kms_uri) pair so Argon2id
4runs at most once per process lifetime, not once per connection.
6Security properties:
7- Key material is never included in exception messages or log output.
8- The cache holds derived keys in-process memory; no disk persistence.
9- Fixed KDF salt is intentional: the passphrase is the secret, not the salt.
10 This is appropriate for stretching high-entropy operator secrets, not for
11 hashing low-entropy user passwords.
12"""
14from __future__ import annotations
16import os
17from typing import TYPE_CHECKING, Any
19if TYPE_CHECKING:
20 pass
22# Domain-separator salt for Argon2id — 32 bytes, never changes after deployment.
23_ARGON2_SALT: bytes = (
24 b"stigmem-at-rest-v1" + b"\x00" * 14 # 18 + 14 = 32 bytes
25)
27_key_cache: dict[tuple[str, str], bytes] = {}
30def load_key(settings: Any) -> bytes:
31 """Return the 32-byte encryption key derived from *settings*.
33 Sources are checked in priority order:
34 1. ``at_rest_key_kms_uri`` (e.g. ``env://MY_HEX_KEY``)
35 2. ``at_rest_key_passphrase_env`` → Argon2id derivation
37 Raises ``RuntimeError`` if neither source is configured or loading fails.
38 Key material is never surfaced in exception messages.
39 """
40 kms_uri: str = getattr(settings, "at_rest_key_kms_uri", "")
41 passphrase_env: str = getattr(settings, "at_rest_key_passphrase_env", "")
42 cache_key = (passphrase_env, kms_uri)
44 if cache_key in _key_cache:
45 return _key_cache[cache_key]
47 if kms_uri:
48 key = _load_from_kms_uri(kms_uri)
49 elif passphrase_env:
50 key = _derive_from_passphrase_env(passphrase_env)
51 else:
52 raise RuntimeError(
53 "STIGMEM_AT_REST_ENCRYPTION=on but no key source is configured. "
54 "Set STIGMEM_AT_REST_KEY_PASSPHRASE_ENV or STIGMEM_AT_REST_KEY_KMS_URI. "
55 "The node refuses to start without a key source when encryption is enabled."
56 )
58 _key_cache[cache_key] = key
59 return key
62def _load_from_kms_uri(uri: str) -> bytes:
63 """Load a raw 32-byte key from a KMS URI.
65 Supported schemes:
66 - ``env://VAR`` — read a 64-char hex-encoded 32-byte key from env var VAR.
67 """
68 if uri.startswith("env://"):
69 var_name = uri[len("env://") :]
70 raw = os.environ.get(var_name, "")
71 if not raw:
72 raise RuntimeError(
73 f"STIGMEM_AT_REST_KEY_KMS_URI references env var '{var_name}' "
74 "which is not set or is empty."
75 )
76 try:
77 key = bytes.fromhex(raw.strip())
78 except ValueError:
79 raise RuntimeError(
80 f"Env var '{var_name}' (KMS URI) must contain a 64-character "
81 "hex-encoded 32-byte key. Check that the value is valid hex."
82 ) from None
83 if len(key) != 32:
84 raise RuntimeError(
85 f"Env var '{var_name}' (KMS URI) decoded to {len(key)} bytes; "
86 "exactly 32 bytes (64 hex characters) are required."
87 )
88 return key
90 raise RuntimeError(
91 f"Unsupported KMS URI scheme — only 'env://' is supported in this release. Got: {uri!r}"
92 )
95def _derive_from_passphrase_env(env_var: str) -> bytes:
96 passphrase = os.environ.get(env_var, "")
97 if not passphrase:
98 raise RuntimeError(
99 f"STIGMEM_AT_REST_KEY_PASSPHRASE_ENV references env var '{env_var}' "
100 "which is not set or is empty."
101 )
102 return derive_key(passphrase.encode())
105def derive_key(passphrase: bytes) -> bytes:
106 """Derive a 32-byte encryption key from *passphrase* using Argon2id.
108 Parameters follow OWASP 2023 recommendations for key derivation:
109 time_cost=3, memory_cost=64 MiB, parallelism=4.
111 Requires ``argon2-cffi``; install with ``pip install 'stigmem-node[encryption]'``.
112 """
113 try:
114 import argon2.low_level as _argon2
115 except ImportError as exc:
116 raise RuntimeError(
117 "argon2-cffi is required for passphrase-based key derivation. "
118 "Install it with: pip install 'stigmem-node[encryption]'"
119 ) from exc
121 derived: bytes = _argon2.hash_secret_raw(
122 secret=passphrase,
123 salt=_ARGON2_SALT,
124 time_cost=3,
125 memory_cost=65_536,
126 parallelism=4,
127 hash_len=32,
128 type=_argon2.Type.ID,
129 )
130 return derived
133def _reset_key_cache() -> None:
134 """Clear the in-process key cache. For tests only."""
135 _key_cache.clear()