Coverage for node / src / stigmem_node / routes / federation / audit_conflicts.py: 84%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Federation audit and conflict routes.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import uuid 

7from datetime import UTC, datetime 

8from typing import Annotated, Any 

9 

10from fastapi import Depends, HTTPException, Query, status 

11 

12from ...auth import Identity, resolve_identity 

13from ...db import db 

14from ...hlc import node_hlc 

15from ...models.facts import row_to_record 

16from ...models.federation import ConflictResolveRequest 

17from .common import router 

18 

19 

@router.get("/v1/federation/audit")
def get_audit_log(
    identity: Annotated[Identity, Depends(resolve_identity)],
    peer_id: str | None = Query(None),
    event_type: str | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
    cursor: str | None = Query(None),
) -> dict[str, Any]:
    """Return one page of ``federation_audit`` rows, newest first.

    Args:
        identity: Resolved caller identity; must pass ``can_federate()``.
        peer_id: When non-empty, only rows with this ``peer_id`` are returned.
        event_type: When non-empty, only rows with this ``event_type`` are returned.
        limit: Maximum number of entries in the page (1-500).
        cursor: The ``cursor`` value returned by a previous call, i.e. the ``id``
            of the last entry of the previous page.

    Returns:
        A dict with ``entries`` (each with ``id``, ``peer_id``, ``event_type``,
        JSON-decoded ``detail`` or ``None``, and ``ts``), ``cursor`` (id of the
        last returned entry when more rows exist, else ``None``) and ``has_more``.

    Raises:
        HTTPException: 403 when the caller lacks the federate permission.
    """
    if not identity.can_federate():
        raise HTTPException(status_code=403, detail="federate permission required")

    conditions: list[str] = []
    params: list[Any] = []
    if peer_id:
        conditions.append("peer_id = ?")
        params.append(peer_id)
    if event_type:
        conditions.append("event_type = ?")
        params.append(event_type)
    if cursor:
        # Keyset pagination has to mirror the ORDER BY (ts DESC, id DESC): the
        # next page is every row that sorts strictly after the cursor row.
        # The previous filter ("id > ?") selected rows *before* the cursor in
        # sort order, so a follow-up request never advanced.
        # NOTE(review): assumes `id` uniquely identifies a row; an unknown
        # cursor makes the sub-selects NULL and yields an empty page.
        conditions.append(
            "(ts < (SELECT ts FROM federation_audit WHERE id = ?)"
            " OR (ts = (SELECT ts FROM federation_audit WHERE id = ?) AND id < ?))"
        )
        params.extend([cursor, cursor, cursor])

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    # Fetch one row beyond the page so we can tell whether more rows exist.
    params.append(limit + 1)

    with db() as conn:
        rows = conn.execute(
            f"SELECT * FROM federation_audit {where} ORDER BY ts DESC, id DESC LIMIT ?",  # noqa: S608  # nosec B608 — where built from literal fragments; values in params
            params,
        ).fetchall()

    has_more = len(rows) > limit
    page = rows[:limit]
    # has_more implies the page holds at least `limit` (>= 1) rows.
    next_cursor = page[-1]["id"] if has_more else None

    return {
        "entries": [
            {
                "id": r["id"],
                "peer_id": r["peer_id"],
                "event_type": r["event_type"],
                "detail": json.loads(r["detail"]) if r["detail"] else None,
                "ts": r["ts"],
            }
            for r in page
        ],
        "cursor": next_cursor,
        "has_more": has_more,
    }

70 

71 

72# --------------------------------------------------------------------------- 

73# GET /v1/conflicts — list conflicts (§5.9) 

74# --------------------------------------------------------------------------- 

75 

76 

