Coverage for node / src / stigmem_node / federation / federation_ingest.py: 93%

187 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Idempotent fact ingestion from federated peers (spec §6.3, §6.5, §19.4–19.5). 

2 

3ingest_fact() is the single entry-point for all federated facts. 

4It is safe to call multiple times for the same fact (no-op after first write). 

5 

6Phase 8 (§19): source-trust score is computed at ingest time and stored as a 

7snapshot in facts.source_trust. In trust_mode=strict, facts with t < 0.2 are 

8routed to the node's designated quarantine garden instead of the main fact table. 

9""" 

10 

11from __future__ import annotations 

12 

13import json 

14import uuid 

15from datetime import UTC, datetime 

16from typing import Any 

17 

18from ..db import db 

19from ..hlc import HLCRemoteSkewError, node_hlc 

20from ..models.facts import VALID_INTERPRET_AS 

21from ..observability.audit_event import ( 

22 INSTRUCTION_QUARANTINED, 

23 emit_instruction_event_if_applicable, 

24 is_instruction_fact, 

25) 

26from ..observability.metrics import PEER_HLC_ANOMALY 

27 

28 

class FederationHlcSkewError(ValueError):
    """Inbound federated fact was rejected because its HLC wall time is implausible.

    The diagnostic fields of the underlying ``HLCRemoteSkewError`` are copied
    onto this exception so handlers can read them without unwrapping the cause.
    """

    def __init__(self, fact_id: str, sender_node_id: str, cause: HLCRemoteSkewError) -> None:
        """Capture the rejected fact, the sending peer, and the skew details.

        Args:
            fact_id: ID of the fact whose HLC was rejected.
            sender_node_id: Node that delivered the fact.
            cause: The skew error; its ``direction``, ``skew_ms``,
                ``remote_wall_ms`` and ``local_wall_ms`` are mirrored here.
        """
        self.fact_id = fact_id
        self.sender_node_id = sender_node_id
        # Mirror the cause's diagnostics onto this exception.
        self.direction = cause.direction
        self.skew_ms = cause.skew_ms
        self.remote_wall_ms = cause.remote_wall_ms
        self.local_wall_ms = cause.local_wall_ms
        super().__init__(
            "remote HLC skew outside configured bound "
            f"(fact_id={fact_id}, sender={sender_node_id}, direction={self.direction})"
        )

43 

44 

45class FederationIntegrityError(ValueError): 

46 """Inbound federated fact failed integrity verification before ingest.""" 

47 

48 def __init__( 

49 self, 

50 *, 

51 fact_id: str, 

52 sender_node_id: str, 

53 reason: str, 

54 stored_cid: str | None = None, 

55 computed_cid: str | None = None, 

56 ) -> None: 

57 self.fact_id = fact_id 

58 self.sender_node_id = sender_node_id 

59 self.reason = reason 

60 self.stored_cid = stored_cid 

61 self.computed_cid = computed_cid 

62 super().__init__(f"inbound fact integrity verification failed: {reason}") 

63 

64 

65class FederationValidUntilExtensionError(ValueError): 

66 """Inbound federated fact tried to extend an existing valid_until.""" 

67 

68 def __init__( 

69 self, 

70 *, 

71 fact_id: str, 

72 sender_node_id: str, 

73 stored_valid_until: str | None, 

74 incoming_valid_until: str | None, 

75 ) -> None: 

76 self.fact_id = fact_id 

77 self.sender_node_id = sender_node_id 

78 self.stored_valid_until = stored_valid_until 

79 self.incoming_valid_until = incoming_valid_until 

80 super().__init__( 

81 "federation ingest rejected: incoming valid_until " 

82 f"({incoming_valid_until}) extends stored ({stored_valid_until}) " 

83 f"for fact_id={fact_id}, sender={sender_node_id} (R-18)" 

84 ) 

85 

86 

87def _encode_v(value: dict[str, Any]) -> str: 

88 vtype = value["type"] 

89 v = value.get("v") 

90 if vtype == "null": 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 return "null" 

92 if vtype == "boolean": 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true

93 return "true" if v else "false" 

94 return str(v) 

95 

96 

def _interpret_as(fact: dict[str, Any]) -> str:
    """Return the fact's ``value.interpret_as`` marker, defaulting to ``"content"``.

    Args:
        fact: Inbound fact mapping; a missing ``"value"`` is treated as empty.

    Returns:
        The marker as a string.

    Raises:
        fastapi.HTTPException: 422 ``invalid_interpret_as`` when the marker is
            not a member of ``VALID_INTERPRET_AS``.
    """
    marker = fact.get("value", {}).get("interpret_as", "content")
    if marker in VALID_INTERPRET_AS:
        return str(marker)

    # fastapi is imported only on the rejection path.
    from fastapi import HTTPException

    raise HTTPException(status_code=422, detail="invalid_interpret_as")

104 

105 

def _verify_inbound_cid(fact: dict[str, Any], sender_node_id: str) -> str | None:
    """Recompute the inbound fact's CID and compare it with the one it carries.

    Args:
        fact: Inbound fact mapping. Missing fields fall back to ``""`` (or
            ``1.0`` for ``confidence``) when fed to ``compute_cid``.
        sender_node_id: Node that delivered the fact; recorded on any error.

    Returns:
        The fact's CID as a string, or ``None`` when the fact carries no CID
        (in which case nothing is verified).

    Raises:
        FederationIntegrityError: With reason ``cid_mismatch`` when the
            carried CID differs from the locally computed one.
    """
    from ..cid import compute_cid

    claimed_cid = fact.get("cid")
    if claimed_cid is None:
        return None

    payload = fact.get("value", {})
    recomputed_cid = compute_cid(
        entity=str(fact.get("entity", "")),
        relation=str(fact.get("relation", "")),
        value_type=str(payload.get("type", "")),
        value_v=_encode_v(payload),
        source=str(fact.get("source", "")),
        scope=str(fact.get("scope", "")),
        confidence=float(fact.get("confidence", 1.0)),
    )
    if claimed_cid == recomputed_cid:
        return str(claimed_cid)

    raise FederationIntegrityError(
        fact_id=str(fact.get("id", "")),
        sender_node_id=sender_node_id,
        reason="cid_mismatch",
        stored_cid=str(claimed_cid),
        computed_cid=recomputed_cid,
    )

132 

133 

def _resolve_quarantine_garden_id(failure_detail: str) -> str:
    """Resolve the configured quarantine garden to its ``gardens.id``.

    ``settings.quarantine_garden_id`` may name the garden by id or by slug;
    the matching row must also have ``quarantine = 1``.

    Args:
        failure_detail: ``detail`` string for the 403 raised on failure.

    Returns:
        The garden's database id as a string.

    Raises:
        fastapi.HTTPException: 403 with ``failure_detail`` when no quarantine
            garden is configured or no matching quarantine garden row exists.
    """
    from ..settings import settings

    configured = settings.quarantine_garden_id
    garden_row = None
    if configured:
        with db() as conn:
            garden_row = conn.execute(
                "SELECT id FROM gardens WHERE (id = ? OR slug = ?) AND quarantine = 1",
                (configured, configured),
            ).fetchone()

    if garden_row is None:
        # Unconfigured and unknown gardens are rejected identically.
        from fastapi import HTTPException

        raise HTTPException(status_code=403, detail=failure_detail)
    return str(garden_row["id"])

153 

154 

def _audit_peer_hlc_anomaly(
    *,
    conn: Any,
    fact_id: str,
    sender_node_id: str,
    exc: HLCRemoteSkewError,
) -> None:
    """Count and audit an inbound fact whose HLC was rejected for skew.

    Increments the ``PEER_HLC_ANOMALY`` metric (labelled by peer and skew
    direction) and emits a ``peer_hlc_anomaly`` audit event on ``conn``.
    This function does not commit; the caller decides when to.

    Args:
        conn: Database connection handed through to ``emit``.
        fact_id: ID of the rejected fact.
        sender_node_id: Node that delivered the fact.
        exc: The skew error whose measurements go into the event detail.
    """
    from ..observability.audit_event import emit

    PEER_HLC_ANOMALY.labels(peer_id=sender_node_id, direction=exc.direction).inc()

    anomaly_detail = {
        "sender_node_id": sender_node_id,
        "direction": exc.direction,
        "skew_ms": exc.skew_ms,
        "remote_wall_ms": exc.remote_wall_ms,
        "local_wall_ms": exc.local_wall_ms,
        "max_future_skew_ms": exc.max_future_skew_ms,
        "max_past_skew_ms": exc.max_past_skew_ms,
    }
    emit(
        "peer_hlc_anomaly",
        entity_uri="system:federation",
        fact_id=fact_id,
        source=sender_node_id,
        detail=anomaly_detail,
        conn=conn,
    )

181 

182 

def _audit_peer_integrity_failure(
    *,
    conn: Any,
    exc: FederationIntegrityError,
) -> None:
    """Emit a ``federation_integrity_rejected`` audit event for a failed check.

    This function does not commit; the caller decides when to.

    Args:
        conn: Database connection handed through to ``emit``.
        exc: The integrity error; its fact, sender, reason and both CIDs are
            written into the event.
    """
    from ..observability.audit_event import emit

    rejection_detail = {
        "sender_node_id": exc.sender_node_id,
        "reason": exc.reason,
        "stored_cid": exc.stored_cid,
        "computed_cid": exc.computed_cid,
    }
    emit(
        "federation_integrity_rejected",
        entity_uri="system:federation",
        fact_id=exc.fact_id,
        source=exc.sender_node_id,
        detail=rejection_detail,
        conn=conn,
    )

203 

204 

def _audit_valid_until_extension(
    *,
    conn: Any,
    exc: FederationValidUntilExtensionError,
) -> None:
    """Emit a ``federation_valid_until_extension_rejected`` audit event.

    This function does not commit; the caller decides when to.

    Args:
        conn: Database connection handed through to ``emit``.
        exc: The rejection; its fact, sender and both ``valid_until`` values
            are written into the event.
    """
    from ..observability.audit_event import emit

    rejection_detail = {
        "sender_node_id": exc.sender_node_id,
        "stored_valid_until": exc.stored_valid_until,
        "incoming_valid_until": exc.incoming_valid_until,
        "reason": (
            "R-18: federation peer attempted to extend valid_until beyond "
            "the locally-stored value; rejected per local recomputation "
            "invariant"
        ),
    }
    emit(
        "federation_valid_until_extension_rejected",
        entity_uri="system:federation",
        fact_id=exc.fact_id,
        source=exc.sender_node_id,
        detail=rejection_detail,
        conn=conn,
    )

229 

230 

231def _is_valid_until_extension(stored: str | None, incoming: str | None) -> bool: 

232 """Return True when incoming would extend locally observed visibility.""" 

233 

234 if stored is None: 

235 return False 

236 if incoming is None: 

237 return True 

238 

239 stored_ts = datetime.fromisoformat(stored.replace("Z", "+00:00")) 

240 incoming_ts = datetime.fromisoformat(incoming.replace("Z", "+00:00")) 

241 if stored_ts.tzinfo is None: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 stored_ts = stored_ts.replace(tzinfo=UTC) 

243 if incoming_ts.tzinfo is None: 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true

244 incoming_ts = incoming_ts.replace(tzinfo=UTC) 

245 return incoming_ts > stored_ts 

246 

247 

def ingest_fact(
    fact: dict[str, Any],
    sender_node_id: str,
    origin_node_id: str | None = None,
    origin_allowed_scopes: list[str] | None = None,
    *,
    identity_strength_boost: float | None = None,
) -> bool:
    """Idempotently ingest a federated fact.

    Returns True if the fact was new, False if it already existed (no-op).
    Writes stigmem:received_from meta-fact atomically with the fact.
    Advances the local HLC.
    Detects contradictions and writes conflict entities (spec §6.5, §3.3).

    Phase 8 (§19): computes source_trust at ingest; routes to quarantine garden
    when trust_mode=strict and t < 0.2, or rejects with 403 if no quarantine
    garden is configured.

    origin_node_id / origin_allowed_scopes populate the v0.8 scope-propagation
    columns (spec §6.8.1, Migration 004). When None, defaults to sender_node_id
    and the fact's scope as a single-element list (first-hop inference).

    Company-scope facts are re_federation_blocked=1 by default (spec §6.8.2):
    the originating node's grant is non-transitive.

    Args:
        fact: Inbound fact mapping. ``id``, ``scope``, ``source``, ``entity``,
            ``relation``, ``value`` (with ``type``), ``timestamp`` and
            ``confidence`` are read with ``[]`` and so are required;
            ``cid``, ``hlc`` and ``valid_until`` are optional.
        sender_node_id: Peer that delivered the fact; stored as
            ``received_from`` and used as the audit source.
        origin_node_id: Originating node; falls back to ``sender_node_id``
            when falsy.
        origin_allowed_scopes: Scopes granted by the origin; stored as a
            sorted JSON list.
        identity_strength_boost: Passed to ``compute_source_trust`` as
            ``identity_strength_override``.

    Returns:
        ``True`` when the fact was inserted, ``False`` when a fact with the
        same id already existed and the inbound copy did not extend its
        ``valid_until``.

    Raises:
        FederationIntegrityError: The carried CID does not match the
            recomputed one (an audit event is committed first).
        FederationValidUntilExtensionError: The fact already exists and the
            inbound copy would extend its ``valid_until`` (audited first).
        FederationHlcSkewError: The remote HLC was rejected for skew
            (audited first).
        fastapi.HTTPException: 422 for an invalid ``interpret_as``; 403 when
            quarantine routing is required but no quarantine garden resolves.
        KeyError: A required key is missing from ``fact``.
    """
    from ..settings import settings
    from ..source_trust import compute_source_trust

    fact_id = fact["id"]
    scope = fact["scope"]
    source = fact["source"]

    try:
        inbound_cid = _verify_inbound_cid(fact, sender_node_id)
    except FederationIntegrityError as exc:
        # Persist the rejection evidence on its own connection, then re-raise.
        with db() as conn:
            _audit_peer_integrity_failure(conn=conn, exc=exc)
            conn.commit()
        raise

    interpret_as = _interpret_as(fact)
    is_instruction = is_instruction_fact(
        fact.get("entity"),
        fact.get("relation"),
        interpret_as,
    )

    # Phase 8: compute source-trust snapshot (§19.4)
    trust_score: float | None = None
    quarantine_garden_db_id: str | None = None
    quarantine_status: str | None = None
    quarantine_reason: str | None = None

    # Inbound instruction facts are always routed to quarantine.
    if is_instruction:
        quarantine_garden_db_id = _resolve_quarantine_garden_id("quarantine_garden_required")
        quarantine_status = "pending"
        quarantine_reason = "instruction_federation_inbound"

    trust_mode = settings.trust_mode
    if trust_mode != "off":
        trust_score = compute_source_trust(
            source,
            scope,
            identity=None,
            identity_strength_override=identity_strength_boost,
        )

        # A low-trust routing overrides the instruction reason set above.
        if trust_mode == "strict" and trust_score < 0.2:
            quarantine_garden_db_id = _resolve_quarantine_garden_id("trust_below_threshold")
            quarantine_status = "pending"
            quarantine_reason = "trust_below_threshold"

    # Scope-propagation columns (spec §6.8.1)
    eff_origin_node_id = origin_node_id or sender_node_id
    eff_origin_scopes: str | None
    if origin_allowed_scopes is not None:
        eff_origin_scopes = json.dumps(sorted(origin_allowed_scopes))
    else:
        eff_origin_scopes = json.dumps([scope])
    # company-scope facts: re-federation is blocked by default (§6.8.2)
    re_fed_blocked = 1 if scope == "company" else 0

    with db() as conn:
        existing = conn.execute(
            "SELECT id, valid_until FROM facts WHERE id = ?",
            (fact_id,),
        ).fetchone()
        if existing is not None:
            stored_valid_until = existing["valid_until"]
            incoming_valid_until = fact.get("valid_until")
            if _is_valid_until_extension(stored_valid_until, incoming_valid_until):
                violation = FederationValidUntilExtensionError(
                    fact_id=fact_id,
                    sender_node_id=sender_node_id,
                    stored_valid_until=stored_valid_until,
                    incoming_valid_until=incoming_valid_until,
                )
                # Commit the audit entry before raising so it is not lost.
                _audit_valid_until_extension(conn=conn, exc=violation)
                conn.commit()
                raise violation
            return False  # already ingested; silent no-op per spec §5.8

        # Advance HLC (spec §6.3)
        remote_hlc = fact.get("hlc")
        try:
            new_hlc = (
                node_hlc.receive(
                    remote_hlc,
                    max_future_skew_ms=settings.federation_hlc_max_future_skew_s * 1000,
                    max_past_skew_ms=settings.federation_hlc_max_past_skew_s * 1000,
                )
                if remote_hlc
                else node_hlc.tick()
            )
        except HLCRemoteSkewError as exc:
            _audit_peer_hlc_anomaly(
                conn=conn,
                fact_id=fact_id,
                sender_node_id=sender_node_id,
                exc=exc,
            )
            # The fact insert is rejected, but the anomaly audit event is the
            # security evidence. Commit it before raising so the surrounding
            # transaction rollback does not erase the rejection trail.
            conn.commit()
            raise FederationHlcSkewError(fact_id, sender_node_id, exc) from exc

        # Insert the fact with received_from + scope-propagation columns (Migration 004)
        # Phase 8: also store source_trust snapshot + quarantine metadata
        conn.execute(
            """INSERT INTO facts
               (id, entity, relation, value_type, value_v, source, timestamp,
                valid_until, confidence, scope, hlc, received_from,
                origin_node_id, origin_allowed_scopes, re_federation_blocked,
                source_trust, quarantine_garden_id, quarantine_status,
                quarantine_reason, interpret_as, cid)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                fact_id,
                fact["entity"],
                fact["relation"],
                fact["value"]["type"],
                _encode_v(fact["value"]),
                source,
                fact["timestamp"],
                fact.get("valid_until"),
                fact["confidence"],
                scope,
                new_hlc,
                sender_node_id,
                eff_origin_node_id,
                eff_origin_scopes,
                re_fed_blocked,
                trust_score,
                quarantine_garden_db_id,
                quarantine_status,
                quarantine_reason,
                interpret_as,
                inbound_cid,
            ),
        )

        # Phase 8 §19.5.4: audit entry for ingest-time quarantine routing
        if quarantine_status == "pending":
            audit_id = str(uuid.uuid4())
            audit_now = datetime.now(UTC).isoformat()
            conn.execute(
                """INSERT INTO fact_audit_log
                   (id, fact_id, event_type, entity_uri, oidc_sub, source,
                    attested_key_id, detail, ts)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (
                    audit_id,
                    fact_id,
                    "quarantine_ingest",
                    "system:federation",
                    None,
                    sender_node_id,
                    None,
                    json.dumps(
                        {
                            "reason": quarantine_reason,
                            "trust_score": trust_score,
                            "interpret_as": interpret_as,
                        }
                    ),
                    audit_now,
                ),
            )
            emit_instruction_event_if_applicable(
                INSTRUCTION_QUARANTINED,
                fact_id=fact_id,
                fact_entity=fact.get("entity"),
                fact_relation=fact.get("relation"),
                fact_interpret_as=interpret_as,
                actor_uri="system:federation",
                source=sender_node_id,
                detail={
                    "reason": quarantine_reason,
                    "trust_score": trust_score,
                    "received_from": sender_node_id,
                },
                conn=conn,
            )

        # Write stigmem:received_from meta-fact atomically (spec §3.1)
        meta_id = str(uuid.uuid4())
        meta_now = datetime.now(UTC).isoformat()
        meta_hlc = node_hlc.tick()
        conn.execute(
            """INSERT INTO facts
               (id, entity, relation, value_type, value_v, source, timestamp,
                valid_until, confidence, scope, hlc, received_from)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                meta_id,
                fact_id,  # entity = the ingested fact's ID
                "stigmem:received_from",
                "ref",
                sender_node_id,
                "system:stigmem",
                meta_now,
                None,
                1.0,
                "local",  # meta-facts are local; MUST NOT be re-replicated (spec §3.1)
                meta_hlc,
                None,
            ),
        )

        # Contradiction detection — skip for quarantined facts (§19.5.2)
        if quarantine_status is None:
            _detect_and_record_contradiction(conn, fact, fact_id)

    return True

