Coverage for node / src / stigmem_node / routes / federation / replication.py: 86%

166 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Federation fact pull and push routes.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from typing import Annotated, Any 

7 

8from fastapi import Header, HTTPException, Query, Request 

9 

10from ...db import db 

11from ...federation.federation_ingest import FederationHlcSkewError, FederationIntegrityError 

12from ...federation.tls import check_peer_san 

13from ...identity.capability import CapabilityTokenError, verify_token 

14from ...identity.trust_store import get_peer_manifest 

15from ...metrics import FEDERATION_EGRESS 

16from ...models.facts import row_to_record 

17from ...models.federation import FederationFactsResponse 

18from ...plugins import Deny, TenantContext, get_registry 

19from .common import ( 

20 PeerTokenDep, 

21 _allowed_output_scopes, 

22 _cap_token_covers_scope, 

23 _get_mtls_peer_cert, 

24 _public_module, 

25 _try_peer_token_auth, 

26 logger, 

27 router, 

28) 

29 

30 

31@router.get("/v1/federation/facts", response_model=FederationFactsResponse) 

32def pull_facts( 

33 peer_and_token: PeerTokenDep, 

34 scope: str | None = Query(None), 

35 cursor: str | None = Query(None), 

36 limit: int = Query(100, ge=1, le=500), 

37) -> FederationFactsResponse: 

38 """Return scope-filtered, HLC-cursor-paged facts to an authenticated peer. 

39 

40 Covered by Spec-05-Federation-Trust. 

41 """ 

42 peer, token_payload = peer_and_token 

43 

44 permitted = _allowed_output_scopes(peer, token_payload) 

45 if not permitted: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true

46 raise HTTPException(status_code=403, detail="no permitted scopes") 

47 

48 if scope is not None: 

49 if scope not in permitted: 49 ↛ 56line 49 didn't jump to line 56 because the condition on line 49 was always true

50 _public_module().write_audit_log( 

51 peer["id"], 

52 "scope_violation", 

53 {"requested_scope": scope, "permitted": list(permitted)}, 

54 ) 

55 raise HTTPException(status_code=403, detail="scope not permitted for this peer") 

56 query_scopes = {scope} 

57 else: 

58 query_scopes = permitted 

59 

60 scope_placeholders = ",".join("?" * len(query_scopes)) 

61 params: list[Any] = list(query_scopes) 

62 conditions: list[str] = [ 

63 f"scope IN ({scope_placeholders})", 

64 "tenant_id = ?", 

65 "hlc IS NOT NULL", # only facts with an HLC are replication-eligible 

66 "received_from IS NULL", # do not re-federate inbound facts (§3.1) 

67 "entity NOT LIKE 'stigmem:conflict:%'", # conflict entities are local (§6.5) 

68 "relation NOT LIKE 'stigmem:%'", # meta-facts (received_from, ttl) are local 

69 "re_federation_blocked = 0", # exclude company-scope relay-blocked facts (§6.8.2) 

70 "(derived_from IS NULL OR derived_from = '' OR derived_from = '[]')", 

71 ] 

72 params.append("default") 

73 if cursor: 

74 conditions.append("hlc > ?") 

75 params.append(cursor) 

76 

77 where = " AND ".join(conditions) 

78 params.append(limit + 1) 

79 

80 with db() as conn: 

81 rows = conn.execute( 

82 f"SELECT * FROM facts WHERE {where} ORDER BY hlc ASC LIMIT ?", # noqa: S608 # nosec B608 — where built from literal fragments; values in params 

83 params, 

84 ).fetchall() 

85 

86 has_more = len(rows) > limit 

87 rows = rows[:limit] 

88 

89 seen: dict[tuple[str, str, str], int] = {} 

90 for r in rows: 

91 k = (r["entity"], r["relation"], r["scope"]) 

92 seen[k] = seen.get(k, 0) + 1 

93 

94 records = [ 

95 row_to_record(r, contradicted=seen[(r["entity"], r["relation"], r["scope"])] > 1) 

96 for r in rows 

97 ] 

98 tenant = TenantContext( 

99 tenant_id="default", 

100 metadata={"tenant_context_source": "pinned"}, 

101 ) 

102 registry = get_registry() 

