Coverage for node / src / stigmem_node / federation / federation_ingest.py: 93%
187 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Idempotent fact ingestion from federated peers (spec §6.3, §6.5, §19.4–19.5).
3ingest_fact() is the single entry-point for all federated facts.
4It is safe to call multiple times for the same fact (no-op after first write).
6Phase 8 (§19): source-trust score is computed at ingest time and stored as a
7snapshot in facts.source_trust. In trust_mode=strict, facts with t < 0.2 are
8routed to the node's designated quarantine garden instead of the main fact table.
9"""
11from __future__ import annotations
13import json
14import uuid
15from datetime import UTC, datetime
16from typing import Any
18from ..db import db
19from ..hlc import HLCRemoteSkewError, node_hlc
20from ..models.facts import VALID_INTERPRET_AS
21from ..observability.audit_event import (
22 INSTRUCTION_QUARANTINED,
23 emit_instruction_event_if_applicable,
24 is_instruction_fact,
25)
26from ..observability.metrics import PEER_HLC_ANOMALY
class FederationHlcSkewError(ValueError):
    """Raised when an inbound federated fact carries an implausible HLC wall time.

    Wraps the ``HLCRemoteSkewError`` produced by the local HLC and copies its
    skew diagnostics onto this exception. Callers can therefore report them
    without unwrapping ``__cause__``.

    Attributes:
        fact_id: ID of the rejected fact.
        sender_node_id: Node that delivered the fact.
        direction: Skew direction, copied from ``cause.direction``.
        skew_ms: Skew value, copied from ``cause.skew_ms``.
        remote_wall_ms: Remote wall time, copied from ``cause.remote_wall_ms``.
        local_wall_ms: Local wall time, copied from ``cause.local_wall_ms``.
    """

    def __init__(self, fact_id: str, sender_node_id: str, cause: HLCRemoteSkewError) -> None:
        skew_direction = cause.direction
        self.fact_id, self.sender_node_id = fact_id, sender_node_id
        # Mirror the diagnostic fields of the underlying HLC error.
        self.direction = skew_direction
        self.skew_ms = cause.skew_ms
        self.remote_wall_ms = cause.remote_wall_ms
        self.local_wall_ms = cause.local_wall_ms
        super().__init__(
            f"remote HLC skew outside configured bound (fact_id={fact_id}, "
            f"sender={sender_node_id}, direction={skew_direction})"
        )
class FederationIntegrityError(ValueError):
    """Raised when an inbound federated fact fails integrity verification.

    All constructor arguments are keyword-only.

    Attributes:
        fact_id: ID of the rejected fact.
        sender_node_id: Node that delivered the fact.
        reason: Short reason code (``_verify_inbound_cid`` uses ``"cid_mismatch"``).
        stored_cid: CID claimed by the peer, if any.
        computed_cid: CID recomputed locally, if any.
    """

    def __init__(
        self,
        *,
        fact_id: str,
        sender_node_id: str,
        reason: str,
        stored_cid: str | None = None,
        computed_cid: str | None = None,
    ) -> None:
        super().__init__(f"inbound fact integrity verification failed: {reason}")
        self.fact_id, self.sender_node_id, self.reason = fact_id, sender_node_id, reason
        self.stored_cid, self.computed_cid = stored_cid, computed_cid
class FederationValidUntilExtensionError(ValueError):
    """Raised when a peer re-sends a known fact with a later ``valid_until`` (R-18).

    All constructor arguments are keyword-only. Either timestamp may be
    ``None``, which is interpolated literally into the message.

    Attributes:
        fact_id: ID of the fact the peer tried to extend.
        sender_node_id: Node that delivered the fact.
        stored_valid_until: ``valid_until`` currently stored locally.
        incoming_valid_until: ``valid_until`` carried by the inbound copy.
    """

    def __init__(
        self,
        *,
        fact_id: str,
        sender_node_id: str,
        stored_valid_until: str | None,
        incoming_valid_until: str | None,
    ) -> None:
        self.fact_id, self.sender_node_id = fact_id, sender_node_id
        self.stored_valid_until = stored_valid_until
        self.incoming_valid_until = incoming_valid_until
        message = (
            f"federation ingest rejected: incoming valid_until ({incoming_valid_until}) "
            f"extends stored ({stored_valid_until}) for fact_id={fact_id}, "
            f"sender={sender_node_id} (R-18)"
        )
        super().__init__(message)
87def _encode_v(value: dict[str, Any]) -> str:
88 vtype = value["type"]
89 v = value.get("v")
90 if vtype == "null": 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true
91 return "null"
92 if vtype == "boolean": 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 return "true" if v else "false"
94 return str(v)
def _interpret_as(fact: dict[str, Any]) -> str:
    """Return the fact's ``value.interpret_as`` mode, defaulting to ``"content"``.

    Raises:
        fastapi.HTTPException: status 422 with detail ``"invalid_interpret_as"``
            when the mode is not a member of ``VALID_INTERPRET_AS``.
    """
    mode = fact.get("value", {}).get("interpret_as", "content")
    if mode in VALID_INTERPRET_AS:
        return str(mode)

    from fastapi import HTTPException

    raise HTTPException(status_code=422, detail="invalid_interpret_as")
def _verify_inbound_cid(fact: dict[str, Any], sender_node_id: str) -> str | None:
    """Check a peer-supplied content ID against one recomputed locally.

    A fact without a ``cid`` key (or with ``cid=None``) is accepted unverified.
    Otherwise the CID is recomputed from the fact's entity, relation, value,
    source, scope and confidence. Missing text fields are coerced to ``""`` and
    a missing confidence to ``1.0``.

    Returns:
        The claimed CID as a string when it matches, or ``None`` when absent.

    Raises:
        FederationIntegrityError: with ``reason="cid_mismatch"`` when the
            claimed and recomputed CIDs differ.
    """
    from ..cid import compute_cid

    claimed = fact.get("cid")
    if claimed is None:
        return None

    payload = fact.get("value", {})
    recomputed = compute_cid(
        entity=str(fact.get("entity", "")),
        relation=str(fact.get("relation", "")),
        value_type=str(payload.get("type", "")),
        value_v=_encode_v(payload),
        source=str(fact.get("source", "")),
        scope=str(fact.get("scope", "")),
        confidence=float(fact.get("confidence", 1.0)),
    )
    if claimed == recomputed:
        return str(claimed)

    raise FederationIntegrityError(
        fact_id=str(fact.get("id", "")),
        sender_node_id=sender_node_id,
        reason="cid_mismatch",
        stored_cid=str(claimed),
        computed_cid=recomputed,
    )
def _resolve_quarantine_garden_id(failure_detail: str) -> str:
    """Return the database id of the configured quarantine garden.

    ``settings.quarantine_garden_id`` may name a garden by id or by slug. It
    only resolves when the matching ``gardens`` row has ``quarantine = 1``.

    Args:
        failure_detail: ``detail`` used for the 403 when resolution fails.

    Raises:
        fastapi.HTTPException: status 403 when no garden is configured or the
            configured value does not match a quarantine garden.
    """
    from ..settings import settings

    configured = settings.quarantine_garden_id
    row = None
    if configured:
        with db() as conn:
            row = conn.execute(
                "SELECT id FROM gardens WHERE (id = ? OR slug = ?) AND quarantine = 1",
                (configured, configured),
            ).fetchone()

    if row is None:
        from fastapi import HTTPException

        raise HTTPException(status_code=403, detail=failure_detail)
    return str(row["id"])
def _audit_peer_hlc_anomaly(
    *,
    conn: Any,
    fact_id: str,
    sender_node_id: str,
    exc: HLCRemoteSkewError,
) -> None:
    """Record a peer HLC skew rejection.

    Increments the ``PEER_HLC_ANOMALY`` counter (labelled by peer and skew
    direction). It then emits a ``peer_hlc_anomaly`` audit event on ``conn``
    carrying the full skew diagnostics. This helper does not commit.
    """
    from ..observability.audit_event import emit

    direction = exc.direction
    PEER_HLC_ANOMALY.labels(peer_id=sender_node_id, direction=direction).inc()

    diagnostics = dict(
        sender_node_id=sender_node_id,
        direction=direction,
        skew_ms=exc.skew_ms,
        remote_wall_ms=exc.remote_wall_ms,
        local_wall_ms=exc.local_wall_ms,
        max_future_skew_ms=exc.max_future_skew_ms,
        max_past_skew_ms=exc.max_past_skew_ms,
    )
    emit(
        "peer_hlc_anomaly",
        entity_uri="system:federation",
        fact_id=fact_id,
        source=sender_node_id,
        detail=diagnostics,
        conn=conn,
    )
def _audit_peer_integrity_failure(
    *,
    conn: Any,
    exc: FederationIntegrityError,
) -> None:
    """Emit a ``federation_integrity_rejected`` audit event describing *exc*.

    The detail payload carries the sender, the reason code and both CIDs;
    either CID may be ``None``. This helper does not commit.
    """
    from ..observability.audit_event import emit

    sender = exc.sender_node_id
    emit(
        "federation_integrity_rejected",
        entity_uri="system:federation",
        fact_id=exc.fact_id,
        source=sender,
        detail=dict(
            sender_node_id=sender,
            reason=exc.reason,
            stored_cid=exc.stored_cid,
            computed_cid=exc.computed_cid,
        ),
        conn=conn,
    )
def _audit_valid_until_extension(
    *,
    conn: Any,
    exc: FederationValidUntilExtensionError,
) -> None:
    """Emit a ``federation_valid_until_extension_rejected`` audit event (R-18).

    Records both the stored and the incoming ``valid_until`` along with a fixed
    human-readable reason. This helper does not commit.
    """
    from ..observability.audit_event import emit

    explanation = (
        "R-18: federation peer attempted to extend valid_until beyond "
        "the locally-stored value; rejected per local recomputation "
        "invariant"
    )
    sender = exc.sender_node_id
    emit(
        "federation_valid_until_extension_rejected",
        entity_uri="system:federation",
        fact_id=exc.fact_id,
        source=sender,
        detail=dict(
            sender_node_id=sender,
            stored_valid_until=exc.stored_valid_until,
            incoming_valid_until=exc.incoming_valid_until,
            reason=explanation,
        ),
        conn=conn,
    )
231def _is_valid_until_extension(stored: str | None, incoming: str | None) -> bool:
232 """Return True when incoming would extend locally observed visibility."""
234 if stored is None:
235 return False
236 if incoming is None:
237 return True
239 stored_ts = datetime.fromisoformat(stored.replace("Z", "+00:00"))
240 incoming_ts = datetime.fromisoformat(incoming.replace("Z", "+00:00"))
241 if stored_ts.tzinfo is None: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 stored_ts = stored_ts.replace(tzinfo=UTC)
243 if incoming_ts.tzinfo is None: 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true
244 incoming_ts = incoming_ts.replace(tzinfo=UTC)
245 return incoming_ts > stored_ts
def ingest_fact(
    fact: dict[str, Any],
    sender_node_id: str,
    origin_node_id: str | None = None,
    origin_allowed_scopes: list[str] | None = None,
    *,
    identity_strength_boost: float | None = None,
) -> bool:
    """Idempotently ingest a federated fact.

    Returns True if the fact was new, False if it already existed (no-op).
    Writes stigmem:received_from meta-fact atomically with the fact.
    Advances the local HLC.
    Detects contradictions and writes conflict entities (spec §6.5, §3.3).

    Phase 8 (§19): computes source_trust at ingest; routes to quarantine garden
    when trust_mode=strict and t < 0.2, or rejects with 403 if no quarantine
    garden is configured. Instruction facts (as classified by
    ``is_instruction_fact``) are always routed to the quarantine garden,
    regardless of trust mode. When both rules apply, the stored
    quarantine_reason is "trust_below_threshold".

    origin_node_id / origin_allowed_scopes populate the v0.8 scope-propagation
    columns (spec §6.8.1, Migration 004). A falsy origin_node_id falls back to
    sender_node_id. When origin_allowed_scopes is None, the fact's scope is
    used as a single-element list (first-hop inference); otherwise the scopes
    are stored as a sorted JSON list.

    Company-scope facts are re_federation_blocked=1 by default (spec §6.8.2):
    the originating node's grant is non-transitive.

    Args:
        fact: Federated fact payload. Must contain "id", "scope", "source",
            "entity", "relation", "value", "timestamp" and "confidence"; "hlc",
            "valid_until" and "cid" are optional.
        sender_node_id: Peer that delivered the fact; stored as received_from.
        origin_node_id: Original author node, if known.
        origin_allowed_scopes: Scopes the origin allowed, if known.
        identity_strength_boost: Forwarded to compute_source_trust as
            identity_strength_override.

    Raises:
        KeyError: if fact lacks "id", "scope" or "source".
        FederationIntegrityError: the peer-supplied CID does not match. The
            audit event is committed before re-raising.
        fastapi.HTTPException: 422 for an invalid interpret_as; 403 when a
            quarantine garden is required but not resolvable.
        FederationValidUntilExtensionError: the fact already exists and the
            incoming valid_until would extend it. Audited and committed first.
        FederationHlcSkewError: the remote HLC is outside the configured skew
            bounds. Audited and committed first.
        ValueError: a valid_until value is not valid ISO-8601.

    NOTE(review): the "already ingested" check happens only after CID
    verification, interpret_as validation, trust scoring and quarantine-garden
    resolution. A re-delivery of an already-stored fact can therefore still
    raise (e.g. 403 if the quarantine garden was unconfigured since the first
    write) instead of being a silent no-op.
    """
    from ..settings import settings
    from ..source_trust import compute_source_trust

    fact_id = fact["id"]
    scope = fact["scope"]
    source = fact["source"]
    try:
        inbound_cid = _verify_inbound_cid(fact, sender_node_id)
    except FederationIntegrityError as exc:
        # Persist the rejection in a separate db() context and commit it, so the
        # audit trail survives the re-raise.
        with db() as conn:
            _audit_peer_integrity_failure(conn=conn, exc=exc)
            conn.commit()
        raise
    interpret_as = _interpret_as(fact)
    is_instruction = is_instruction_fact(
        fact.get("entity"),
        fact.get("relation"),
        interpret_as,
    )

    # Phase 8: compute source-trust snapshot (§19.4)
    # All four stay None unless a quarantine rule or trust scoring applies below.
    trust_score: float | None = None
    quarantine_garden_db_id: str | None = None
    quarantine_status: str | None = None
    quarantine_reason: str | None = None

    # Instruction facts are always quarantined; resolution raises 403 if no
    # quarantine garden is configured.
    if is_instruction:
        quarantine_garden_db_id = _resolve_quarantine_garden_id("quarantine_garden_required")
        quarantine_status = "pending"
        quarantine_reason = "instruction_federation_inbound"

    trust_mode = settings.trust_mode
    if trust_mode != "off":
        trust_score = compute_source_trust(
            source,
            scope,
            identity=None,
            identity_strength_override=identity_strength_boost,
        )

        # Only strict mode acts on the score; this overwrites an earlier
        # instruction-quarantine reason when both apply.
        if trust_mode == "strict" and trust_score < 0.2:
            quarantine_garden_db_id = _resolve_quarantine_garden_id("trust_below_threshold")
            quarantine_status = "pending"
            quarantine_reason = "trust_below_threshold"

    # Scope-propagation columns (spec §6.8.1)
    eff_origin_node_id = origin_node_id or sender_node_id
    eff_origin_scopes: str | None
    if origin_allowed_scopes is not None:
        eff_origin_scopes = json.dumps(sorted(origin_allowed_scopes))
    else:
        eff_origin_scopes = json.dumps([scope])
    # company-scope facts: re-federation is blocked by default (§6.8.2)
    re_fed_blocked = 1 if scope == "company" else 0

    # Everything below runs on one connection, so the fact, its audit row and
    # its meta-fact are written together.
    with db() as conn:
        existing = conn.execute(
            "SELECT id, valid_until FROM facts WHERE id = ?",
            (fact_id,),
        ).fetchone()
        if existing is not None:
            stored_valid_until = existing["valid_until"]
            incoming_valid_until = fact.get("valid_until")
            # R-18: a peer may not lengthen a fact's visibility after the fact.
            if _is_valid_until_extension(stored_valid_until, incoming_valid_until):
                violation = FederationValidUntilExtensionError(
                    fact_id=fact_id,
                    sender_node_id=sender_node_id,
                    stored_valid_until=stored_valid_until,
                    incoming_valid_until=incoming_valid_until,
                )
                _audit_valid_until_extension(conn=conn, exc=violation)
                # Commit the audit event before raising so it is not rolled back.
                conn.commit()
                raise violation
            return False  # already ingested; silent no-op per spec §5.8

        # Advance HLC (spec §6.3)
        # With a remote HLC, merge it under the configured skew bounds (the
        # *_s settings are multiplied by 1000 for the *_ms arguments);
        # otherwise just tick the local clock.
        remote_hlc = fact.get("hlc")
        try:
            new_hlc = (
                node_hlc.receive(
                    remote_hlc,
                    max_future_skew_ms=settings.federation_hlc_max_future_skew_s * 1000,
                    max_past_skew_ms=settings.federation_hlc_max_past_skew_s * 1000,
                )
                if remote_hlc
                else node_hlc.tick()
            )
        except HLCRemoteSkewError as exc:
            _audit_peer_hlc_anomaly(
                conn=conn,
                fact_id=fact_id,
                sender_node_id=sender_node_id,
                exc=exc,
            )
            # The fact insert is rejected, but the anomaly audit event is the
            # security evidence. Commit it before raising so the surrounding
            # transaction rollback does not erase the rejection trail.
            conn.commit()
            raise FederationHlcSkewError(fact_id, sender_node_id, exc) from exc

        # Insert the fact with received_from + scope-propagation columns (Migration 004)
        # Phase 8: also store source_trust snapshot + quarantine metadata
        conn.execute(
            """INSERT INTO facts
               (id, entity, relation, value_type, value_v, source, timestamp,
                valid_until, confidence, scope, hlc, received_from,
                origin_node_id, origin_allowed_scopes, re_federation_blocked,
                source_trust, quarantine_garden_id, quarantine_status,
                quarantine_reason, interpret_as, cid)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                fact_id,
                fact["entity"],
                fact["relation"],
                fact["value"]["type"],
                _encode_v(fact["value"]),
                source,
                fact["timestamp"],
                fact.get("valid_until"),
                fact["confidence"],
                scope,
                new_hlc,
                sender_node_id,
                eff_origin_node_id,
                eff_origin_scopes,
                re_fed_blocked,
                trust_score,
                quarantine_garden_db_id,
                quarantine_status,
                quarantine_reason,
                interpret_as,
                inbound_cid,
            ),
        )

        # Phase 8 §19.5.4: audit entry for ingest-time quarantine routing
        if quarantine_status == "pending":
            audit_id = str(uuid.uuid4())
            audit_now = datetime.now(UTC).isoformat()
            conn.execute(
                """INSERT INTO fact_audit_log
                   (id, fact_id, event_type, entity_uri, oidc_sub, source,
                    attested_key_id, detail, ts)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (
                    audit_id,
                    fact_id,
                    "quarantine_ingest",
                    "system:federation",
                    None,
                    sender_node_id,
                    None,
                    json.dumps(
                        {
                            "reason": quarantine_reason,
                            "trust_score": trust_score,
                            "interpret_as": interpret_as,
                        }
                    ),
                    audit_now,
                ),
            )
            # Additionally emit the instruction-specific event; the helper
            # decides from entity/relation/interpret_as whether it applies.
            emit_instruction_event_if_applicable(
                INSTRUCTION_QUARANTINED,
                fact_id=fact_id,
                fact_entity=fact.get("entity"),
                fact_relation=fact.get("relation"),
                fact_interpret_as=interpret_as,
                actor_uri="system:federation",
                source=sender_node_id,
                detail={
                    "reason": quarantine_reason,
                    "trust_score": trust_score,
                    "received_from": sender_node_id,
                },
                conn=conn,
            )

        # Write stigmem:received_from meta-fact atomically (spec §3.1)
        meta_id = str(uuid.uuid4())
        meta_now = datetime.now(UTC).isoformat()
        meta_hlc = node_hlc.tick()
        conn.execute(
            """INSERT INTO facts
               (id, entity, relation, value_type, value_v, source, timestamp,
                valid_until, confidence, scope, hlc, received_from)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                meta_id,
                fact_id,  # entity = the ingested fact's ID
                "stigmem:received_from",
                "ref",
                sender_node_id,
                "system:stigmem",
                meta_now,
                None,
                1.0,
                "local",  # meta-facts are local; MUST NOT be re-replicated (spec §3.1)
                meta_hlc,
                None,
            ),
        )

        # Contradiction detection — skip for quarantined facts (§19.5.2)
        if quarantine_status is None:
            _detect_and_record_contradiction(conn, fact, fact_id)

        return True
