Coverage for node / src / stigmem_node / routes / federation / replication.py: 86%
166 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Federation fact pull and push routes."""
3from __future__ import annotations
5import json
6from typing import Annotated, Any
8from fastapi import Header, HTTPException, Query, Request
10from ...db import db
11from ...federation.federation_ingest import FederationHlcSkewError, FederationIntegrityError
12from ...federation.tls import check_peer_san
13from ...identity.capability import CapabilityTokenError, verify_token
14from ...identity.trust_store import get_peer_manifest
15from ...metrics import FEDERATION_EGRESS
16from ...models.facts import row_to_record
17from ...models.federation import FederationFactsResponse
18from ...plugins import Deny, TenantContext, get_registry
19from .common import (
20 PeerTokenDep,
21 _allowed_output_scopes,
22 _cap_token_covers_scope,
23 _get_mtls_peer_cert,
24 _public_module,
25 _try_peer_token_auth,
26 logger,
27 router,
28)
@router.get("/v1/federation/facts", response_model=FederationFactsResponse)
def pull_facts(
    peer_and_token: PeerTokenDep,
    scope: str | None = Query(None),
    cursor: str | None = Query(None),
    limit: int = Query(100, ge=1, le=500),
) -> FederationFactsResponse:
    """Serve one page of replication-eligible facts to an authenticated peer.

    The page is restricted to the scopes the peer may read (optionally narrowed
    to a single requested ``scope``), ordered by HLC ascending, and starts
    strictly after ``cursor`` when one is supplied.

    Raises:
        HTTPException: 403 when the peer has no permitted scopes, or when the
            requested ``scope`` is not among them (also written to the audit
            log as ``scope_violation``).

    Covered by Spec-05-Federation-Trust.
    """
    peer, token_payload = peer_and_token

    permitted = _allowed_output_scopes(peer, token_payload)
    if not permitted:
        raise HTTPException(status_code=403, detail="no permitted scopes")

    if scope is None:
        query_scopes = permitted
    elif scope in permitted:
        query_scopes = {scope}
    else:
        _public_module().write_audit_log(
            peer["id"],
            "scope_violation",
            {"requested_scope": scope, "permitted": list(permitted)},
        )
        raise HTTPException(status_code=403, detail="scope not permitted for this peer")

    # Bind order must follow placeholder order: scopes, tenant, [cursor], limit.
    scope_values: list[Any] = list(query_scopes)
    scope_placeholders = ",".join("?" for _ in scope_values)
    conditions: list[str] = [
        f"scope IN ({scope_placeholders})",
        "tenant_id = ?",
        "hlc IS NOT NULL",  # only facts with an HLC are replication-eligible
        "received_from IS NULL",  # do not re-federate inbound facts (§3.1)
        "entity NOT LIKE 'stigmem:conflict:%'",  # conflict entities are local (§6.5)
        "relation NOT LIKE 'stigmem:%'",  # meta-facts (received_from, ttl) are local
        "re_federation_blocked = 0",  # exclude company-scope relay-blocked facts (§6.8.2)
        "(derived_from IS NULL OR derived_from = '' OR derived_from = '[]')",
    ]
    params: list[Any] = [*scope_values, "default"]
    if cursor:
        conditions.append("hlc > ?")
        params.append(cursor)
    # Ask for one extra row so we can tell whether another page exists.
    params.append(limit + 1)
    where = " AND ".join(conditions)

    with db() as conn:
        fetched = conn.execute(
            f"SELECT * FROM facts WHERE {where} ORDER BY hlc ASC LIMIT ?",  # noqa: S608  # nosec B608 — where built from literal fragments; values in params
            params,
        ).fetchall()

    has_more = len(fetched) > limit
    page = fetched[:limit]

    # A record is flagged as contradicted when more than one row in this page
    # shares its (entity, relation, scope) triple.
    keys = [(row["entity"], row["relation"], row["scope"]) for row in page]
    occurrences: dict[tuple[str, str, str], int] = {}
    for key in keys:
        occurrences[key] = occurrences.get(key, 0) + 1
    records = [
        row_to_record(row, contradicted=occurrences[key] > 1)
        for row, key in zip(page, keys)
    ]

    tenant = TenantContext(
        tenant_id="default",
        metadata={"tenant_context_source": "pinned"},
    )
    registry = get_registry()
    # Plugins first filter the outgoing records, then sign what remains.
    for hook_name in ("federation_outbound_filter", "federation_outbound_sign"):
        records = registry.fire_filter_chain(
            hook_name,
            records,
            peer=peer,
            token_payload=token_payload,
            tenant=tenant,
        )

    # The cursor advances to the last row read; an empty page echoes the input cursor.
    new_cursor: str | None = page[-1]["hlc"] if page else cursor
    FEDERATION_EGRESS.labels(peer_id=peer["node_id"], status="ok").inc(len(records))
    return FederationFactsResponse(facts=records, cursor=new_cursor, has_more=has_more)
123# ---------------------------------------------------------------------------
124# POST /v1/federation/facts/push — optional push (§5.11)
125# ---------------------------------------------------------------------------
def _record_capability_audit(
    fact_id: str,
    event_type: str,
    entity_uri: str | None,
    detail: dict[str, Any],
) -> None:
    """Insert one capability event into ``fact_audit_log`` (M-SEC-4), best-effort.

    Any failure while writing the row is logged at debug level and swallowed,
    so an audit-log problem never changes the authentication outcome.
    """
    import uuid
    from datetime import UTC, datetime

    try:
        with db() as conn:
            conn.execute(
                """INSERT INTO fact_audit_log
                   (id, fact_id, event_type, entity_uri, oidc_sub, source,
                    attested_key_id, detail, ts)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (
                    str(uuid.uuid4()),
                    fact_id,
                    event_type,
                    entity_uri,
                    None,
                    "system:capability",
                    None,
                    json.dumps(detail),
                    datetime.now(UTC).isoformat(),
                ),
            )
    except Exception as audit_exc:  # nosec B110 — audit log best-effort
        logger.debug("%s audit log failed: %s", event_type, audit_exc)


