Coverage for node/src/stigmem_node/routes/federation/tombstones.py: 67%
39 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Federation tombstone routes."""
3from __future__ import annotations
5from typing import Annotated, Any
7from fastapi import APIRouter, Header, HTTPException, Request, status
9from ...identity.capability import CapabilityTokenError, verify_token
10from ...identity.trust_store import get_peer_manifest
11from ...lifecycle.tombstones import list_revocations, list_tombstones
12from ...models.tombstones import FederationTombstonesResponse
13from .._federation_impl import federation_ingest_tombstone_impl
14from .common import _get_mtls_peer_cert, _public_module, _try_peer_token_auth
16router = APIRouter(tags=["federation"])
@router.get("/v1/federation/tombstones", response_model=FederationTombstonesResponse)
def federation_list_tombstones(
    request: Request,
    since: str | None = None,
    limit: int = 200,
    token_header: Annotated[str | None, Header(alias="Authorization")] = None,
) -> FederationTombstonesResponse:
    """Tombstone poll route.

    Requires tombstone:read capability token. Covered by Spec-X2-RTBF-Tombstones.

    The token is read from the ``Authorization`` header and must use the
    ``Bearer `` scheme. When the federation ``trust_mode`` setting is anything
    other than ``"off"``, the token's ``verbs`` (or ``verb``) claim must contain
    ``tombstone:read`` or ``admin`` and the token is then passed to
    ``verify_token``. When ``trust_mode`` is ``"off"``, neither the verb check
    nor ``verify_token`` runs; a warning is logged instead.

    Args:
        request: Incoming request (not read by this handler).
        since: Forwarded unchanged to ``list_tombstones`` and
            ``list_revocations``.
        limit: Maximum number of tombstones and, independently, of revocations
            to return. Negative values are treated as ``0``.
        token_header: Raw ``Authorization`` header value.

    Returns:
        The truncated tombstone and revocation lists, plus a ``cursor`` set to
        ``created_at`` of the last returned tombstone (``None`` when no
        tombstones are returned; revocations do not influence the cursor).

    Raises:
        HTTPException: 401 when the bearer token is missing, malformed, lacks
            the required capability, or is rejected by ``verify_token``.
    """
    raw_token = None
    if token_header and token_header.startswith("Bearer "):
        raw_token = token_header[7:]

    if not raw_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="capability token required",
        )

    fed_settings = _public_module().settings
    if fed_settings.trust_mode != "off":
        import json as _json

        # Only tokens that look like a JSON object are inspected for verbs;
        # anything else carries no verbs and is rejected by the check below.
        try:
            token_data = _json.loads(raw_token) if raw_token.startswith("{") else {}
        except ValueError as exc:
            # json.JSONDecodeError is a ValueError. A client-supplied token that
            # fails to parse is an auth failure, not an internal server error.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="malformed capability token",
            ) from exc

        verbs = token_data.get("verbs", token_data.get("verb", ""))
        if isinstance(verbs, str):
            verbs = [v.strip() for v in verbs.split(",")] if verbs else []
        elif not isinstance(verbs, list):
            # A null / numeric / boolean / object claim is not a verb list;
            # treat it as "no verbs" rather than failing on the `in` test.
            verbs = []

        if "tombstone:read" not in verbs and "admin" not in verbs:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="tombstone:read capability required",
            )

        try:
            verify_token(
                raw_token,
                lambda uri: get_peer_manifest(
                    uri, refresh_if_expired=True, trust_mode=fed_settings.trust_mode
                ),
                trust_mode=fed_settings.trust_mode,
            )
        except CapabilityTokenError as exc:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc)) from exc
    else:
        import logging as _logging

        _logging.getLogger("stigmem.federation").warning(
            "tombstone poll: trust_mode=off — token signature verification skipped"
        )

    # A negative slice bound would drop items from the END of the list instead
    # of limiting the page size, so clamp it to zero.
    page_size = max(limit, 0)
    tombstone_list = list_tombstones(since=since)[:page_size]
    revocation_list = list_revocations(since=since)[:page_size]
    cursor = tombstone_list[-1].created_at if tombstone_list else None
    return FederationTombstonesResponse(
        tombstones=tombstone_list,
        revocations=revocation_list,
        cursor=cursor,
    )
@router.post("/v1/federation/tombstones/ingest", status_code=status.HTTP_200_OK)
def federation_ingest_tombstone(
    request: Request,
    payload: dict[str, Any],
    authorization: Annotated[str | None, Header(alias="Authorization")] = None,
    x_stigmem_capability: Annotated[str | None, Header(alias="x-stigmem-capability")] = None,
) -> dict[str, Any]:
    """Accept a tombstone pushed by a federation peer.

    This route is a thin adapter: it forwards the request, the JSON payload,
    both auth-related headers, and the two auth helpers to
    ``federation_ingest_tombstone_impl`` and returns that function's result
    unchanged.

    Contract as recorded for this endpoint (implemented by the delegate, not
    visible here): auth is a peer JWT or a capability token with the
    tombstone:write verb (mirrors push_facts); the signature is verified
    against the org manifest and the tombstone is written to the local
    tombstones table. Covered by Spec-X2-RTBF-Tombstones.
    """
    # All logic lives in _federation_impl; the helpers are injected positionally.
    ingest_result = federation_ingest_tombstone_impl(
        request,
        payload,
        authorization,
        x_stigmem_capability,
        _try_peer_token_auth,
        _get_mtls_peer_cert,
    )
    return ingest_result