Coverage for sdks / stigmem-py / src / stigmem / verification.py: 44%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Client-side integrity verification helpers for Stigmem responses.""" 

2 

3from __future__ import annotations 

4 

5import hashlib 

6import json 

7from typing import Any 

8 

9from .models import Fact, FactChainCheckpointProof, FactChainProof 

10 

# Algorithm tag prepended to the hex digest when building a fact CID.
_HASH_PREFIX = "sha256:"

12 

13 

class StigmemVerificationError(ValueError):
    """Raised when local verification rejects Stigmem proof material.

    Subclasses ``ValueError``, so callers that already handle ``ValueError``
    also catch verification failures.
    """

16 

17 

def _encoded_value_v(fact: Fact) -> str:
    """Return the string encoding of ``fact.value.v`` used in the CID body.

    * ``"null"`` typed values encode as the literal ``"null"``.
    * ``"boolean"`` typed values encode as ``"true"`` / ``"false"`` based on
      the truthiness of ``fact.value.v``.
    * Every other type encodes as ``str(fact.value.v)``; a ``None`` payload
      becomes the empty string.
    """
    value = fact.value
    if value.type == "null":
        return "null"
    if value.type == "boolean":
        # Lowercase spelling, not Python's "True"/"False".
        return "true" if value.v else "false"
    return "" if value.v is None else str(value.v)

24 

25 

def compute_fact_cid(fact: Fact) -> str:
    """Compute the canonical CID for a fact body.

    The CID is ``_HASH_PREFIX`` followed by the SHA-256 hex digest of the
    canonical JSON form of the fact's identifying fields (sorted keys,
    compact separators, UTF-8, non-ASCII characters left unescaped).

    Returns:
        The CID string, e.g. ``"sha256:<64 hex chars>"``.
    """
    body: dict[str, Any] = {
        "confidence": fact.confidence,
        "entity": fact.entity,
        "relation": fact.relation,
        "scope": fact.scope,
        "source": fact.source,
        "value_type": fact.value.type,
        "value_v": _encoded_value_v(fact),
    }
    # Reuse the module's canonical-JSON hasher so CIDs and checkpoint leaf
    # hashes share one serialization routine.
    return f"{_HASH_PREFIX}{_sha256_json(body)}"

39 

40 

def verify_fact_cid(fact: Fact) -> str:
    """Verify a fact's stored CID and return the computed CID.

    Legacy responses may omit ``cid``; those cannot be independently verified and are
    accepted by this helper for backward compatibility.

    Returns:
        The CID recomputed locally from the fact body.

    Raises:
        StigmemVerificationError: If ``fact.cid`` is set and differs from the
            recomputed CID.
    """
    computed = compute_fact_cid(fact)
    stored = fact.cid
    # A missing (None) CID is tolerated; any present value must match exactly.
    if stored is not None and stored != computed:
        raise StigmemVerificationError(
            f"CID mismatch for fact {fact.id}: stored={fact.cid!r}, computed={computed!r}"
        )
    return computed

53 

54 

def _sha256_json(body: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of ``body`` serialized as canonical JSON.

    Canonical form: keys sorted, no whitespace between tokens, non-ASCII
    characters emitted as-is, and the result encoded as UTF-8 before hashing.
    """
    canonical = json.dumps(
        body,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

58 

59 

def _checkpoint_payload(checkpoint: FactChainCheckpointProof) -> dict[str, Any]:
    """Rebuild the payload dict that ``checkpoint``'s own metadata implies.

    ``verify_checkpoint_proof`` compares this against the payload recorded in
    ``tl_raw``. ``kind`` and ``version`` are fixed constants; the other fields
    are copied from the checkpoint.
    """
    return {
        "chain_hash": checkpoint.chain_hash,
        "covered_chain_seq": checkpoint.covered_chain_seq,
        "created_at": checkpoint.created_at,
        "kind": "stigmem.fact_chain_checkpoint",
        "tenant_id": checkpoint.tenant_id,
        "version": 1,
    }

69 

70 

def verify_checkpoint_proof(checkpoint: FactChainCheckpointProof) -> None:
    """Verify deterministic checkpoint proof material available to the SDK.

    Local transparency-log checkpoints include the submitted payload in ``tl_raw``.
    For Rekor-backed checkpoints, this validates mandatory inclusion metadata shape;
    cryptographic Rekor STH verification remains a node/operator concern because it
    requires Rekor log material and trust roots.

    Checks, in order:

    1. ``status`` is ``"submitted"``.
    2. ``tl_leaf_hash`` is non-empty and ``tl_log_index`` is not ``None``.
    3. If ``tl_raw["payload"]`` is a dict, its canonical-JSON SHA-256 equals
       ``tl_leaf_hash`` and the payload equals the one rebuilt from the
       checkpoint's own fields.

    Raises:
        StigmemVerificationError: If any check fails.
    """
    if checkpoint.status != "submitted":
        raise StigmemVerificationError(
            f"checkpoint {checkpoint.id} is not submitted: {checkpoint.status}"
        )
    if not checkpoint.tl_leaf_hash:
        raise StigmemVerificationError(f"checkpoint {checkpoint.id} is missing tl_leaf_hash")
    # Index 0 is a valid log position, so only None counts as missing.
    if checkpoint.tl_log_index is None:
        raise StigmemVerificationError(f"checkpoint {checkpoint.id} is missing tl_log_index")

    # NOTE(review): tl_raw is assumed to be a mapping when present. A missing
    # (None) value is treated like an empty one rather than raising
    # AttributeError; confirm against the model definition.
    raw_payload = (checkpoint.tl_raw or {}).get("payload")
    if not isinstance(raw_payload, dict):
        # No embedded payload to recompute; only the shape checks above apply.
        return

    expected_leaf_hash = _sha256_json(raw_payload)
    if checkpoint.tl_leaf_hash != expected_leaf_hash:
        raise StigmemVerificationError(
            f"checkpoint {checkpoint.id} leaf hash mismatch: "
            f"stored={checkpoint.tl_leaf_hash!r}, computed={expected_leaf_hash!r}"
        )
    # The hash proves the payload is what was logged; this proves the logged
    # payload describes this checkpoint.
    expected_payload = _checkpoint_payload(checkpoint)
    if raw_payload != expected_payload:
        raise StigmemVerificationError(
            f"checkpoint {checkpoint.id} payload does not match chain head metadata"
        )

101 

102 

def verify_fact_chain_proof(
    proof: FactChainProof | None,
    *,
    require_checkpoint: bool = False,
) -> None:
    """Verify compact fact-chain proof metadata returned by ``Stigmem-Verify: full``.

    Args:
        proof: The proof to verify; ``None`` is rejected.
        require_checkpoint: When true, a proof without checkpoint metadata is
            rejected instead of accepted.

    Checks, in order: the proof is present; ``checked_entries`` is not
    negative; a non-empty chain carries a ``head_hash``; then, if a checkpoint
    is attached, its tenant and chain hash match the proof, it does not cover
    more entries than were checked, and it passes ``verify_checkpoint_proof``.

    Raises:
        StigmemVerificationError: If any check fails.
    """
    if proof is None:
        raise StigmemVerificationError("missing fact-chain proof")
    if proof.checked_entries < 0:
        raise StigmemVerificationError("fact-chain proof has negative checked_entries")
    if proof.checked_entries > 0 and not proof.head_hash:
        raise StigmemVerificationError("fact-chain proof is missing head_hash")

    checkpoint = proof.checkpoint
    if checkpoint is None:
        if require_checkpoint:
            raise StigmemVerificationError("fact-chain proof is missing checkpoint metadata")
        return

    if checkpoint.tenant_id != proof.tenant_id:
        raise StigmemVerificationError("checkpoint tenant does not match fact-chain proof")
    # A proof without a head_hash has nothing to compare the checkpoint against.
    if proof.head_hash is not None and checkpoint.chain_hash != proof.head_hash:
        raise StigmemVerificationError("checkpoint chain_hash does not match proof head_hash")
    if checkpoint.covered_chain_seq > proof.checked_entries:
        raise StigmemVerificationError("checkpoint covers more entries than the verified chain")
    verify_checkpoint_proof(checkpoint)