@router.get("/v1/conflicts")
def list_conflicts(
    identity: Annotated[Identity, Depends(resolve_identity)],
    conflict_status: str | None = Query(None, alias="status"),
    cursor: str | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
) -> dict[str, Any]:
    """Return one page of conflicts, most recently detected first.

    Args:
        identity: Resolved caller identity; must pass ``can_read()``.
        conflict_status: When non-empty (query parameter ``status``), only
            conflicts with this status are returned.
        cursor: The ``cursor`` value returned by a previous call, i.e. the ``id``
            of the last conflict of the previous page.
        limit: Maximum number of conflicts in the page (1-500).

    Returns:
        A dict with ``conflicts`` (each with ``conflict_id``, the two facts dumped
        via ``row_to_record(...).model_dump()`` or ``None`` when the fact row is
        missing, ``status``, ``resolved_by`` and ``detected_at``), ``cursor``
        (id of the last returned conflict when more exist, else ``None``) and
        ``has_more``.

    Raises:
        HTTPException: 403 when the caller lacks the read permission.
    """
    if not identity.can_read():
        raise HTTPException(status_code=403, detail="read permission required")

    conditions: list[str] = []
    params: list[Any] = []
    if conflict_status:
        conditions.append("c.status = ?")
        params.append(conflict_status)
    if cursor:
        # Keyset pagination has to mirror the ORDER BY (detected_at DESC, id
        # DESC): the next page is every row sorting strictly after the cursor
        # row. The previous filter ("c.id > ?") pointed the wrong way, so a
        # follow-up request never advanced past the first page.
        # NOTE(review): assumes `id` uniquely identifies a conflict; an unknown
        # cursor makes the sub-selects NULL and yields an empty page.
        conditions.append(
            "(c.detected_at < (SELECT k.detected_at FROM conflicts k WHERE k.id = ?)"
            " OR (c.detected_at = (SELECT k.detected_at FROM conflicts k WHERE k.id = ?)"
            " AND c.id < ?))"
        )
        params.extend([cursor, cursor, cursor])

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    # Fetch one row beyond the page so we can tell whether more rows exist.
    params.append(limit + 1)

    with db() as conn:
        sql = (
            "SELECT c.id, c.fact_a_id, c.fact_b_id, c.status, c.resolution_fact_id, "  # noqa: S608  # nosec B608
            f"c.detected_at FROM conflicts c {where} ORDER BY c.detected_at DESC, "
            "c.id DESC LIMIT ?"
        )
        rows = conn.execute(sql, params).fetchall()

        has_more = len(rows) > limit
        page = rows[:limit]

        conflicts: list[dict[str, Any]] = []
        for r in page:
            # Each side of the conflict is looked up individually; a missing
            # fact row is reported as None rather than failing the listing.
            fa = conn.execute("SELECT * FROM facts WHERE id = ?", (r["fact_a_id"],)).fetchone()
            fb = conn.execute("SELECT * FROM facts WHERE id = ?", (r["fact_b_id"],)).fetchone()
            conflicts.append(
                {
                    "conflict_id": r["id"],
                    "fact_a": row_to_record(fa).model_dump() if fa else None,
                    "fact_b": row_to_record(fb).model_dump() if fb else None,
                    "status": r["status"],
                    "resolved_by": r["resolution_fact_id"],
                    "detected_at": r["detected_at"],
                }
            )

    # has_more implies the page holds at least `limit` (>= 1) rows.
    next_cursor = page[-1]["id"] if has_more else None
    return {"conflicts": conflicts, "cursor": next_cursor, "has_more": has_more}

128 

129 

130# --------------------------------------------------------------------------- 

131# POST /v1/conflicts/:conflict_id/resolve — resolve a conflict (§5.10) 

132# --------------------------------------------------------------------------- 

133 

134 

135def _encode_value(vtype: str, v: Any) -> str: 

136 if vtype == "null": 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true

137 return "null" 

138 if vtype == "boolean": 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true

139 return "true" if v else "false" 

140 return str(v) 

141 

142 