482 

483 

484_STIGMEM_NS = "stigmem:" 

485_STIGMEM_URI_NS = "stigmem://" 

486 

487 

488def _is_reserved_stigmem(s: str) -> bool: 

489 """True for bare stigmem: system names (e.g. 'stigmem:conflict:x'). 

490 False for stigmem:// URI entities which are user content.""" 

491 return s.startswith(_STIGMEM_NS) and not s.startswith(_STIGMEM_URI_NS) 

492 

493 

def _detect_and_record_contradiction(
    conn: Any,
    fact: dict[str, Any],
    fact_id: str,
) -> None:
    """If a contradiction exists, assert conflict entities and write conflicts table.

    A sibling is any other stored fact with the same entity, relation and
    scope and a confidence above zero. For each sibling that is not already
    paired with ``fact_id`` in ``conflicts``, this writes two system facts
    (``stigmem:conflict:between`` and ``stigmem:conflict:status``) plus one
    ``conflicts`` row. Nothing is committed here; the caller owns ``conn``.

    Args:
        conn: Open database connection used for all reads and writes.
        fact: The just-ingested fact; ``entity``, ``relation`` and ``scope``
            are read from it.
        fact_id: ID of the just-ingested fact.
    """
    # Reserved stigmem: facts are system state (status transitions, meta-facts), not
    # semantic content. Two stigmem:conflict:status facts with different values represent
    # a state transition, not a contradiction — exempt them from sibling-detection (§9.1).
    # Note: stigmem:// URI entities are user content and ARE subject to detection.
    if _is_reserved_stigmem(fact["entity"]) or _is_reserved_stigmem(fact["relation"]):
        return

    siblings = conn.execute(
        """SELECT id FROM facts
           WHERE entity = ? AND relation = ? AND scope = ?
             AND id != ? AND confidence > 0.0""",
        (fact["entity"], fact["relation"], fact["scope"], fact_id),
    ).fetchall()
    if not siblings:
        return

    now = datetime.now(UTC).isoformat()
    # Both conflict facts use the same INSERT; only relation/type/value differ.
    insert_system_fact = """INSERT INTO facts
           (id, entity, relation, value_type, value_v, source, timestamp,
            valid_until, confidence, scope, hlc, received_from)
           VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"""

    for sibling in siblings:
        sibling_id = sibling["id"]

        # Skip if this pair already has a conflict record
        already = conn.execute(
            """SELECT c.id FROM conflicts c
               WHERE (c.fact_a_id = ? AND c.fact_b_id = ?)
                  OR (c.fact_a_id = ? AND c.fact_b_id = ?)""",
            (fact_id, sibling_id, sibling_id, fact_id),
        ).fetchone()
        if already:
            continue

        # Mint the conflict entity only once the pair is known to be new.
        conflict_id = f"stigmem:conflict:{uuid.uuid4()}"

        conflict_facts = (
            ("stigmem:conflict:between", "text", f"{fact_id} {sibling_id}"),
            ("stigmem:conflict:status", "string", "unresolved"),
        )
        for relation, value_type, value_v in conflict_facts:
            # One HLC tick per system fact, taken just before its insert.
            hlc = node_hlc.tick()
            conn.execute(
                insert_system_fact,
                (
                    str(uuid.uuid4()),
                    conflict_id,
                    relation,
                    value_type,
                    value_v,
                    "system:stigmem",
                    now,
                    None,
                    1.0,
                    fact["scope"],
                    hlc,
                    None,
                ),
            )

        conn.execute(
            """INSERT OR IGNORE INTO conflicts (id, fact_a_id, fact_b_id, status, detected_at)
               VALUES (?,?,?,?,?)""",
            (conflict_id, fact_id, sibling_id, "unresolved", now),
        )

583 

584 

def write_audit_log(
    peer_id: str,
    event_type: str,
    detail: dict[str, Any] | None = None,
) -> None:
    """Write a federation audit log entry (spec §6.4).

    Inserts one row into ``federation_audit`` with a fresh UUID and the
    current UTC time. No explicit commit is issued here.
    NOTE(review): presumably ``db()`` commits on clean exit — confirm.

    Args:
        peer_id: Peer the event concerns.
        event_type: Event type label stored verbatim.
        detail: Optional structured detail, stored as JSON. ``None`` and an
            empty dict are both stored as NULL.
    """
    entry_id = str(uuid.uuid4())
    now = datetime.now(UTC).isoformat()
    # Falsy detail (None or {}) is stored as NULL rather than as JSON.
    detail_json = json.dumps(detail) if detail else None
    with db() as conn:
        conn.execute(
            "INSERT INTO federation_audit (id, peer_id, event_type, detail, ts) VALUES (?,?,?,?,?)",
            (entry_id, peer_id, event_type, detail_json, now),
        )