Coverage for node / src / stigmem_node / storage / encryption.py: 97%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Key material loading and Argon2id derivation for encryption-at-rest. 

2 

3Key material is cached per unique (passphrase_env, kms_uri) pair so Argon2id 

4runs at most once per process lifetime, not once per connection. 

5 

6Security properties: 

7- Key material is never included in exception messages or log output. 

8- The cache holds derived keys in-process memory; no disk persistence. 

9- Fixed KDF salt is intentional: the passphrase is the secret, not the salt. 

10 This is appropriate for stretching high-entropy operator secrets, not for 

11 hashing low-entropy user passwords. 

12""" 

13 

14from __future__ import annotations 

15 

16import os 

17from typing import TYPE_CHECKING, Any 

18 

19if TYPE_CHECKING: 

20 pass 

21 

22# Domain-separator salt for Argon2id — 32 bytes, never changes after deployment. 

23_ARGON2_SALT: bytes = ( 

24 b"stigmem-at-rest-v1" + b"\x00" * 14 # 18 + 14 = 32 bytes 

25) 

26 

27_key_cache: dict[tuple[str, str], bytes] = {} 

28 

29 

30def load_key(settings: Any) -> bytes: 

31 """Return the 32-byte encryption key derived from *settings*. 

32 

33 Sources are checked in priority order: 

34 1. ``at_rest_key_kms_uri`` (e.g. ``env://MY_HEX_KEY``) 

35 2. ``at_rest_key_passphrase_env`` → Argon2id derivation 

36 

37 Raises ``RuntimeError`` if neither source is configured or loading fails. 

38 Key material is never surfaced in exception messages. 

39 """ 

40 kms_uri: str = getattr(settings, "at_rest_key_kms_uri", "") 

41 passphrase_env: str = getattr(settings, "at_rest_key_passphrase_env", "") 

42 cache_key = (passphrase_env, kms_uri) 

43 

44 if cache_key in _key_cache: 

45 return _key_cache[cache_key] 

46 

47 if kms_uri: 

48 key = _load_from_kms_uri(kms_uri) 

49 elif passphrase_env: 

50 key = _derive_from_passphrase_env(passphrase_env) 

51 else: 

52 raise RuntimeError( 

53 "STIGMEM_AT_REST_ENCRYPTION=on but no key source is configured. " 

54 "Set STIGMEM_AT_REST_KEY_PASSPHRASE_ENV or STIGMEM_AT_REST_KEY_KMS_URI. " 

55 "The node refuses to start without a key source when encryption is enabled." 

56 ) 

57 

58 _key_cache[cache_key] = key 

59 return key 

60 

61 

62def _load_from_kms_uri(uri: str) -> bytes: 

63 """Load a raw 32-byte key from a KMS URI. 

64 

65 Supported schemes: 

66 - ``env://VAR`` — read a 64-char hex-encoded 32-byte key from env var VAR. 

67 """ 

68 if uri.startswith("env://"): 

69 var_name = uri[len("env://") :] 

70 raw = os.environ.get(var_name, "") 

71 if not raw: 

72 raise RuntimeError( 

73 f"STIGMEM_AT_REST_KEY_KMS_URI references env var '{var_name}' " 

74 "which is not set or is empty." 

75 ) 

76 try: 

77 key = bytes.fromhex(raw.strip()) 

78 except ValueError: 

79 raise RuntimeError( 

80 f"Env var '{var_name}' (KMS URI) must contain a 64-character " 

81 "hex-encoded 32-byte key. Check that the value is valid hex." 

82 ) from None 

83 if len(key) != 32: 

84 raise RuntimeError( 

85 f"Env var '{var_name}' (KMS URI) decoded to {len(key)} bytes; " 

86 "exactly 32 bytes (64 hex characters) are required." 

87 ) 

88 return key 

89 

90 raise RuntimeError( 

91 f"Unsupported KMS URI scheme — only 'env://' is supported in this release. Got: {uri!r}" 

92 ) 

93 

94 

95def _derive_from_passphrase_env(env_var: str) -> bytes: 

96 passphrase = os.environ.get(env_var, "") 

97 if not passphrase: 

98 raise RuntimeError( 

99 f"STIGMEM_AT_REST_KEY_PASSPHRASE_ENV references env var '{env_var}' " 

100 "which is not set or is empty." 

101 ) 

102 return derive_key(passphrase.encode()) 

103 

104 

105def derive_key(passphrase: bytes) -> bytes: 

106 """Derive a 32-byte encryption key from *passphrase* using Argon2id. 

107 

108 Parameters follow OWASP 2023 recommendations for key derivation: 

109 time_cost=3, memory_cost=64 MiB, parallelism=4. 

110 

111 Requires ``argon2-cffi``; install with ``pip install 'stigmem-node[encryption]'``. 

112 """ 

113 try: 

114 import argon2.low_level as _argon2 

115 except ImportError as exc: 

116 raise RuntimeError( 

117 "argon2-cffi is required for passphrase-based key derivation. " 

118 "Install it with: pip install 'stigmem-node[encryption]'" 

119 ) from exc 

120 

121 derived: bytes = _argon2.hash_secret_raw( 

122 secret=passphrase, 

123 salt=_ARGON2_SALT, 

124 time_cost=3, 

125 memory_cost=65_536, 

126 parallelism=4, 

127 hash_len=32, 

128 type=_argon2.Type.ID, 

129 ) 

130 return derived 

131 

132 

133def _reset_key_cache() -> None: 

134 """Clear the in-process key cache. For tests only.""" 

135 _key_cache.clear()