@router.post("/v1/conflicts/{conflict_id}/resolve")
def resolve_conflict(
    conflict_id: str,
    req: ConflictResolveRequest,
    identity: Annotated[Identity, Depends(resolve_identity)],
) -> dict[str, Any]:
    """Close a conflict by asserting a resolution fact (Spec-15-Fact-Semantics).

    The resolution value is ``req.new_value`` when given, otherwise the value of
    the conflicting fact named by ``req.winning_fact_id``. Three facts are
    inserted (the resolution fact, a ``stigmem:resolves`` meta-fact and a
    ``stigmem:conflict:status`` fact) and the ``conflicts`` row is marked
    resolved.

    Returns:
        ``{"resolution_fact_id": <new fact id>, "conflict_status": "resolved"}``.

    Raises:
        HTTPException: 403 without write permission; 404 for an unknown
            conflict; 409 when already resolved; 500 when either conflicting
            fact is missing; 422 when neither ``winning_fact_id`` nor
            ``new_value`` is usable.
    """
    if not identity.can_write():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="write permission required"
        )

    with db() as conn:
        conflict = conn.execute("SELECT * FROM conflicts WHERE id = ?", (conflict_id,)).fetchone()

        if conflict is None:
            raise HTTPException(status_code=404, detail="conflict not found")
        if conflict["status"] == "resolved":
            raise HTTPException(status_code=409, detail="conflict already resolved")

        fact_lookup = "SELECT * FROM facts WHERE id = ?"
        fact_a = conn.execute(fact_lookup, (conflict["fact_a_id"],)).fetchone()
        fact_b = conn.execute(fact_lookup, (conflict["fact_b_id"],)).fetchone()

        if fact_a is None or fact_b is None:
            raise HTTPException(status_code=500, detail="conflicting facts not found in store")

        # Work out the type/value carried by the resolution fact.
        if req.new_value is None and req.winning_fact_id is None:
            raise HTTPException(status_code=422, detail="provide winning_fact_id or new_value")
        if req.new_value is not None:
            # An explicit new value takes precedence over winning_fact_id.
            res_type = req.new_value.type
            res_v = _encode_value(req.new_value.type, req.new_value.v)
        else:
            winner = next(
                (f for f in (fact_a, fact_b) if req.winning_fact_id == f["id"]),
                None,
            )
            if winner is None:
                raise HTTPException(
                    status_code=422,
                    detail="winning_fact_id must be one of the conflicting facts",
                )
            res_type = winner["value_type"]
            res_v = winner["value_v"]

        resolution_fact_id = str(uuid.uuid4())
        now = datetime.now(UTC).isoformat()
        caller = identity.entity_uri
        scope = fact_a["scope"]

        def append_fact(
            hlc: Any,
            fact_id: str,
            entity: str,
            relation: str,
            value_type: str,
            value_v: Any,
            source: str,
        ) -> None:
            """Insert one locally-originated fact row.

            Every row written here shares the timestamp and scope captured
            above, has no expiry, full confidence and no ``received_from``.
            """
            conn.execute(
                """INSERT INTO facts
                   (id, entity, relation, value_type, value_v, source, timestamp,
                    valid_until, confidence, scope, hlc, received_from)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    fact_id,
                    entity,
                    relation,
                    value_type,
                    value_v,
                    source,
                    now,
                    None,
                    1.0,
                    scope,
                    hlc,
                    None,
                ),
            )

        # 1. The resolution fact lives under a namespaced entity so it never
        #    shares the (entity, relation, scope) triple with the conflicting
        #    facts; reusing the original entity+relation would trigger a new
        #    contradiction wave once federated to peers
        #    (spec §resolution-semantics, EG-51).
        append_fact(
            node_hlc.tick(),
            resolution_fact_id,
            f"stigmem:resolution:{conflict_id}",
            fact_a["relation"],
            res_type,
            res_v,
            caller,
        )

        # 2. stigmem:resolves meta-fact linking the resolution to the conflict
        #    (spec §5.10).
        append_fact(
            node_hlc.tick(),
            str(uuid.uuid4()),
            resolution_fact_id,
            "stigmem:resolves",
            "ref",
            conflict_id,
            "system:stigmem",
        )

        # 3. The new conflict status is itself recorded as a fact, because
        #    status changes are immutable appends.
        append_fact(
            node_hlc.tick(),
            str(uuid.uuid4()),
            conflict_id,
            "stigmem:conflict:status",
            "string",
            "resolved",
            "system:stigmem",
        )

        # 4. Finally mark the conflicts row itself as resolved.
        conn.execute(
            "UPDATE conflicts SET status = 'resolved', resolution_fact_id = ? WHERE id = ?",
            (resolution_fact_id, conflict_id),
        )

    return {"resolution_fact_id": resolution_fact_id, "conflict_status": "resolved"}