484_STIGMEM_NS = "stigmem:"
485_STIGMEM_URI_NS = "stigmem://"
488def _is_reserved_stigmem(s: str) -> bool:
489 """True for bare stigmem: system names (e.g. 'stigmem:conflict:x').
490 False for stigmem:// URI entities which are user content."""
491 return s.startswith(_STIGMEM_NS) and not s.startswith(_STIGMEM_URI_NS)
def _detect_and_record_contradiction(
    conn: Any,
    fact: dict[str, Any],
    fact_id: str,
) -> None:
    """Record a conflict for every live sibling of *fact*.

    A sibling is any other fact with the same entity, relation and scope and a
    confidence above zero. For each sibling pair not already in ``conflicts``,
    three rows are written on a fresh ``stigmem:conflict:<uuid>`` entity:

    - a ``stigmem:conflict:between`` fact whose value is
      ``"<fact_id> <sibling_id>"``;
    - a ``stigmem:conflict:status`` fact with value ``"unresolved"``;
    - a matching row in the ``conflicts`` table.

    Facts whose entity or relation is a reserved bare ``stigmem:`` name are
    exempt. They represent system state transitions (e.g. successive
    conflict-status values), not semantic contradictions (§9.1). ``stigmem://``
    URI entities are user content and are still checked.
    """
    entity = fact["entity"]
    if _is_reserved_stigmem(entity):
        return
    relation = fact["relation"]
    if _is_reserved_stigmem(relation):
        return
    scope = fact["scope"]

    siblings = conn.execute(
        """SELECT id FROM facts
           WHERE entity = ? AND relation = ? AND scope = ?
             AND id != ? AND confidence > 0.0""",
        (entity, relation, scope, fact_id),
    ).fetchall()
    if not siblings:
        return

    detected_at = datetime.now(UTC).isoformat()
    for sibling in siblings:
        other_id = sibling["id"]

        # Skip pairs already recorded, in either orientation.
        duplicate = conn.execute(
            """SELECT c.id FROM conflicts c
               WHERE (c.fact_a_id = ? AND c.fact_b_id = ?)
                  OR (c.fact_a_id = ? AND c.fact_b_id = ?)""",
            (fact_id, other_id, other_id, fact_id),
        ).fetchone()
        if duplicate:
            continue

        conflict_id = f"stigmem:conflict:{uuid.uuid4()}"
        system_facts = (
            ("stigmem:conflict:between", "text", f"{fact_id} {other_id}"),
            ("stigmem:conflict:status", "string", "unresolved"),
        )
        for rel, value_type, value_v in system_facts:
            # Each system fact gets its own HLC tick, taken just before its insert.
            stamp = node_hlc.tick()
            conn.execute(
                """INSERT INTO facts
                   (id, entity, relation, value_type, value_v, source, timestamp,
                    valid_until, confidence, scope, hlc, received_from)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    str(uuid.uuid4()),
                    conflict_id,
                    rel,
                    value_type,
                    value_v,
                    "system:stigmem",
                    detected_at,
                    None,
                    1.0,
                    scope,
                    stamp,
                    None,
                ),
            )

        conn.execute(
            """INSERT OR IGNORE INTO conflicts (id, fact_a_id, fact_b_id, status, detected_at)
               VALUES (?,?,?,?,?)""",
            (conflict_id, fact_id, other_id, "unresolved", detected_at),
        )
def write_audit_log(
    peer_id: str,
    event_type: str,
    detail: dict[str, Any] | None = None,
) -> None:
    """Append one row to the ``federation_audit`` table (spec §6.4).

    A falsy ``detail`` (``None`` or an empty dict) is stored as SQL NULL;
    anything else is stored JSON-encoded. The timestamp is the current UTC
    time in ISO-8601 form.
    """
    record_id = str(uuid.uuid4())
    recorded_at = datetime.now(UTC).isoformat()
    with db() as conn:
        encoded_detail = json.dumps(detail) if detail else None
        conn.execute(
            "INSERT INTO federation_audit (id, peer_id, event_type, detail, ts) VALUES (?,?,?,?,?)",
            (record_id, peer_id, event_type, encoded_detail, recorded_at),
        )