Coverage for node / src / stigmem_node / models / facts.py: 97%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Fact value, request, response, and row conversion models.""" 

2 

3from __future__ import annotations 

4 

5import sqlite3 

6from typing import Any 

7 

8from pydantic import BaseModel, Field, field_validator 

9 

10from .constants import VALID_SCOPES, VALID_VALUE_TYPES 

11from .tombstones import TombstoneNotice 

12 

13VALID_WRITE_MODES = {"assert", "summarize_with_provenance"} 

14VALID_INTERPRET_AS = {"content", "instruction"} 

15 

16 

17class FactValue(BaseModel): 

18 type: str 

19 v: Any = None 

20 interpret_as: str = Field( 

21 "content", 

22 description="How the value may be interpreted by recall consumers.", 

23 ) 

24 

25 @field_validator("type") 

26 @classmethod 

27 def check_type(cls, t: str) -> str: 

28 if t not in VALID_VALUE_TYPES: 

29 raise ValueError(f"type must be one of {VALID_VALUE_TYPES}") 

30 return t 

31 

32 @field_validator("interpret_as") 

33 @classmethod 

34 def check_interpret_as(cls, mode: str) -> str: 

35 if mode not in VALID_INTERPRET_AS: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true

36 raise ValueError(f"interpret_as must be one of {sorted(VALID_INTERPRET_AS)}") 

37 return mode 

38 

39 

40class AttestationToken(BaseModel): 

41 """Ed25519 attestation proving the caller owns the registered agent key. 

42 

43 The signature covers the canonical UTF-8 message: 

44 "{entity}\\n{relation}\\n{value.type}\\n{encoded_v}\\n{source}" 

45 where encoded_v is the same string the node derives via _encode_v(). 