103 records = registry.fire_filter_chain( 

104 "federation_outbound_filter", 

105 records, 

106 peer=peer, 

107 token_payload=token_payload, 

108 tenant=tenant, 

109 ) 

110 records = registry.fire_filter_chain( 

111 "federation_outbound_sign", 

112 records, 

113 peer=peer, 

114 token_payload=token_payload, 

115 tenant=tenant, 

116 ) 

117 

118 new_cursor: str | None = rows[-1]["hlc"] if rows else cursor 

119 FEDERATION_EGRESS.labels(peer_id=peer["node_id"], status="ok").inc(len(records)) 

120 return FederationFactsResponse(facts=records, cursor=new_cursor, has_more=has_more) 

121 

122 

123# --------------------------------------------------------------------------- 

124# POST /v1/federation/facts/push — optional push (§5.11) 

125# --------------------------------------------------------------------------- 

126 

127 

128def _verify_push_cap_token(x_stigmem_capability: str) -> dict[str, Any]: 

129 """Verify a capability-token header for the push path (H-SEC-2). 

130 

131 On verification failure logs ``capability_rejected`` and raises 401. 

132 On success returns the decoded token dict and logs ``capability_verified``. 

133 """ 

134 try: 

135 verify_token( 

136 x_stigmem_capability, 

137 lambda uri: get_peer_manifest( 

138 uri, refresh_if_expired=True, trust_mode=_public_module().settings.trust_mode 

139 ), 

140 trust_mode=_public_module().settings.trust_mode, 

141 ) 

142 except CapabilityTokenError as exc: 

143 # M-SEC-4: log capability_rejected 

144 import uuid as _uuid 

145 from datetime import UTC as _UTC 

146 from datetime import datetime as _datetime 

147 

148 _now = _datetime.now(_UTC).isoformat() 

149 try: 

150 import json as _json 

151 

152 with db() as conn: 

153 conn.execute( 

154 """INSERT INTO fact_audit_log 

155 (id, fact_id, event_type, entity_uri, oidc_sub, source, 

156 attested_key_id, detail, ts) 

157 VALUES (?,?,?,?,?,?,?,?,?)""", 

158 ( 

159 str(_uuid.uuid4()), 

160 "capability:rejected", 

161 "capability_rejected", 

162 None, 

163 None, 

164 "system:capability", 

165 None, 

166 _json.dumps({"reason": str(exc)}), 

167 _now, 

168 ), 

169 ) 

170 except Exception as audit_exc: # nosec B110 — audit log best-effort 

171 logger.debug("capability_rejected audit log failed: %s", audit_exc) 

172 raise HTTPException(status_code=401, detail=f"capability token invalid: {exc}") from exc 

173 

174 try: 

175 cap_token: dict[str, Any] = json.loads(x_stigmem_capability) 

176 except json.JSONDecodeError as exc: 

177 raise HTTPException( 

178 status_code=400, detail=f"malformed capability token JSON: {exc}" 

179 ) from exc 

180 

181 if cap_token.get("verb") != "write": 

182 raise HTTPException( 

183 status_code=403, 

184 detail="insufficient_capability: token verb must be 'write' for push", 

185 ) 

186 

187 # M-SEC-4: log capability_verified 

188 import uuid as _uuid2 

189 from datetime import UTC as _UTC2 

190 from datetime import datetime as _datetime2 

191 

192 _now2 = _datetime2.now(_UTC2).isoformat() 

193 try: 

194 with db() as conn: 

195 conn.execute( 

196 """INSERT INTO fact_audit_log 

197 (id, fact_id, event_type, entity_uri, oidc_sub, source, 

198 attested_key_id, detail, ts) 

199 VALUES (?,?,?,?,?,?,?,?,?)""", 

200 ( 

201 str(_uuid2.uuid4()), 

202 cap_token.get("token_id", "unknown"), 

203 "capability_verified", 

204 cap_token.get("subject"), 

205 None, 

206 "system:capability", 

207 None, 

208 json.dumps( 

209 { 

210 "token_id": cap_token.get("token_id"), 

211 "issuer": cap_token.get("issuer"), 

212 "verb": cap_token.get("verb"), 

213 "object": cap_token.get("object"), 

214 } 

215 ), 

216 _now2, 

217 ), 

218 ) 

