Coverage for node / src / stigmem_node / cid.py: 96%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Content-addressed fact IDs — spec §25. 

2 

3CID = "sha256:" + hex_lowercase(SHA-256(RFC8785(canonical_fact_body))) 

4 

5The canonical body is a JSON object with exactly 7 fields in lexicographic key order: 

6 confidence, entity, relation, scope, source, value_type, value_v 

7 

8Security-relevant excluded fields (§25.2.1 rev 14): 

9 valid_until, derived_from, attestation_chain, source_trust, signature, reason 

10 (these require independent validation; CID coverage alone is not sufficient) 

11 

12fact_id and cid are also excluded (circular). 

13timestamp/created_at is excluded so the same assertion at different times shares one CID. 

14""" 

15 

16from __future__ import annotations 

17 

18import hashlib 

19import json 

20import re 

21from typing import Any 

22 

# Scheme prefix carried by every CID this module produces.
_CID_PREFIX = "sha256:"

# Shape of a complete CID: the prefix followed by exactly 64 lowercase hex
# digits. The end is anchored with \Z rather than $, because $ also matches
# just before a trailing newline and would let "sha256:<hex>\n" through.
_CID_HEX_RE = re.compile(r"^sha256:[0-9a-f]{64}\Z")

25 

26 

def compute_cid(
    entity: str,
    relation: str,
    value_type: str,
    value_v: str,
    source: str,
    scope: str,
    confidence: float = 1.0,
) -> str:
    """Return the CID for a fact's canonical body (spec §25.2.1, §25.2.2).

    The seven arguments make up the canonical body. The body is serialized as
    compact JSON (sorted keys, no insignificant whitespace, non-ASCII text
    left unescaped), encoded as UTF-8 and hashed with SHA-256.

    Args:
        entity: Canonical-body ``entity`` field.
        relation: Canonical-body ``relation`` field.
        value_type: Canonical-body ``value_type`` field.
        value_v: Canonical-body ``value_v`` field.
        source: Canonical-body ``source`` field.
        scope: Canonical-body ``scope`` field.
        confidence: Canonical-body ``confidence`` field. Coerced to ``float``
            so an integral value such as ``1`` hashes identically to ``1.0``,
            which is what ``compute_cid_from_row`` always passes in.

    Returns:
        The ``sha256:`` prefix followed by the lowercase hex SHA-256 digest.

    Raises:
        TypeError: If ``confidence`` cannot be converted to ``float``.
        ValueError: If ``confidence`` is a string that is not a valid number.
    """
    body: dict[str, Any] = {
        # json.dumps renders int 1 as "1" but float 1.0 as "1.0"; without the
        # coercion the same confidence would yield two different CIDs.
        # NOTE(review): RFC 8785 number serialization would render 1.0 as "1";
        # confirm against spec §25.2.2 before touching this, since any change
        # to the serialized form alters every CID.
        "confidence": float(confidence),
        "entity": entity,
        "relation": relation,
        "scope": scope,
        "source": source,
        "value_type": value_type,
        "value_v": value_v,
    }
    serialized = json.dumps(
        body,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
    return f"{_CID_PREFIX}{digest}"

51 

52 

def compute_cid_from_row(row: Any) -> str:
    """Compute the CID of a facts-table row.

    The row is read by column name (``entity``, ``relation``, ``value_type``,
    ``value_v``, ``source``, ``scope``, ``confidence``) and the values are
    forwarded to ``compute_cid``.

    Args:
        row: Any object supporting ``row["column"]`` lookups for the seven
            columns above.

    Returns:
        The CID string produced by ``compute_cid``.
    """
    fields = {
        "entity": row["entity"],
        "relation": row["relation"],
        "value_type": row["value_type"],
        # A falsy value_v (e.g. None) is hashed as the empty string.
        "value_v": row["value_v"] or "",
        "source": row["source"],
        "scope": row["scope"],
        # Always hand compute_cid a float, whatever the row stores.
        "confidence": float(row["confidence"]),
    }
    return compute_cid(**fields)

64 

65 

class CidMismatchError(ValueError):
    """Error for a fact whose stored CID differs from the CID of its body.

    The exception message names only the fact; both CIDs are available as
    attributes for callers that want to report or log them.
    """

    def __init__(self, *, fact_id: str, stored_cid: str, computed_cid: str) -> None:
        """Record the offending fact and both CIDs (all keyword-only)."""
        # fact_id: identifier of the fact that failed verification
        # stored_cid: CID that was found on the row
        # computed_cid: CID recomputed from the row's canonical body
        self.fact_id, self.stored_cid, self.computed_cid = (
            fact_id,
            stored_cid,
            computed_cid,
        )
        message = f"CID mismatch for fact {fact_id}"
        super().__init__(message)

74 

75 

def _optional_row_value(row: Any, key: str) -> Any:
    """Return ``row[key]`` if *row* exposes *key*, otherwise ``None``.

    *row* is probed through its ``keys()`` method, so both plain dicts and
    row objects that offer ``keys()`` plus item lookup are handled. A key
    that is present with a ``None`` value is indistinguishable from a
    missing key.

    Args:
        row: Object to read from.
        key: Column or key name to look up.

    Returns:
        The value stored under *key*, or ``None`` when *row* has no
        ``keys()`` method or *key* is not among its keys.
    """
    try:
        available = row.keys()
    except AttributeError:
        # Without keys() there is nothing to look up by name. A dict always
        # has keys(), so no dict-specific fallback is needed on this path.
        return None
    return row[key] if key in available else None

82 

83 

def stored_cid_from_row(row: Any) -> str | None:
    """Look up the CID recorded on a fact row.

    ``projected_cid`` takes precedence over ``cid``; the first of the two
    that holds a non-``None`` value is returned as a string.

    Returns:
        The recorded CID, or ``None`` when neither column has a value.
    """
    for column in ("projected_cid", "cid"):
        candidate = _optional_row_value(row, column)
        if candidate is not None:
            return str(candidate)
    return None

91 

92 

def verify_cid_from_row(row: Any) -> None:
    """Check that a fact row's recorded CID matches its canonical body.

    Rows without a recorded CID (legacy NULL-CID rows) are accepted without
    any check.

    Raises:
        CidMismatchError: If the row has a recorded CID that differs from the
            CID recomputed from the row's fields.
    """
    expected = stored_cid_from_row(row)
    if expected is None:
        # Legacy row with no CID recorded: nothing to compare against.
        return

    actual = compute_cid_from_row(row)
    if actual == expected:
        return

    # The row's "id" column is only read once a mismatch is established.
    raise CidMismatchError(
        fact_id=str(row["id"]),
        stored_cid=expected,
        computed_cid=actual,
    )

105 

106 

def is_valid_cid(s: str) -> bool:
    """Return True if *s* is a well-formed sha256 CID (spec §25.2).

    Well-formed means the whole string matches ``_CID_HEX_RE``, with nothing
    before or after it.

    Args:
        s: Candidate CID string.

    Returns:
        ``True`` when *s* matches the pattern in full, ``False`` otherwise.
    """
    # fullmatch() requires the entire string to be consumed, so a trailing
    # newline (which a `$` anchor combined with match() would tolerate) is
    # rejected.
    return _CID_HEX_RE.fullmatch(s) is not None

110 

111 

def is_cid(s: str) -> bool:
    """Cheap pre-filter: report whether *s* begins with the ``sha256:`` prefix.

    Only the prefix is inspected; the text after it is not validated.
    """
    has_prefix = s.startswith(_CID_PREFIX)
    return has_prefix