def _verify_push_cap_token(x_stigmem_capability: str) -> dict[str, Any]:
    """Verify a capability-token header for the push path (H-SEC-2).

    Args:
        x_stigmem_capability: Raw ``X-Stigmem-Capability`` header value; a JSON
            document describing the token.

    Returns:
        The decoded token dict. A ``capability_verified`` audit row is written
        (best-effort) before returning.

    Raises:
        HTTPException: 401 when ``verify_token`` rejects the token (a
            ``capability_rejected`` audit row is written, best-effort);
            400 when the header is not valid JSON or does not decode to a
            JSON object; 403 when the token verb is not ``"write"``.
    """
    trust_mode = _public_module().settings.trust_mode
    try:
        verify_token(
            x_stigmem_capability,
            lambda uri: get_peer_manifest(
                uri, refresh_if_expired=True, trust_mode=trust_mode
            ),
            trust_mode=trust_mode,
        )
    except CapabilityTokenError as exc:
        # M-SEC-4: log capability_rejected
        _record_capability_audit(
            "capability:rejected",
            "capability_rejected",
            None,
            {"reason": str(exc)},
        )
        raise HTTPException(status_code=401, detail=f"capability token invalid: {exc}") from exc

    try:
        cap_token = json.loads(x_stigmem_capability)
    except json.JSONDecodeError as exc:
        raise HTTPException(
            status_code=400, detail=f"malformed capability token JSON: {exc}"
        ) from exc
    # Valid JSON that is not an object (list, string, number) would otherwise
    # crash on ``.get`` below and surface as a 500.
    if not isinstance(cap_token, dict):
        raise HTTPException(
            status_code=400,
            detail="malformed capability token JSON: expected an object",
        )

    if cap_token.get("verb") != "write":
        raise HTTPException(
            status_code=403,
            detail="insufficient_capability: token verb must be 'write' for push",
        )

    # M-SEC-4: log capability_verified
    _record_capability_audit(
        cap_token.get("token_id", "unknown"),
        "capability_verified",
        cap_token.get("subject"),
        {
            "token_id": cap_token.get("token_id"),
            "issuer": cap_token.get("issuer"),
            "verb": cap_token.get("verb"),
            "object": cap_token.get("object"),
        },
    )

    return cap_token
