Coverage for node / src / stigmem_node / routes / federation / audit_conflicts.py: 84%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Federation audit and conflict routes."""
3from __future__ import annotations
5import json
6import uuid
7from datetime import UTC, datetime
8from typing import Annotated, Any
10from fastapi import Depends, HTTPException, Query, status
12from ...auth import Identity, resolve_identity
13from ...db import db
14from ...hlc import node_hlc
15from ...models.facts import row_to_record
16from ...models.federation import ConflictResolveRequest
17from .common import router
20@router.get("/v1/federation/audit")
21def get_audit_log(
22 identity: Annotated[Identity, Depends(resolve_identity)],
23 peer_id: str | None = Query(None),
24 event_type: str | None = Query(None),
25 limit: int = Query(50, ge=1, le=500),
26 cursor: str | None = Query(None),
27) -> dict[str, Any]:
28 if not identity.can_federate(): 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true
29 raise HTTPException(status_code=403, detail="federate permission required")
31 conditions: list[str] = []
32 params: list[Any] = []
33 if peer_id:
34 conditions.append("peer_id = ?")
35 params.append(peer_id)
36 if event_type:
37 conditions.append("event_type = ?")
38 params.append(event_type)
39 if cursor: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true
40 conditions.append("id > ?")
41 params.append(cursor)
43 where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
44 params.append(limit + 1)
46 with db() as conn:
47 rows = conn.execute(
48 f"SELECT * FROM federation_audit {where} ORDER BY ts DESC, id DESC LIMIT ?", # noqa: S608 # nosec B608 — where built from literal fragments; values in params
49 params,
50 ).fetchall()
52 has_more = len(rows) > limit
53 rows = rows[:limit]
54 next_cursor = rows[-1]["id"] if has_more and rows else None
56 return {
57 "entries": [
58 {
59 "id": r["id"],
60 "peer_id": r["peer_id"],
61 "event_type": r["event_type"],
62 "detail": json.loads(r["detail"]) if r["detail"] else None,
63 "ts": r["ts"],
64 }
65 for r in rows
66 ],
67 "cursor": next_cursor,
68 "has_more": has_more,
69 }
72# ---------------------------------------------------------------------------
73# GET /v1/conflicts — list conflicts (§5.9)
74# ---------------------------------------------------------------------------
77@router.get("/v1/conflicts")
78def list_conflicts(
79 identity: Annotated[Identity, Depends(resolve_identity)],
80 conflict_status: str | None = Query(None, alias="status"),
81 cursor: str | None = Query(None),
82 limit: int = Query(50, ge=1, le=500),
83) -> dict[str, Any]:
84 if not identity.can_read(): 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true
85 raise HTTPException(status_code=403, detail="read permission required")
87 conditions: list[str] = []
88 params: list[Any] = []
89 if conflict_status: 89 ↛ 92line 89 didn't jump to line 92 because the condition on line 89 was always true
90 conditions.append("c.status = ?")
91 params.append(conflict_status)
92 if cursor: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 conditions.append("c.id > ?")
94 params.append(cursor)
96 where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
97 params.append(limit + 1)
99 with db() as conn:
100 sql = (
101 "SELECT c.id, c.fact_a_id, c.fact_b_id, c.status, c.resolution_fact_id, " # noqa: S608 # nosec B608
102 f"c.detected_at FROM conflicts c {where} ORDER BY c.detected_at DESC, "
103 "c.id DESC LIMIT ?"
104 )
105 rows = conn.execute(
106 sql,
107 params,
108 ).fetchall()
110 conflicts: list[dict[str, Any]] = []
111 for r in rows[:limit]:
112 fa = conn.execute("SELECT * FROM facts WHERE id = ?", (r["fact_a_id"],)).fetchone()
113 fb = conn.execute("SELECT * FROM facts WHERE id = ?", (r["fact_b_id"],)).fetchone()
114 conflicts.append(
115 {
116 "conflict_id": r["id"],
117 "fact_a": row_to_record(fa).model_dump() if fa else None,
118 "fact_b": row_to_record(fb).model_dump() if fb else None,
119 "status": r["status"],
120 "resolved_by": r["resolution_fact_id"],
121 "detected_at": r["detected_at"],
122 }
123 )
125 has_more = len(rows) > limit
126 next_cursor = rows[limit - 1]["id"] if has_more and len(rows) >= limit else None
127 return {"conflicts": conflicts, "cursor": next_cursor, "has_more": has_more}
130# ---------------------------------------------------------------------------
131# POST /v1/conflicts/:conflict_id/resolve — resolve a conflict (§5.10)
132# ---------------------------------------------------------------------------
135def _encode_value(vtype: str, v: Any) -> str:
136 if vtype == "null": 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true
137 return "null"
138 if vtype == "boolean": 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true
139 return "true" if v else "false"
140 return str(v)
143@router.post("/v1/conflicts/{conflict_id}/resolve")
144def resolve_conflict(
145 conflict_id: str,
146 req: ConflictResolveRequest,
147 identity: Annotated[Identity, Depends(resolve_identity)],
148) -> dict[str, Any]:
149 """Assert a canonical resolution fact and close the conflict (Spec-15-Fact-Semantics)."""
150 if not identity.can_write(): 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true
151 raise HTTPException(
152 status_code=status.HTTP_403_FORBIDDEN, detail="write permission required"
153 )
155 with db() as conn:
156 conflict = conn.execute("SELECT * FROM conflicts WHERE id = ?", (conflict_id,)).fetchone()
158 if conflict is None:
159 raise HTTPException(status_code=404, detail="conflict not found")
160 if conflict["status"] == "resolved":
161 raise HTTPException(status_code=409, detail="conflict already resolved")
163 fact_a = conn.execute(
164 "SELECT * FROM facts WHERE id = ?", (conflict["fact_a_id"],)
165 ).fetchone()
166 fact_b = conn.execute(
167 "SELECT * FROM facts WHERE id = ?", (conflict["fact_b_id"],)
168 ).fetchone()
170 if fact_a is None or fact_b is None: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true
171 raise HTTPException(status_code=500, detail="conflicting facts not found in store")
173 # Determine value for the resolution fact
174 if req.new_value is not None:
175 res_type = req.new_value.type
176 res_v = _encode_value(req.new_value.type, req.new_value.v)
177 elif req.winning_fact_id is not None:
178 if req.winning_fact_id == fact_a["id"]: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 winner = fact_a
180 elif req.winning_fact_id == fact_b["id"]: 180 ↛ 183line 180 didn't jump to line 183 because the condition on line 180 was always true
181 winner = fact_b
182 else:
183 raise HTTPException(
184 status_code=422,
185 detail="winning_fact_id must be one of the conflicting facts",
186 )
187 res_type = winner["value_type"]
188 res_v = winner["value_v"]
189 else:
190 raise HTTPException(status_code=422, detail="provide winning_fact_id or new_value")
192 resolution_fact_id = str(uuid.uuid4())
193 now = datetime.now(UTC).isoformat()
194 caller = identity.entity_uri
196 # 1. Assert resolution fact under a namespaced entity so it never shares the
197 # (entity, relation, scope) triple with the conflicting facts. Writing under
198 # the original entity+relation would trigger a new contradiction wave when the
199 # fact is federated to peers (spec §resolution-semantics, EG-51).
200 resolution_entity = f"stigmem:resolution:{conflict_id}"
201 hlc_res = node_hlc.tick()
202 conn.execute(
203 """INSERT INTO facts
204 (id, entity, relation, value_type, value_v, source, timestamp,
205 valid_until, confidence, scope, hlc, received_from)
206 VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
207 (
208 resolution_fact_id,
209 resolution_entity,
210 fact_a["relation"],
211 res_type,
212 res_v,
213 caller,
214 now,
215 None,
216 1.0,
217 fact_a["scope"],
218 hlc_res,
219 None,
220 ),
221 )
223 # 2. Assert stigmem:resolves meta-fact (spec §5.10)
224 hlc_meta = node_hlc.tick()
225 conn.execute(
226 """INSERT INTO facts
227 (id, entity, relation, value_type, value_v, source, timestamp,
228 valid_until, confidence, scope, hlc, received_from)
229 VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
230 (
231 str(uuid.uuid4()),
232 resolution_fact_id,
233 "stigmem:resolves",
234 "ref",
235 conflict_id,
236 "system:stigmem",
237 now,
238 None,
239 1.0,
240 fact_a["scope"],
241 hlc_meta,
242 None,
243 ),
244 )
246 # 3. Record updated conflict:status as a new fact (status changes are immutable appends)
247 hlc_status = node_hlc.tick()
248 conn.execute(
249 """INSERT INTO facts
250 (id, entity, relation, value_type, value_v, source, timestamp,
251 valid_until, confidence, scope, hlc, received_from)
252 VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
253 (
254 str(uuid.uuid4()),
255 conflict_id,
256 "stigmem:conflict:status",
257 "string",
258 "resolved",
259 "system:stigmem",
260 now,
261 None,
262 1.0,
263 fact_a["scope"],
264 hlc_status,
265 None,
266 ),
267 )
269 # 4. Update conflicts table
270 conn.execute(
271 "UPDATE conflicts SET status = 'resolved', resolution_fact_id = ? WHERE id = ?",
272 (resolution_fact_id, conflict_id),
273 )
275 return {"resolution_fact_id": resolution_fact_id, "conflict_status": "resolved"}