Coverage for sdks / stigmem-py / src / stigmem / verification.py: 44%
61 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Client-side integrity verification helpers for Stigmem responses."""
3from __future__ import annotations
5import hashlib
6import json
7from typing import Any
9from .models import Fact, FactChainCheckpointProof, FactChainProof
11_HASH_PREFIX = "sha256:"
class StigmemVerificationError(ValueError):
    """Signal that local verification rejected Stigmem proof material.

    Derives from ``ValueError``, so handlers written for ``ValueError`` also
    catch it.
    """
def _encoded_value_v(fact: Fact) -> str:
    """Return the string encoding of ``fact.value.v`` used in the CID body.

    The encoding depends on ``fact.value.type``:

    * ``"null"`` always encodes as ``"null"``; the stored ``v`` is not read.
    * ``"boolean"`` encodes as ``"true"`` or ``"false"`` by the truthiness of ``v``.
    * any other type encodes as ``str(v)``, with ``None`` becoming ``""``.
    """
    value = fact.value
    kind = value.type

    if kind == "null":
        return "null"

    if kind == "boolean":
        if value.v:
            return "true"
        return "false"

    payload = value.v
    if payload is None:
        return ""
    return str(payload)
def compute_fact_cid(fact: Fact) -> str:
    """Derive the canonical content identifier (CID) for a fact body.

    The CID covers ``confidence``, ``entity``, ``relation``, ``scope``,
    ``source``, the value type, and the string-encoded value. These fields are
    serialised as compact JSON with sorted keys and non-ASCII characters left
    unescaped, encoded as UTF-8, and hashed with SHA-256.

    Returns:
        The hex digest prefixed with ``_HASH_PREFIX``.
    """
    cid_fields: dict[str, Any] = {
        "confidence": fact.confidence,
        "entity": fact.entity,
        "relation": fact.relation,
        "scope": fact.scope,
        "source": fact.source,
        "value_type": fact.value.type,
        "value_v": _encoded_value_v(fact),
    }
    serialized = json.dumps(
        cid_fields,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
    return _HASH_PREFIX + digest
def verify_fact_cid(fact: Fact) -> str:
    """Check a fact's stored CID against a locally recomputed one.

    A fact whose ``cid`` is ``None`` (as legacy responses may return) cannot be
    independently verified; it is accepted for backward compatibility.

    Returns:
        The locally computed CID.

    Raises:
        StigmemVerificationError: If ``fact.cid`` is set and differs from the
            computed CID.
    """
    expected = compute_fact_cid(fact)
    stored = fact.cid

    if stored is None:
        # Nothing to compare against; accept the fact as-is.
        return expected

    if stored != expected:
        raise StigmemVerificationError(
            f"CID mismatch for fact {fact.id}: stored={stored!r}, computed={expected!r}"
        )
    return expected
def _sha256_json(body: dict[str, Any]) -> str:
    """Return the bare SHA-256 hex digest of ``body`` as canonical JSON.

    Canonical form here means sorted keys, compact ``","`` / ``":"``
    separators, non-ASCII characters left unescaped, and UTF-8 encoding.
    No prefix is added to the digest.
    """
    encoded = json.dumps(
        body,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    ).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()
def _checkpoint_payload(checkpoint: FactChainCheckpointProof) -> dict[str, Any]:
    """Build the payload dict that a checkpoint's metadata implies.

    Copies ``chain_hash``, ``covered_chain_seq``, ``created_at`` and
    ``tenant_id`` from ``checkpoint`` and adds the fixed ``kind`` and
    ``version`` markers.
    """
    payload: dict[str, Any] = {
        name: getattr(checkpoint, name)
        for name in ("chain_hash", "covered_chain_seq", "created_at")
    }
    payload["kind"] = "stigmem.fact_chain_checkpoint"
    payload["tenant_id"] = checkpoint.tenant_id
    payload["version"] = 1
    return payload
def verify_checkpoint_proof(checkpoint: FactChainCheckpointProof) -> None:
    """Verify the deterministic checkpoint proof material the SDK can check.

    Every checkpoint must have status ``"submitted"``, a non-empty
    ``tl_leaf_hash`` and a ``tl_log_index``. When ``tl_raw["payload"]`` is a
    dict (local transparency-log checkpoints embed the submitted payload
    there), the leaf hash is recomputed from that payload, and the payload is
    compared with the checkpoint's own metadata.

    For Rekor-backed checkpoints only the mandatory inclusion metadata shape is
    validated. Cryptographic Rekor STH verification remains a node/operator
    concern because it requires Rekor log material and trust roots.

    Raises:
        StigmemVerificationError: If any of the checks above fails.
    """
    state = checkpoint.status
    if state != "submitted":
        raise StigmemVerificationError(
            f"checkpoint {checkpoint.id} is not submitted: {state}"
        )

    leaf_hash = checkpoint.tl_leaf_hash
    if not leaf_hash:
        raise StigmemVerificationError(f"checkpoint {checkpoint.id} is missing tl_leaf_hash")
    if checkpoint.tl_log_index is None:
        raise StigmemVerificationError(f"checkpoint {checkpoint.id} is missing tl_log_index")

    submitted = checkpoint.tl_raw.get("payload")
    if not isinstance(submitted, dict):
        # No embedded payload to recompute from; only the shape checks apply.
        return

    recomputed = _sha256_json(submitted)
    if leaf_hash != recomputed:
        raise StigmemVerificationError(
            f"checkpoint {checkpoint.id} leaf hash mismatch: "
            f"stored={leaf_hash!r}, computed={recomputed!r}"
        )

    # The logged payload must describe this checkpoint, not merely hash correctly.
    if submitted != _checkpoint_payload(checkpoint):
        raise StigmemVerificationError(
            f"checkpoint {checkpoint.id} payload does not match chain head metadata"
        )
def verify_fact_chain_proof(
    proof: FactChainProof | None,
    *,
    require_checkpoint: bool = False,
) -> None:
    """Verify compact fact-chain proof metadata returned by ``Stigmem-Verify: full``.

    Args:
        proof: The proof to check; ``None`` is rejected.
        require_checkpoint: When true, a proof without checkpoint metadata is
            rejected. Otherwise such a proof passes once its own fields are
            consistent.

    Raises:
        StigmemVerificationError: If the proof is missing, has a negative
            ``checked_entries``, lacks ``head_hash`` while reporting checked
            entries, lacks a required checkpoint, or carries a checkpoint that
            disagrees with the proof or fails its own verification.
    """
    if proof is None:
        raise StigmemVerificationError("missing fact-chain proof")

    entries = proof.checked_entries
    if entries < 0:
        raise StigmemVerificationError("fact-chain proof has negative checked_entries")

    head = proof.head_hash
    if entries > 0 and not head:
        raise StigmemVerificationError("fact-chain proof is missing head_hash")

    anchor = proof.checkpoint
    if anchor is None:
        if not require_checkpoint:
            return
        raise StigmemVerificationError("fact-chain proof is missing checkpoint metadata")

    # Cross-check the checkpoint against the proof before verifying it.
    if anchor.tenant_id != proof.tenant_id:
        raise StigmemVerificationError("checkpoint tenant does not match fact-chain proof")
    if head is not None and anchor.chain_hash != head:
        raise StigmemVerificationError("checkpoint chain_hash does not match proof head_hash")
    if anchor.covered_chain_seq > entries:
        raise StigmemVerificationError("checkpoint covers more entries than the verified chain")

    verify_checkpoint_proof(anchor)