@router.post("/v1/federation/facts/push", status_code=202)
def push_facts(
    request: Request,
    body: dict[str, Any],
    authorization: Annotated[str | None, Header(alias="authorization")] = None,
    x_stigmem_capability: Annotated[str | None, Header(alias="x-stigmem-capability")] = None,
) -> dict[str, Any]:
    """Receive push-replicated facts from a peer. Off by default.

    Auth (H-SEC-2): peer JWT first; if that fails and X-Stigmem-Capability is
    present, fall through to capability-token verification. Capability tokens
    must carry verb=write and an object that covers all pushed fact scopes.

    Each entry of ``body["facts"]`` is validated and ingested independently;
    a failing entry is counted as rejected and does not abort the batch.

    Returns:
        ``{"accepted": int, "rejected": int, "errors": [...]}`` where each
        error is a dict with ``fact_id`` and ``error`` keys.

    Raises:
        HTTPException: 405 when push replication is disabled; 401 when neither
            credential is usable or the mTLS SAN does not match the peer's
            node_id; 422 when ``facts`` is present but is not a list.

    Covered by Spec-05-Federation-Trust.
    """
    if not _public_module().settings.federation_push_enabled:
        raise HTTPException(status_code=405, detail="push replication not enabled on this node")

    # --- Phase 1: try peer JWT auth ---
    peer_auth = _try_peer_token_auth(authorization)

    peer: dict[str, Any] | None = None
    token_payload: dict[str, Any] | None = None
    cap_token: dict[str, Any] | None = None

    if peer_auth is not None:
        peer, token_payload = peer_auth
        # §22.1.2.4 — enforce SAN on the push path too
        if _public_module().settings.mtls_enabled:
            peer_cert = _get_mtls_peer_cert(request)
            if not check_peer_san(peer_cert, peer["node_id"]):
                _public_module().write_audit_log(
                    peer["id"], "san_mismatch", {"node_id": peer["node_id"]}
                )
                raise HTTPException(
                    status_code=401,
                    detail="peer certificate URI SAN does not match node_id",
                )
    elif x_stigmem_capability is not None:
        # --- Phase 2: fall back to the capability token ---
        cap_token = _verify_push_cap_token(x_stigmem_capability)
    else:
        raise HTTPException(
            status_code=401,
            detail="peer token or X-Stigmem-Capability header required",
        )

    # The body is untrusted: ``facts`` may be null, a scalar, or contain
    # non-object entries. Reject those explicitly instead of crashing (500).
    facts = body.get("facts", [])
    if not isinstance(facts, list):
        raise HTTPException(status_code=422, detail="'facts' must be a list")

    accepted = 0
    rejected = 0
    errors: list[dict[str, Any]] = []

    for fact in facts:
        if not isinstance(fact, dict):
            rejected += 1
            errors.append({"fact_id": None, "error": "malformed_fact"})
            continue

        fact_scope = fact.get("scope", "")
        if not isinstance(fact_scope, str):
            # A non-string scope (e.g. a list) cannot match any permitted
            # scope and may be unhashable in the membership checks downstream.
            rejected += 1
            errors.append({"fact_id": fact.get("id"), "error": "malformed_fact"})
            continue

        if cap_token is not None:
            ok, err = _push_fact_with_cap_token(fact, fact_scope, cap_token)
        else:
            # Type narrowing only: the auth branches above guarantee both are set.
            assert peer is not None and token_payload is not None
            ok, err = _push_fact_with_peer_token(fact, fact_scope, peer, token_payload)

        if ok:
            accepted += 1
        else:
            rejected += 1
            if err is not None:
                errors.append(err)

    return {"accepted": accepted, "rejected": rejected, "errors": errors}