219 except Exception as audit_exc: # nosec B110 — audit log best-effort 

220 logger.debug("capability_verified audit log failed: %s", audit_exc) 

221 

222 return cap_token 

223 

224 

225@router.post("/v1/federation/facts/push", status_code=202) 

226def push_facts( 

227 request: Request, 

228 body: dict[str, Any], 

229 authorization: Annotated[str | None, Header(alias="authorization")] = None, 

230 x_stigmem_capability: Annotated[str | None, Header(alias="x-stigmem-capability")] = None, 

231) -> dict[str, Any]: 

232 """Receive push-replicated facts from a peer. Off by default. 

233 

234 Auth (H-SEC-2): peer JWT first; if that fails and X-Stigmem-Capability is 

235 present, fall through to capability-token verification. Capability tokens 

236 must carry verb=write and an object that covers all pushed fact scopes. 

237 Covered by Spec-05-Federation-Trust. 

238 """ 

239 if not _public_module().settings.federation_push_enabled: 

240 raise HTTPException(status_code=405, detail="push replication not enabled on this node") 

241 

242 # --- Phase 1: try peer JWT auth --- 

243 peer_auth = _try_peer_token_auth(authorization) 

244 

245 peer: dict[str, Any] | None = None 

246 token_payload: dict[str, Any] | None = None 

247 cap_token: dict[str, Any] | None = None 

248 using_cap_token = False 

249 

250 if peer_auth is not None: 

251 peer, token_payload = peer_auth 

252 # §22.1.2.4 — enforce SAN on the push path too 

253 if _public_module().settings.mtls_enabled: 253 ↛ 254line 253 didn't jump to line 254 because the condition on line 253 was never true

254 peer_cert = _get_mtls_peer_cert(request) 

255 if not check_peer_san(peer_cert, peer["node_id"]): 

256 _public_module().write_audit_log( 

257 peer["id"], "san_mismatch", {"node_id": peer["node_id"]} 

258 ) 

259 raise HTTPException( 

260 status_code=401, 

261 detail="peer certificate URI SAN does not match node_id", 

262 ) 

263 elif x_stigmem_capability is not None: 

264 cap_token = _verify_push_cap_token(x_stigmem_capability) 

265 using_cap_token = True 

266 else: 

267 raise HTTPException( 

268 status_code=401, 

269 detail="peer token or X-Stigmem-Capability header required", 

270 ) 

271 

272 facts = body.get("facts", []) 

273 accepted = 0 

274 rejected = 0 

275 errors: list[dict[str, Any]] = [] 

276 

277 for fact in facts: 

278 fact_scope = fact.get("scope", "") 

279 

280 if using_cap_token: 

281 assert cap_token is not None 

282 ok, err = _push_fact_with_cap_token(fact, fact_scope, cap_token) 

283 else: 

284 assert peer is not None and token_payload is not None 

285 ok, err = _push_fact_with_peer_token(fact, fact_scope, peer, token_payload) 

286 

287 if ok: 

288 accepted += 1 

289 else: 

290 rejected += 1 

291 if err is not None: 291 ↛ 277line 291 didn't jump to line 277 because the condition on line 291 was always true

292 errors.append(err) 

293 

294 return {"accepted": accepted, "rejected": rejected, "errors": errors} 

295 

296 

297def _push_fact_with_cap_token( 

298 fact: dict[str, Any], 

299 fact_scope: str, 

300 cap_token: dict[str, Any], 

301) -> tuple[bool, dict[str, Any] | None]: 

302 """Validate + ingest a single fact under capability-token auth. 

303 

304 Returns (ok, error_dict_or_None). 

305 """ 

306 # H-SEC-2: verify capability token object covers this fact's scope 

307 token_object = cap_token.get("object", "") 

308 if not _cap_token_covers_scope(token_object, fact_scope): 308 ↛ 309line 308 didn't jump to line 309 because the condition on line 308 was never true

309 return False, { 

310 "fact_id": fact.get("id"), 

311 "error": "insufficient_capability: token object does not cover scope", 

312 } 

313 

314 sender_node_id = cap_token.get("subject", "") 

315 fact_source = fact.get("source", "") 

316 # Source non-forgery: source must match capability token subject 

