Coverage for node / src / stigmem_node / lifecycle / tombstone_signing.py: 93%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Tombstone signing and verification — spec §23.2.4, rev 14. 

2 

3Signing body: JCS over TombstoneRecord with "signature" and "reason" excluded 

4(field-exclusion pattern per §19.1.3). This allows reason redaction before 

5federation rebroadcast without invalidating the signature. 

6 

7Public surface: 

8 sign_tombstone(record) -> TombstoneRecord 

9 verify_tombstone_signature(record, public_key_b64) -> None (raises on fail) 

10 get_node_key_id() -> str | None 

11""" 

12 

13from __future__ import annotations 

14 

15import base64 

16import logging 

17from typing import Any 

18 

19import canonicaljson 

20from cryptography.exceptions import InvalidSignature 

21from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey 

22 

23from ..models.tombstones import TombstoneRecord, TombstoneRevocationRecord 

24 

25logger = logging.getLogger("stigmem.tombstone") 

26 

27 

28def _pad(b64url: str) -> str: 

29 return b64url + "=" * (-len(b64url) % 4) 

30 

31 

32def _pubkey_from_b64(b64: str) -> Ed25519PublicKey: 

33 raw = base64.urlsafe_b64decode(_pad(b64)) 

34 return Ed25519PublicKey.from_public_bytes(raw) 

35 

36 

37def _signing_body(record: TombstoneRecord) -> bytes: 

38 """JCS-canonical bytes over TombstoneRecord with 'signature' and 'reason' excluded (§23.2.4). 

39 

40 F-12 §23.2.3: scope arrays are sorted lexicographically before canonicalization 

41 to prevent interop failures from array-order differences across JSON implementations. 

42 """ 

43 scope_val: Any = record.scope 

44 if isinstance(scope_val, list): 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true

45 scope_val = sorted(scope_val) 

46 doc: dict[str, Any] = { 

47 "id": record.id, 

48 "entity_uri": record.entity_uri, 

49 "scope": scope_val, 

50 "signed_by": record.signed_by, 

51 "key_id": record.key_id, 

52 "created_at": record.created_at, 

53 "legal_hold": record.legal_hold, 

54 } 

55 return canonicaljson.encode_canonical_json(doc) 

56 

57 

58def get_node_key_id() -> str | None: 

59 """Return the node's current signing key_id (SHA-256 hex, 16-char prefix), or None.""" 

60 from ..identity.capability import load_node_private_key 

61 from ..identity.key_rotation import generate_key_id 

62 

63 priv = load_node_private_key() 

64 if priv is None: 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true

65 return None 

66 pub = priv.public_key() 

67 return generate_key_id(pub) 

68 

69 

70def sign_tombstone(record: TombstoneRecord) -> TombstoneRecord: 

71 """Sign *record* with the node's active private key. Returns a new record with signature set.""" 

72 from ..identity.capability import load_node_private_key 

73 from ..identity.key_rotation import generate_key_id 

74 

75 priv = load_node_private_key() 

76 if priv is None: 

77 raise RuntimeError("STIGMEM_NODE_PRIVATE_KEY not configured; cannot sign tombstones") 

78 

79 pub = priv.public_key() 

80 key_id = generate_key_id(pub) 

81 record = record.model_copy(update={"key_id": key_id}) 

82 body = _signing_body(record) 

83 sig_bytes = priv.sign(body) 

84 sig_b64 = base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=") 

85 return record.model_copy(update={"signature": sig_b64}) 

86 

87 

88def verify_tombstone_signature(record: TombstoneRecord, public_key_b64: str) -> None: 

89 """Verify tombstone signature against *public_key_b64* (base64url Ed25519). 

90 

91 Raises ValueError on failure. Per §23.4.2.1 the caller must resolve the 

92 signing key from the org manifest independently of the relaying peer. 

93 """ 

94 try: 

95 pub = _pubkey_from_b64(public_key_b64) 

96 body = _signing_body(record) 

97 sig_bytes = base64.urlsafe_b64decode(_pad(record.signature)) 

98 pub.verify(sig_bytes, body) 

99 except InvalidSignature as exc: 

100 raise ValueError("tombstone signature verification failed") from exc 

101 except Exception as exc: 

102 raise ValueError(f"tombstone signature error: {exc}") from exc 

103 

104 

105def _revocation_signing_body(record: TombstoneRevocationRecord) -> bytes: 

106 """JCS-canonical bytes over TombstoneRevocationRecord with 'signature' and 'reason' excluded.""" 

107 doc: dict[str, Any] = { 

108 "id": record.id, 

109 "tombstone_id": record.tombstone_id, 

110 "signed_by": record.signed_by, 

111 "key_id": record.key_id, 

112 "created_at": record.created_at, 

113 } 

114 return canonicaljson.encode_canonical_json(doc) 

115 

116 

117def sign_revocation(record: TombstoneRevocationRecord) -> TombstoneRevocationRecord: 

118 """Sign *record* with the node's active private key. Returns a new record with signature set.""" 

119 from ..identity.capability import load_node_private_key 

120 from ..identity.key_rotation import generate_key_id 

121 

122 priv = load_node_private_key() 

123 if priv is None: 

124 raise RuntimeError("STIGMEM_NODE_PRIVATE_KEY not configured; cannot sign revocations") 

125 

126 pub = priv.public_key() 

127 key_id = generate_key_id(pub) 

128 record = record.model_copy(update={"key_id": key_id}) 

129 body = _revocation_signing_body(record) 

130 sig_bytes = priv.sign(body) 

131 sig_b64 = base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=") 

132 return record.model_copy(update={"signature": sig_b64}) 

133 

134 

135def verify_revocation_signature(record: TombstoneRevocationRecord, public_key_b64: str) -> None: 

136 """Verify revocation signature against *public_key_b64* (base64url Ed25519). 

137 

138 Raises ValueError on failure. 

139 """ 

140 try: 

141 pub = _pubkey_from_b64(public_key_b64) 

142 body = _revocation_signing_body(record) 

143 sig_bytes = base64.urlsafe_b64decode(_pad(record.signature)) 

144 pub.verify(sig_bytes, body) 

145 except InvalidSignature as exc: 

146 raise ValueError("revocation signature verification failed") from exc 

147 except Exception as exc: 

148 raise ValueError(f"revocation signature error: {exc}") from exc