46 """ 

47 

48 key_id: str = Field(..., min_length=1) 

49 signature: str = Field(..., min_length=1, description="base64url-encoded Ed25519 signature") 

50 

51 

52class AssertRequest(BaseModel): 

53 entity: str = Field(..., min_length=1) 

54 relation: str = Field(..., min_length=1) 

55 value: FactValue 

56 source: str = Field(..., min_length=1) 

57 confidence: float = Field(1.0, ge=0.0, le=1.0) 

58 scope: str = Field("local") 

59 valid_until: str | None = Field(None, description="ISO 8601 UTC expiry; null = non-expiring") 

60 attestation: AttestationToken | None = Field( 

61 None, 

62 description="Optional Ed25519 attestation; required when STIGMEM_ATTESTATION_REQUIRED=true", 

63 ) 

64 garden_id: str | None = Field( 

65 None, 

66 description="URI of the garden this fact belongs to (Spec-02-Scopes-and-ACL)", 

67 ) 

68 write_mode: str = Field( 

69 "assert", 

70 description="Write mode; summarize_with_provenance carries read-derived provenance.", 

71 ) 

72 derived_from: list[dict[str, Any]] = Field( 

73 default_factory=list, 

74 description="Source fact hashes/ids used for provenance-carrying summary writes.", 

75 ) 

76 

77 @field_validator("scope") 

78 @classmethod 

79 def check_scope(cls, s: str) -> str: 

80 if s not in VALID_SCOPES: 

81 raise ValueError(f"scope must be one of {VALID_SCOPES}") 

82 return s 

83 

84 @field_validator("write_mode") 

85 @classmethod 

86 def check_write_mode(cls, mode: str) -> str: 

87 if mode not in VALID_WRITE_MODES: 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true

88 raise ValueError(f"write_mode must be one of {sorted(VALID_WRITE_MODES)}") 

89 return mode 

90 

91 

92class FactRecord(BaseModel): 

93 id: str 

94 entity: str 

95 relation: str 

96 value: FactValue 

97 source: str 

98 timestamp: str 

99 hlc: str | None = None # v0.5: HLC timestamp; null for pre-migration facts 

100 received_from: str | None = None # v0.5: originating node_id if federated 

101 valid_until: str | None = None 

102 confidence: float 

103 scope: str 

104 attested_key_id: str | None = None # v1.0: agent key that attested this assertion 

105 origin_node_id: str | None = None # v0.8: original assertor node_id (§6.8.1) 

106 origin_allowed_scopes: list[str] | None = None # v0.8: scopes permitted at origin (§6.8.1) 

107 garden_id: str | None = None # v0.9: garden URI; null = no garden 

108 attested: bool | None = None # v0.9: source attestation result; null = not applicable 

109 contradicted: bool = False 

110 # write-time convention warnings (assert only) 

111 warnings: list[str] = Field(default_factory=list) 

112 # v1.1: source-trust snapshot (§19.4); recomputed live at recall — stored is audit only 

113 source_trust: float | None = None 

114 # v1.1: quarantine metadata (§19.5) 

115 quarantine_status: str | None = None 

116 quarantine_garden_id: str | None = None 

117 # v1.1: recall-time computed fields — not stored; populated by recall pipeline 

118 effective_confidence: float | None = None 

119 sanitizer_warnings: list[str] = Field(default_factory=list) 

120 sanitizer_redacted: bool = False 

121 # Phase 13: content-addressing (spec §25) 

122 cid: str | None = None 

123 # R-21: provenance for summary writes derived from recalled/read facts 

124 derived_from: list[dict[str, Any]] = Field(default_factory=list) 

125 

126 

127class QueryResponse(BaseModel): 

128 facts: list[FactRecord] 

129 total: int | None = None 

130 cursor: str | None 

131 tombstone_notices: list[TombstoneNotice] = Field(default_factory=list) 

132 

133def _optional_col(row: sqlite3.Row, col: str, keys: Any) -> Any: 

134 """Return row[col] if column is in the result set, else None.""" 

135 return row[col] if col in keys else None 

136 

137 

138def row_to_record( 

139 row: sqlite3.Row, 

140 contradicted: bool = False, 

141 warnings: list[str] | None = None, 

142 effective_confidence: float | None = None, 

143 sanitizer_warnings: list[str] | None = None, 

144 sanitizer_redacted: bool = False, 

145) -> FactRecord: 

146 import json as _json 

147 

148 keys = row.keys() 

149 attested_raw = _optional_col(row, "attested", keys) 

150 attested: bool | None = None if attested_raw is None else bool(attested_raw) 

151 raw_scopes = _optional_col(row, "origin_allowed_scopes", keys) 

152 origin_allowed_scopes: list[str] | None = _json.loads(raw_scopes) if raw_scopes else None 

153 source_trust_raw = _optional_col(row, "source_trust", keys) 

154 source_trust: float | None = float(source_trust_raw) if source_trust_raw is not None else None 

155 valid_until = _optional_col(row, "projected_valid_until", keys) 

156 if valid_until is None: 

157 valid_until = row["valid_until"] 

158 confidence_raw = _optional_col(row, "projected_confidence", keys) 

159 confidence = row["confidence"] if confidence_raw is None else confidence_raw 

160 garden_id = _optional_col(row, "projected_garden_id", keys) 

161 if garden_id is None: 

162 garden_id = _optional_col(row, "garden_id", keys) 

163 quarantine_status = _optional_col(row, "projected_quarantine_status", keys) 

164 if quarantine_status is None: 

165 quarantine_status = _optional_col(row, "quarantine_status", keys) 

166 quarantine_garden_id = _optional_col(row, "projected_quarantine_garden_id", keys) 

167 if quarantine_garden_id is None: 

168 quarantine_garden_id = _optional_col(row, "quarantine_garden_id", keys) 

169 cid = _optional_col(row, "projected_cid", keys) 

170 if cid is None: 

171 cid = _optional_col(row, "cid", keys) 

172 return FactRecord( 

173 id=row["id"], 

174 entity=row["entity"], 

175 relation=row["relation"], 

176 value=FactValue( 

177 type=row["value_type"], 

178 v=_parse_v(row["value_type"], row["value_v"]), 

179 interpret_as=_optional_col(row, "interpret_as", keys) or "content", 

180 ), 

181 source=row["source"], 

182 timestamp=row["timestamp"], 

183 hlc=_optional_col(row, "hlc", keys), 

184 received_from=_optional_col(row, "received_from", keys), 

185 valid_until=valid_until, 

186 confidence=confidence, 

187 scope=row["scope"], 

188 attested_key_id=_optional_col(row, "attested_key_id", keys), 

189 origin_node_id=_optional_col(row, "origin_node_id", keys), 

190 origin_allowed_scopes=origin_allowed_scopes, 

191 garden_id=garden_id, 

192 attested=attested, 

193 contradicted=contradicted, 

194 warnings=warnings or [], 

195 source_trust=source_trust, 

196 quarantine_status=quarantine_status, 

197 quarantine_garden_id=quarantine_garden_id, 

198 effective_confidence=effective_confidence, 

199 sanitizer_warnings=sanitizer_warnings or [], 

200 sanitizer_redacted=sanitizer_redacted, 

201 cid=cid, 

202 derived_from=_json.loads(_optional_col(row, "derived_from", keys) or "[]"), 

203 ) 

204 

205 

206def _parse_v(vtype: str, raw: str) -> Any: 

207 if vtype == "number": 

208 return float(raw) 

209 if vtype == "boolean": 

210 return raw == "true" 

211 if vtype == "null": 

212 return None 

213 return raw # string, text, datetime, ref