def _push_fact_with_cap_token(
    fact: dict[str, Any],
    fact_scope: str,
    cap_token: dict[str, Any],
) -> tuple[bool, dict[str, Any] | None]:
    """Validate + ingest a single fact under capability-token auth.

    Checks, in order: the token ``object`` covers ``fact_scope``; the fact's
    ``source`` equals the token ``subject``; the ``federation_inbound_validate``
    plugin vote is not a ``Deny``. The fact is then passed through the
    ``federation_inbound_filter`` chain and ingested.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error_dict)`` where
        ``error_dict`` has ``fact_id`` and ``error`` keys.
    """
    # H-SEC-2: verify capability token object covers this fact's scope
    token_object = cap_token.get("object", "")
    if not _cap_token_covers_scope(token_object, fact_scope):
        return False, {
            "fact_id": fact.get("id"),
            "error": "insufficient_capability: token object does not cover scope",
        }

    sender_node_id = cap_token.get("subject", "")
    fact_source = fact.get("source", "")
    # Source non-forgery: source must match capability token subject. An empty
    # subject is rejected outright; otherwise a token without ``subject`` and a
    # fact without ``source`` would both default to "" and compare equal.
    if not sender_node_id or fact_source != sender_node_id:
        return False, {"fact_id": fact.get("id"), "error": "source_not_owned"}

    tenant = TenantContext(
        tenant_id="default",
        metadata={"tenant_context_source": "pinned"},
    )
    registry = get_registry()
    decision = registry.fire_voting(
        "federation_inbound_validate",
        fact=fact,
        fact_scope=fact_scope,
        cap_token=cap_token,
        tenant=tenant,
    )
    if isinstance(decision, Deny):
        return False, {"fact_id": fact.get("id"), "error": decision.reason}
    filtered_fact = registry.fire_filter_chain(
        "federation_inbound_filter",
        fact,
        fact_scope=fact_scope,
        cap_token=cap_token,
        tenant=tenant,
    )

    try:
        _public_module().ingest_fact(
            filtered_fact,
            sender_node_id,
            identity_strength_boost=0.5,  # §19.4.2 boost for valid capability token
        )
    except FederationHlcSkewError:
        return False, {"fact_id": fact.get("id"), "error": "hlc_skew"}
    except FederationIntegrityError as exc:
        return False, {"fact_id": fact.get("id"), "error": exc.reason}
    except Exception:
        # Keep the batch going, but do not hide the unexpected failure.
        logger.exception(
            "unexpected error ingesting pushed fact %r (capability token)", fact.get("id")
        )
        return False, {"fact_id": fact.get("id"), "error": "ingest_error"}
    return True, None
def _push_fact_with_peer_token(
    fact: dict[str, Any],
    fact_scope: str,
    peer: dict[str, Any],
    token_payload: dict[str, Any],
) -> tuple[bool, dict[str, Any] | None]:
    """Validate + ingest a single fact under peer-JWT auth.

    Checks, in order: ``fact_scope`` is among the peer's permitted scopes; the
    fact's ``source`` equals the peer's ``node_id``; the
    ``federation_inbound_validate`` plugin vote is not a ``Deny``. Scope and
    source rejections are written to the audit log. The fact is then passed
    through the ``federation_inbound_filter`` chain and ingested.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error_dict)`` where
        ``error_dict`` has ``fact_id`` and ``error`` keys.
    """
    permitted = _allowed_output_scopes(peer, token_payload)

    if fact_scope not in permitted:
        _public_module().write_audit_log(
            peer["id"],
            "scope_violation",
            {"fact_id": fact.get("id"), "scope": fact_scope},
        )
        return False, {"fact_id": fact.get("id"), "error": "scope_not_permitted"}

    # Source non-forgery: source must match the sending peer's node_id (§6.4)
    fact_source = fact.get("source", "")
    if fact_source != peer["node_id"]:
        _public_module().write_audit_log(
            peer["id"],
            "rejected_fact",
            {
                "fact_id": fact.get("id"),
                "reason": "source_not_owned",
                "source": fact_source,
                "peer_node_id": peer["node_id"],
            },
        )
        return False, {"fact_id": fact.get("id"), "error": "source_not_owned"}

    tenant = TenantContext(
        tenant_id="default",
        metadata={"tenant_context_source": "pinned"},
    )
    registry = get_registry()
    decision = registry.fire_voting(
        "federation_inbound_validate",
        fact=fact,
        fact_scope=fact_scope,
        peer=peer,
        token_payload=token_payload,
        tenant=tenant,
    )
    if isinstance(decision, Deny):
        return False, {"fact_id": fact.get("id"), "error": decision.reason}
    filtered_fact = registry.fire_filter_chain(
        "federation_inbound_filter",
        fact,
        fact_scope=fact_scope,
        peer=peer,
        token_payload=token_payload,
        tenant=tenant,
    )

    try:
        # Decoding the peer's stored scope list stays inside the try so that a
        # malformed value is reported per fact as ``ingest_error``.
        _public_module().ingest_fact(
            filtered_fact,
            peer["node_id"],
            origin_allowed_scopes=json.loads(peer["allowed_scopes"]),
        )
    except FederationHlcSkewError:
        return False, {"fact_id": fact.get("id"), "error": "hlc_skew"}
    except FederationIntegrityError as exc:
        return False, {"fact_id": fact.get("id"), "error": exc.reason}
    except Exception:
        # Keep the batch going, but do not hide the unexpected failure.
        logger.exception(
            "unexpected error ingesting pushed fact %r from peer %r",
            fact.get("id"),
            peer["node_id"],
        )
        return False, {"fact_id": fact.get("id"), "error": "ingest_error"}
    return True, None