Coverage for node / src / stigmem_node / lifecycle / tombstone_signing.py: 93%
77 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Tombstone signing and verification — spec §23.2.4, rev 14.
3Signing body: JCS over TombstoneRecord with "signature" and "reason" excluded
4(field-exclusion pattern per §19.1.3). This allows reason redaction before
5federation rebroadcast without invalidating the signature.
7Public surface:
8 sign_tombstone(record) -> TombstoneRecord
9 verify_tombstone_signature(record, public_key_b64) -> None (raises on fail)
10 get_node_key_id() -> str | None
11"""
13from __future__ import annotations
15import base64
16import logging
17from typing import Any
19import canonicaljson
20from cryptography.exceptions import InvalidSignature
21from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
23from ..models.tombstones import TombstoneRecord, TombstoneRevocationRecord
25logger = logging.getLogger("stigmem.tombstone")
28def _pad(b64url: str) -> str:
29 return b64url + "=" * (-len(b64url) % 4)
32def _pubkey_from_b64(b64: str) -> Ed25519PublicKey:
33 raw = base64.urlsafe_b64decode(_pad(b64))
34 return Ed25519PublicKey.from_public_bytes(raw)
37def _signing_body(record: TombstoneRecord) -> bytes:
38 """JCS-canonical bytes over TombstoneRecord with 'signature' and 'reason' excluded (§23.2.4).
40 F-12 §23.2.3: scope arrays are sorted lexicographically before canonicalization
41 to prevent interop failures from array-order differences across JSON implementations.
42 """
43 scope_val: Any = record.scope
44 if isinstance(scope_val, list): 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true
45 scope_val = sorted(scope_val)
46 doc: dict[str, Any] = {
47 "id": record.id,
48 "entity_uri": record.entity_uri,
49 "scope": scope_val,
50 "signed_by": record.signed_by,
51 "key_id": record.key_id,
52 "created_at": record.created_at,
53 "legal_hold": record.legal_hold,
54 }
55 return canonicaljson.encode_canonical_json(doc)
58def get_node_key_id() -> str | None:
59 """Return the node's current signing key_id (SHA-256 hex, 16-char prefix), or None."""
60 from ..identity.capability import load_node_private_key
61 from ..identity.key_rotation import generate_key_id
63 priv = load_node_private_key()
64 if priv is None: 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true
65 return None
66 pub = priv.public_key()
67 return generate_key_id(pub)
70def sign_tombstone(record: TombstoneRecord) -> TombstoneRecord:
71 """Sign *record* with the node's active private key. Returns a new record with signature set."""
72 from ..identity.capability import load_node_private_key
73 from ..identity.key_rotation import generate_key_id
75 priv = load_node_private_key()
76 if priv is None:
77 raise RuntimeError("STIGMEM_NODE_PRIVATE_KEY not configured; cannot sign tombstones")
79 pub = priv.public_key()
80 key_id = generate_key_id(pub)
81 record = record.model_copy(update={"key_id": key_id})
82 body = _signing_body(record)
83 sig_bytes = priv.sign(body)
84 sig_b64 = base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=")
85 return record.model_copy(update={"signature": sig_b64})
88def verify_tombstone_signature(record: TombstoneRecord, public_key_b64: str) -> None:
89 """Verify tombstone signature against *public_key_b64* (base64url Ed25519).
91 Raises ValueError on failure. Per §23.4.2.1 the caller must resolve the
92 signing key from the org manifest independently of the relaying peer.
93 """
94 try:
95 pub = _pubkey_from_b64(public_key_b64)
96 body = _signing_body(record)
97 sig_bytes = base64.urlsafe_b64decode(_pad(record.signature))
98 pub.verify(sig_bytes, body)
99 except InvalidSignature as exc:
100 raise ValueError("tombstone signature verification failed") from exc
101 except Exception as exc:
102 raise ValueError(f"tombstone signature error: {exc}") from exc
105def _revocation_signing_body(record: TombstoneRevocationRecord) -> bytes:
106 """JCS-canonical bytes over TombstoneRevocationRecord with 'signature' and 'reason' excluded."""
107 doc: dict[str, Any] = {
108 "id": record.id,
109 "tombstone_id": record.tombstone_id,
110 "signed_by": record.signed_by,
111 "key_id": record.key_id,
112 "created_at": record.created_at,
113 }
114 return canonicaljson.encode_canonical_json(doc)
117def sign_revocation(record: TombstoneRevocationRecord) -> TombstoneRevocationRecord:
118 """Sign *record* with the node's active private key. Returns a new record with signature set."""
119 from ..identity.capability import load_node_private_key
120 from ..identity.key_rotation import generate_key_id
122 priv = load_node_private_key()
123 if priv is None:
124 raise RuntimeError("STIGMEM_NODE_PRIVATE_KEY not configured; cannot sign revocations")
126 pub = priv.public_key()
127 key_id = generate_key_id(pub)
128 record = record.model_copy(update={"key_id": key_id})
129 body = _revocation_signing_body(record)
130 sig_bytes = priv.sign(body)
131 sig_b64 = base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=")
132 return record.model_copy(update={"signature": sig_b64})
135def verify_revocation_signature(record: TombstoneRevocationRecord, public_key_b64: str) -> None:
136 """Verify revocation signature against *public_key_b64* (base64url Ed25519).
138 Raises ValueError on failure.
139 """
140 try:
141 pub = _pubkey_from_b64(public_key_b64)
142 body = _revocation_signing_body(record)
143 sig_bytes = base64.urlsafe_b64decode(_pad(record.signature))
144 pub.verify(sig_bytes, body)
145 except InvalidSignature as exc:
146 raise ValueError("revocation signature verification failed") from exc
147 except Exception as exc:
148 raise ValueError(f"revocation signature error: {exc}") from exc