317 if fact_source != sender_node_id: 

318 return False, {"fact_id": fact.get("id"), "error": "source_not_owned"} 

319 

320 tenant = TenantContext( 

321 tenant_id="default", 

322 metadata={"tenant_context_source": "pinned"}, 

323 ) 

324 registry = get_registry() 

325 decision = registry.fire_voting( 

326 "federation_inbound_validate", 

327 fact=fact, 

328 fact_scope=fact_scope, 

329 cap_token=cap_token, 

330 tenant=tenant, 

331 ) 

332 if isinstance(decision, Deny): 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true

333 return False, {"fact_id": fact.get("id"), "error": decision.reason} 

334 filtered_fact = registry.fire_filter_chain( 

335 "federation_inbound_filter", 

336 fact, 

337 fact_scope=fact_scope, 

338 cap_token=cap_token, 

339 tenant=tenant, 

340 ) 

341 

342 try: 

343 _public_module().ingest_fact( 

344 filtered_fact, 

345 sender_node_id, 

346 identity_strength_boost=0.5, # §19.4.2 boost for valid capability token 

347 ) 

348 return True, None 

349 except FederationHlcSkewError: 

350 return False, {"fact_id": fact.get("id"), "error": "hlc_skew"} 

351 except FederationIntegrityError as exc: 

352 return False, {"fact_id": fact.get("id"), "error": exc.reason} 

353 except Exception: 

354 return False, {"fact_id": fact.get("id"), "error": "ingest_error"} 

355 

356 

357def _push_fact_with_peer_token( 

358 fact: dict[str, Any], 

359 fact_scope: str, 

360 peer: dict[str, Any], 

361 token_payload: dict[str, Any], 

362) -> tuple[bool, dict[str, Any] | None]: 

363 """Validate + ingest a single fact under peer-JWT auth. 

364 

365 Returns (ok, error_dict_or_None). 

366 """ 

367 permitted = _allowed_output_scopes(peer, token_payload) 

368 

369 if fact_scope not in permitted: 

370 _public_module().write_audit_log( 

371 peer["id"], 

372 "scope_violation", 

373 {"fact_id": fact.get("id"), "scope": fact_scope}, 

374 ) 

375 return False, {"fact_id": fact.get("id"), "error": "scope_not_permitted"} 

376 

377 # Source non-forgery: source must match the sending peer's node_id (§6.4) 

378 fact_source = fact.get("source", "") 

379 if fact_source != peer["node_id"]: 

380 _public_module().write_audit_log( 

381 peer["id"], 

382 "rejected_fact", 

383 { 

384 "fact_id": fact.get("id"), 

385 "reason": "source_not_owned", 

386 "source": fact_source, 

387 "peer_node_id": peer["node_id"], 

388 }, 

389 ) 

390 return False, {"fact_id": fact.get("id"), "error": "source_not_owned"} 

391 

392 tenant = TenantContext( 

393 tenant_id="default", 

394 metadata={"tenant_context_source": "pinned"}, 

395 ) 

396 registry = get_registry() 

397 decision = registry.fire_voting( 

398 "federation_inbound_validate", 

399 fact=fact, 

400 fact_scope=fact_scope, 

401 peer=peer, 

402 token_payload=token_payload, 

403 tenant=tenant, 

404 ) 

405 if isinstance(decision, Deny): 

406 return False, {"fact_id": fact.get("id"), "error": decision.reason} 

407 filtered_fact = registry.fire_filter_chain( 

408 "federation_inbound_filter", 

409 fact, 

410 fact_scope=fact_scope, 

411 peer=peer, 

412 token_payload=token_payload, 

413 tenant=tenant, 

414 ) 

415 

416 try: 

417 _public_module().ingest_fact( 

418 filtered_fact, 

419 peer["node_id"], 

420 origin_allowed_scopes=json.loads(peer["allowed_scopes"]), 

421 ) 

422 return True, None 

423 except FederationHlcSkewError: 

424 return False, {"fact_id": fact.get("id"), "error": "hlc_skew"} 

425 except FederationIntegrityError as exc: 

426 return False, {"fact_id": fact.get("id"), "error": exc.reason} 

427 except Exception: 

428 return False, {"fact_id": fact.get("id"), "error": "ingest_error"}